[jira] [Created] (KAFKA-7665) Replace BaseConsumerRecord with ConsumerRecord in MM

2018-11-20 Thread huxihx (JIRA)
huxihx created KAFKA-7665:
-

 Summary: Replace BaseConsumerRecord with ConsumerRecord in MM
 Key: KAFKA-7665
 URL: https://issues.apache.org/jira/browse/KAFKA-7665
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.1.0
Reporter: huxihx


Replace the deprecated `BaseConsumerRecord` with `ConsumerRecord` in MirrorMaker.
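
For context, MirrorMaker's message handler turns each consumed record into the producer records it forwards. A minimal sketch of that conversion against the public ConsumerRecord/ProducerRecord API (illustrative only, not the actual patch; the class name is made up):

{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConsumerRecordForwarder {
    // Forward a consumed record as a producer record, preserving key, value,
    // timestamp and headers; the target partition is left to the partitioner.
    public List<ProducerRecord<byte[], byte[]>> handle(ConsumerRecord<byte[], byte[]> record) {
        return Collections.singletonList(
            new ProducerRecord<>(record.topic(), null, record.timestamp(),
                                 record.key(), record.value(), record.headers()));
    }
}
{code}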



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-2.1-jdk8 #59

2018-11-20 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk8 #3212

2018-11-20 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #3211

2018-11-20 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)

--
[...truncated 2.38 MB...]

org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testGettingGroupWithOddNumberOfTags PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testMetricGroupIdIdentity 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testMetricGroupIdIdentity 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithoutClose 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithoutClose 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testGettingGroupWithTags 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testGettingGroupWithTags 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithClose 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithClose 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testKafkaMetricsNotNull 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testKafkaMetricsNotNull 
PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.FloatConverterTest > testBytesNullToNumber 
PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectType STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnectNull 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnectNull 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectBadSchema STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectBadSchema PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectNull STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectNull PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnect 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnect 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testFromConnect 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 

Build failed in Jenkins: kafka-2.1-jdk8 #58

2018-11-20 Thread Apache Jenkins Server
See 


Changes:

[lindong28] Bump version to 2.1.0

[lindong28] MINOR: Update version numbers to 2.1.1-SNAPSHOT

--
[...truncated 2.62 MB...]
org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > shouldAcceptAtLeastOnce STARTED

org.apache.kafka.streams.StreamsConfigTest > shouldAcceptAtLeastOnce PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldUseCorrectDefaultsWhenNoneSpecified STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldUseCorrectDefaultsWhenNoneSpecified PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingProducerEnableIdempotenceIfEosDisabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingProducerEnableIdempotenceIfEosDisabled PASSED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
STARTED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSetDifferentDefaultsIfEosEnabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSetDifferentDefaultsIfEosEnabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldResetToDefaultIfConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfNotAtLestOnceOrExactlyOnce STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfNotAtLestOnceOrExactlyOnce PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingConsumerIsolationLevelIfEosDisabled STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldAllowSettingConsumerIsolationLevelIfEosDisabled PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetGlobalConsumerConfigs 
STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetGlobalConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix STARTED

org.apache.kafka.streams.StreamsConfigTest > 
testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled
 STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled
 PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedGlobalConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedGlobalConsumerConfigs PASSED


[jira] [Resolved] (KAFKA-7528) Standardize on Min/Avg/Max Kafka metrics' default value

2018-11-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7528.
--
Resolution: Fixed

> Standardize on Min/Avg/Max Kafka metrics' default value
> ---
>
> Key: KAFKA-7528
> URL: https://issues.apache.org/jira/browse/KAFKA-7528
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.2.0
>
>
> KIP-386: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95652345]
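>
> For reference, these are the standard Min/Avg/Max stats from the common metrics library; a minimal registration sketch (metric and group names are illustrative). Before any value is recorded, each stat reports its "default" value, which is the inconsistency KIP-386 standardizes:
>
> {code:java}
> import org.apache.kafka.common.metrics.Metrics;
> import org.apache.kafka.common.metrics.Sensor;
> import org.apache.kafka.common.metrics.stats.Avg;
> import org.apache.kafka.common.metrics.stats.Max;
> import org.apache.kafka.common.metrics.stats.Min;
>
> public class MinAvgMaxSketch {
>     public static void main(String[] args) {
>         Metrics metrics = new Metrics();
>         Sensor sensor = metrics.sensor("request-latency");
>         sensor.add(metrics.metricName("latency-min", "example-group"), new Min());
>         sensor.add(metrics.metricName("latency-avg", "example-group"), new Avg());
>         sensor.add(metrics.metricName("latency-max", "example-group"), new Max());
>         // Until record() is called, each stat reports its default value;
>         // KIP-386 makes those defaults consistent across Min/Avg/Max.
>         sensor.record(42.0);
>         metrics.close();
>     }
> }
> {code}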



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-20 Thread Mayuresh Gharat
Hi Boyang,

Thanks for updating the KIP. This is a good step in the right direction for stateful
applications and also for mirroring applications whose latency is affected by
the rebalance issues that we have today.

I had a few questions on the current version of the KIP:
For the effectiveness of the KIP, consumers with member.name set will *not
send a leave group request* when they go offline

> By this you mean, even if the application has not called
> KafkaConsumer.poll() within session timeout, it will not be sending the
> LeaveGroup request, right?
>

Broker will maintain an in-memory mapping of {member.name → member.id} to
track member uniqueness.

> When is the member.name removed from this map?
>

Member.id must be set if the *member.name* is already
within the map. Otherwise reply MISSING_MEMBER_ID.

> How is this case handled on the client side? What is the application that
> is using the KafkaConsumer suppose to do in this scenario?
>
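
A rough sketch of the broker-side bookkeeping being discussed above; this is purely illustrative of the described semantics, and the class and error names are hypothetical:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MemberRegistry {
    private final Map<String, String> memberIdByName = new ConcurrentHashMap<>();

    // A joining member supplies its member.name and (optionally) a member.id.
    // If the name is already registered but no member.id was supplied, the
    // broker would answer with a MISSING_MEMBER_ID-style error.
    public String validateJoin(String memberName, String memberId) {
        String known = memberIdByName.get(memberName);
        if (known != null && memberId == null) {
            throw new IllegalStateException("MISSING_MEMBER_ID");
        }
        return known;
    }

    public void register(String memberName, String memberId) {
        memberIdByName.put(memberName, memberId);
    }
}
{code}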

Session timeout is the timeout after which we will trigger a rebalance when a member goes
offline for too long (not sending heartbeat requests). To make static
membership effective, we should increase the default max session timeout to
30 min so that end users can configure it freely.

> This would mean that it might take more time to detect unowned topic
> partitions and may cause delay for applications that perform data mirroring
> tasks. I discussed this with our sre and we have a suggestion to make here
> as listed below separately.
>

Currently there is a config called *rebalance timeout* which is configured
by the consumer's *max.poll.interval.ms*. The reason we set it to the poll interval is
because the consumer can only send requests within the call of poll() and we
want to wait sufficient time for the join group request. When reaching the
rebalance timeout, the group will move towards the completingRebalance stage
and remove unjoined groups

> you meant remove unjoined members of the group, right ?
>

Currently there is a config called *rebalance timeout* which is configured
by the consumer's *max.poll.interval.ms*. The reason we set it to the poll interval is
because the consumer can only send requests within the call of poll() and we
want to wait sufficient time for the join group request. When reaching the
rebalance timeout, the group will move towards the completingRebalance stage
and remove unjoined groups. This is actually conflicting with the design of
static membership, because those temporarily unavailable members will
potentially reattempt the join group and trigger extra rebalances.
Internally we would optimize this logic by having rebalance timeout only in
charge of stopping prepare rebalance stage, without removing non-responsive
members immediately.

> What do you mean by " Internally we would optimize this logic by having
> rebalance timeout only in charge of stopping prepare rebalance stage,
> without removing non-responsive members immediately." There would not be a
> full rebalance if the lagging consumer sent a JoinGroup request later,
> right ? If yes, can you highlight this in the KIP ?
>

Scale Up

> The KIP talks about the scale-up scenario but it's not quite clear how we
> handle it. Are we adding a separate "expansion.timeout" or are we adding a status
> "learner"? Can you shed more light on how this is handled in the KIP, if
> it's handled?
>


*Discussion*
Larger session timeouts causing latency rise for getting data for un-owned
topic partitions :

> I think Jason had brought this up earlier about having a way to say how
> many members/consumer hosts are you choosing to be in the consumer group.
> If we can do this, then in case of mirroring applications we can do this :
> Let's say we have a mirroring application that consumes from Kafka cluster
> A and produces to Kafka cluster B.
> Depending on the data and the Kafka cluster configuration, Kafka service
> providers can set a mirroring group saying that it will take, for example
> 300 consumer hosts/members to achieve the desired throughput and latency
> for mirroring, and can have an additional 10 consumer hosts as spares in the
> same group.
> So the first 300 members/consumers to join the group will start
> mirroring the data from Kafka cluster A to Kafka cluster B.
> The remaining 10 consumer members can sit idle.
> The moment one of the consumers (for example: consumer number 54) from the
> first 300 members goes out of the group (crosses the session timeout), it (the
> groupCoordinator) can just assign the topicPartitions from consumer
> member 54 to one of the spare hosts.
> Once the consumer member 54 comes back up, it can start as being a part of
> the spare pool.
> This enables us to have lower session timeouts and low latency mirroring,
> in cases where the service providers are OK with having spare hosts.
> This would mean that we would tolerate n consumer members leaving and
> rejoining the group and still provide low latency as long as n <= number of
> spare consumers.
> If there are no spare hosts available, we can get back to the 

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-20 Thread Boyang Chen
Thanks Konstantine for correcting the details within the proposal for me! Addressed 
them below:


> In a few places the new/proposed changes are referred to as "current".
> Which is a bit confusing considering that there is a protocol in place
> already, and by "current" someone might understand the existing one.

Replaced the `current` usage with `proposed`!

> There's the following sentence in the "Public Interfaces" section:
>"Since for many stateful consumer/stream applications, the state shuffling
> is more painful than short time partial unavailability."
> However, my understanding is that the changes proposed with KIP-345 will
> not exploit any partial availability.
We are proposing to extend the session timeout inside static membership and change the 
rebalance timeout not to remove unjoined members, which means we would detect a 
consumer failure more slowly from the broker-side perspective. This is what I mean by 
"partial unavailability": some topic partitions are not making progress because a 
consumer is dead or hanging.

> In the rejected alternatives, under point 2) I read "we can copy the member
> id to the config files". I believe it means to say "member name" unless I'm
> missing something about reusing member ids
Updated that section!

Let me know if this makes sense to you!

Boyang

From: Konstantine Karantasis 
Sent: Wednesday, November 21, 2018 2:18 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi Boyang.

Thanks for preparing this KIP! It is making good progress and will be a
great improvement for stateful Kafka applications.

Apologies for my late reply, I was away for a while. Lots of great comments
so far, so I'll probably second most of them in what I suggest below at
this point.

When I first read the KIP, I wanted to start at the end with something that
wasn't highlighted a lot. That was the topic related to handling duplicate
members. I see now that the initial suggestion of handling this situation
during offset commit has been removed, and I agree with that. Issues
related to membership seem to be handled better when the member joins the
group rather than when it tries to commit offsets. This also simplifies how
many request types need to change in order to incorporate the new member
name field.

I also agree with what Jason and Guozhang have said regarding timeouts.
Although semantically, it's easier to think of every operation having its
own timeout, operationally this can become a burden. Thus, consolidation
seems preferable here. The definition of embedded protocols on top of the
base group membership protocol for rebalancing gives enough flexibility to
address such needs in each client component separately.

Finally, some minor comments:
In a few places the new/proposed changes are referred to as "current".
Which is a bit confusing considering that there is a protocol in place
already, and by "current" someone might understand the existing one. I'd
recommend using new/proposed or equivalent when referring to changes
introduced with KIP-345 and current/existing or equivalent when referring
to existing behavior.

There's the following sentence in the "Public Interfaces" section:
"Since for many stateful consumer/stream applications, the state shuffling
is more painful than short time partial unavailability."
However, my understanding is that the changes proposed with KIP-345 will
not exploit any partial availability. A suggestion for dealing with
temporary imbalances has been made in "Incremental Cooperative Rebalancing"
which can work well with KIP-345, but here I don't see proposed changes
that suggest that some resources (e.g. partitions) will keep being used
while others will not be utilized. Thus, you might want to adjust this
sentence. Correct me if I'm missing something related to that.

In the rejected alternatives, under point 2) I read "we can copy the member
id to the config files". I believe it means to say "member name" unless I'm
missing something about reusing member ids. Also below I read: "By allowing
consumers to optionally specifying a member id" which probably implies
"member name" again. In a sense this section highlights a potential
confusion between member name and member id. I wonder if we could come up
with a better term for the new field. StaticTag, StaticLabel, or even
StaticName are some suggestions that could potentially help with confusion
between MemberId and MemberName and what corresponds to what. But I
wouldn't like to disrupt the discussion with naming conventions too much at
this point. I just mention it here as a thought.

Looking forward to seeing the final details of this KIP. Great work so far!

Konstantine


On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen  wrote:

> Thanks Guozhang for the great summary here, and I have been following up
> the action items here.
>
>
>   1.  I already updated the KIP to remove the expansion timeout and
> registration timeout. 

[jira] [Resolved] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-20 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7536.
--
   Resolution: Fixed
 Assignee: Guozhang Wang
Fix Version/s: 2.0.2
   2.1.1
   2.2.0

> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.
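
A minimal sketch of the workaround mentioned above, using the test-utils API; the store name comes from the report, while key/value types are simplified to String for illustration:

{code:java}
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class GlobalStoreWorkaroundSketch {
    public static void prePopulate(TopologyTestDriver driver, String topic) {
        ConsumerRecordFactory<String, String> factory =
            new ConsumerRecordFactory<>(topic, new StringSerializer(), new StringSerializer());
        // Piping one record through the driver initializes the processor context,
        // after which put() on the global store no longer hits the NPE above.
        driver.pipeInput(factory.create(topic, "someone@example.com", "user-42"));

        KeyValueStore<String, String> store = driver.getKeyValueStore("user-ids-by-email");
        store.put("someone@else.com", "user-43");
    }
}
{code}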



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-11-20 Thread Matthias J. Sax
It's an interesting idea to use a second store to maintain the
timestamps. However, each RocksDB instance implies some overhead. In
fact, we are looking into ColumnFamilies atm to see if we can use those
and merge multiple RocksDBs into a single one to reduce this overhead.
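
To illustrate the idea (this is the plain RocksDB JNI API, not Streams code, and the paths and family names are made up): one RocksDB instance can host the record values and the timestamps as two column families.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ColumnFamilySketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),   // record values
            new ColumnFamilyDescriptor("timestamps".getBytes()));        // record timestamps
        List<ColumnFamilyHandle> handles = new ArrayList<>();
        try (DBOptions options = new DBOptions()
                 .setCreateIfMissing(true)
                 .setCreateMissingColumnFamilies(true);
             RocksDB db = RocksDB.open(options, "/tmp/cf-sketch", descriptors, handles)) {
            db.put(handles.get(0), "key".getBytes(), "value".getBytes());
            db.put(handles.get(1), "key".getBytes(), "1542700000000".getBytes());
        }
    }
}
{code}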

-Matthias

On 11/20/18 5:15 AM, Patrik Kleindl wrote:
> Hi Adam
> 
> Sounds great, I was already planning to ask around if anyone had tackled
> this.
> We have a use case very similar to what you described in KAFKA-4212, only
> with Global State Stores.
> I have tried a few things with the normal DSL but was not really successful.
> Schedule/Punctuate is not possible, supplying a windowed store is also not
> allowed and the process method has no knowledge of the timestamp of the
> record.
> And anything loaded on startup is not filtered anyway.
> 
> Regarding 4212, wouldn't it be easier (although a little less
> space-efficient) to track the Timestamps in a separate Store with 
> ?
> This would leave the original store intact and allow a migration of the
> timestamps without touching the other data.
> 
> So I am very interested in your PR :-)
> 
> best regards
> 
> Patrik
> 
> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare 
> wrote:
> 
>> Hi Matthias
>>
>> Thanks - I figured that it was probably a case of just too much to do and
>> not enough time. I know how that can go. I am asking about this one in
>> relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding a TTL
>> to RocksDB. I have outlined a bit about my use-case within 4212, but for
>> brevity here it is:
>>
>> My case:
>> 1) I have a RocksDB with TTL implementation working where records are aged
>> out using the TTL that comes with RocksDB (very simple).
>> 2) We prevent records from loading from the changelog if recordTime + TTL <
>> referenceTimeStamp (default = System.currentTimeMillis()).
>>
>> This assumes that the records are stored with the same time reference (say
>> UTC) as the consumer materializing the RocksDB store.
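
A minimal sketch of the changelog-restore filter described in point 2 above (illustrative, not the actual implementation):

{code:java}
public class TtlRestoreFilter {
    private final long ttlMs;

    public TtlRestoreFilter(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Returns true if a changelog record should still be restored into the
    // local store, i.e. recordTimestampMs + TTL has not yet passed "now".
    public boolean shouldRestore(long recordTimestampMs) {
        long referenceTimestampMs = System.currentTimeMillis();
        return recordTimestampMs + ttlMs >= referenceTimestampMs;
    }
}
{code}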
>>
>> My questions about KIP-258 are as follows:
>> 1) How does "we want to be able to store record timestamps in KTables"
>> differ from inserting records into RocksDB with TTL at consumption time? I
>> understand that it could be a difference of some seconds, minutes, hours,
>> days etc between when the record was published and now, but given the
>> nature of how RocksDB TTL works (eventual - based on compaction) I don't
>> see how a precise TTL can be achieved, such as that which one can get with
>> windowed stores.
>>
>> 2) Are you looking to change how records are inserted into a TTL RocksDB,
>> such that the TTL would take effect from the record's published time? If
>> not, what would be the ideal workflow here for a single record with TTL
>> RocksDB?
>> ie: Record Timestamp: 100
>> TTL: 50
>> Record inserted into rocksDB: 110
>> Record to expire at 150?
>>
>> 3) I'm not sure I fully understand the importance of the upgrade path. I
>> have read the link to (https://issues.apache.org/jira/browse/KAFKA-3522)
>> in
>> the KIP, and I can understand that a state-store on disk may not represent
>> what the application is expecting. I don't think I have the full picture
>> though, because that issue seems to be easy to fix with a simple versioned
>> header or accompanying file, forcing the app to rebuild the state if the
>> version is incompatible. Can you elaborate or add a scenario to the KIP
>> that illustrates the need for the upgrade path?
>>
>> Thanks,
>>
>> Adam
>>
>>
>>
>>
>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax 
>> wrote:
>>
>>> Adam,
>>>
>>> I am still working on it. Was pulled into a lot of other tasks lately so
>>> this was delayed. Also had some discussions about simplifying the
>>> upgrade path with some colleagues and I am prototyping this atm. Hope to
>>> update the KIP accordingly soon.
>>>
>>> -Matthias
>>>
>>> On 11/10/18 7:41 AM, Adam Bellemare wrote:
 Hello Matthias

 I am curious as to the status of this KIP. TTL and expiry of records
>> will
 be extremely useful for several of our business use-cases, as well as
 another KIP I had been working on.

 Thanks



 On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska 
 wrote:

> Hi Matthias,
>
> Good stuff. Could you comment a bit on how future-proof is this
>> change?
>>> For
> example, if we want to store both event timestamp "and" processing
>> time
>>> in
> RocksDB will we then need another interface (e.g. called
> KeyValueWithTwoTimestampsStore)?
>
> Thanks
> Eno
>
> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <
>> matth...@confluent.io>
> wrote:
>
>> Thanks for your input Guozhang and John.
>>
>> I see your point, that the upgrade API is not simple. If you don't
>> thinks it's valuable to make generic store upgrades possible (atm),
>> we
>> can make the API internal, too. The impact is, that we only support a
>> predefined set up 

Re: Need to subscribe to mail list and get access to contribute to jira tickets

2018-11-20 Thread Matthias J. Sax
Mailing list subscription is self-service: https://kafka.apache.org/contact

On 11/20/18 8:39 AM, Manikumar wrote:
> Hi,
> 
> I have given JIRA permissions for "kaushik srinivas" JIRA username.
> 
> On Tue, Nov 20, 2018 at 3:29 PM KAUSHIK SRINIVAS <
> kaushiksrinivas...@gmail.com> wrote:
> 
>> Hi,
>>
>> Need subscription to kafka mailing list.
>>
>> Also need to assign jira tickets to myself. Have worked on few pull
>> requests and need to submit the code.
>>
>> Need support in getting the required permissions to assign the kafka jira
>> ticket to myself.
>>
>> Thanks & Regards,
>> kaushik
>>
> 



signature.asc
Description: OpenPGP digital signature


Jenkins build is back to normal : kafka-trunk-jdk11 #107

2018-11-20 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #3210

2018-11-20 Thread Apache Jenkins Server
See 


Changes:

[manikumar.reddy] KAFKA-7616; Make MockConsumer only add entries to the 
partition map

--
[...truncated 2.63 MB...]
org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testGettingGroupMultipleTimes PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testCreatingTags STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testCreatingTags PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testMetricGroupIdWithoutTags STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testMetricGroupIdWithoutTags PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testCreatingTagsWithOddNumberOfTags STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testCreatingTagsWithOddNumberOfTags PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testGettingGroupWithOddNumberOfTags STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > 
testGettingGroupWithOddNumberOfTags PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testMetricGroupIdIdentity 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testMetricGroupIdIdentity 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithoutClose 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithoutClose 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testGettingGroupWithTags 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testGettingGroupWithTags 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithClose 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testRecreateWithClose 
PASSED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testKafkaMetricsNotNull 
STARTED

org.apache.kafka.connect.runtime.ConnectMetricsTest > testKafkaMetricsNotNull 
PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.FloatConverterTest > testBytesNullToNumber 
PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectType STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnectNull 
STARTED


[jira] [Resolved] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-20 Thread Daren Thomas (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daren Thomas resolved KAFKA-7577.
-
Resolution: Not A Problem

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]).
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic as expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.
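
For reference, a minimal sketch of the topology shape under discussion (topic names and the joiner are illustrative): a KTable-KTable left join whose output is expected to emit a tombstone when the left side receives one.

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class LeftJoinSketch {
    public static void build(StreamsBuilder builder) {
        KTable<String, String> left = builder.table("left-topic");
        KTable<String, String> right = builder.table("right-topic");
        // Expected per the docs: a tombstone on "left-topic" should forward a
        // tombstone to "join-output".
        left.leftJoin(right, (l, r) -> l + "|" + (r == null ? "null" : r))
            .toStream()
            .to("join-output");
    }
}
{code}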



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

2018-11-20 Thread ChienHsing Wu
Hi Matt,

Thanks for the feedback. 

The issue with the current design is that it stays on the previous partition 
even if the last poll call consumes the max.poll.records; it will consume all 
records in that partition available at the consumer side to serve multiple poll 
calls before moving to the next partition. 

Introducing another threshold at partition level will decrease the number of 
records consumed in one partition within one poll call but will still use that 
same partition as the starting one in the next poll call. 

The same effect can be achieved by setting max.poll.records to 100 I believe. 
The main difference is that the client will need to make more poll calls when 
that value is set to 100, and because of the non-blocking nature I believe the 
cost of extra poll calls is not significant. 

Further thoughts?

Thanks, CH

-Original Message-
From: Matt Farmer  
Sent: Monday, November 19, 2018 9:32 PM
To: dev@kafka.apache.org
Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across 
Partitions in KafkaConsumer

Hi there,

Thanks for the KIP.

We’ve run into issues with this at Mailchimp so something to address consuming 
behavior would save us from having to always ensure we’re running enough 
consumers that each consumer has only one partition (which is our usual MO).

I wonder though if it would be simpler and more powerful to define the maximum 
number of records the consumer should pull from one partition before pulling 
some records from another?

So if you set max.poll.records to 500 and then some new setting, 
max.poll.records.per.partition, to 100 then the Consumer would switch what 
partition it reads from every 100 records - looping back around to the first 
partition that had records if there aren’t 5 or more partitions with records.
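
A rough sketch of the suggested behavior; this is entirely hypothetical, since neither max.poll.records.per.partition nor this draining logic exists in the consumer today:

{code:java}
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class FairDrainSketch {
    // Drain buffered records round-robin across partitions, taking at most
    // perPartitionCap records from one partition before moving on, and at most
    // maxPollRecords in total; loop back around while records remain.
    public static <T> List<T> drain(Map<String, Deque<T>> buffered,
                                    int maxPollRecords, int perPartitionCap) {
        List<T> result = new ArrayList<>();
        boolean progress = true;
        while (result.size() < maxPollRecords && progress) {
            progress = false;
            for (Deque<T> partitionQueue : buffered.values()) {
                int taken = 0;
                while (taken < perPartitionCap && result.size() < maxPollRecords
                        && !partitionQueue.isEmpty()) {
                    result.add(partitionQueue.poll());
                    taken++;
                    progress = true;
                }
            }
        }
        return result;
    }
}
{code}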

What do you think?

On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu  wrote:

> Hi, could anyone please review this KIP?
>
> Thanks, ChienHsing
>
> From: ChienHsing Wu
> Sent: Friday, November 09, 2018 1:10 PM
> To: dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across 
> Partitions in KafkaConsumer
>
> Just to check: Will anyone review this? It's been silent for a week...
> Thanks, ChienHsing
>
> From: ChienHsing Wu
> Sent: Monday, November 05, 2018 4:18 PM
> To: 'dev@kafka.apache.org'  dev@kafka.apache.org>>
> Subject: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions 
> in KafkaConsumer
>
> Hi I just put together the KIP page as requested. This email is to 
> start the discussion thread.
>
> KIP: KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer
> Pull Request: https://github.com/apache/kafka/pull/5838
> Jira: https://issues.apache.org/jira/browse/KAFKA-3932
>
> Thanks, CH
>


[RESULTS] [VOTE] Release Kafka version 2.1.0

2018-11-20 Thread Dong Lin
This vote passes with 10 +1 votes (3 binding) and no 0 or -1 votes.

+1 votes

PMC Members:

* Guozhang Wang
* Jason Gustafson
* Dong Lin


Committers:
* Vahid Hashemian
* ManiKumar Reddy


Community:
* Jonathan Santilli
* Eno Thereska
* Andras Beni

* Jakub Scholz

* Satish Duggana


0 votes
* No votes


-1 votes
* No votes

Vote thread: https://markmail.org/message/qzg3xhduj3otodkr

I will continue with the release process and send announcement email.

Cheers,

Dong


[jira] [Resolved] (KAFKA-6971) Passing in help flag to kafka-console-producer should print arg options

2018-11-20 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6971.
--
Resolution: Duplicate

> Passing in help flag to kafka-console-producer should print arg options
> ---
>
> Key: KAFKA-6971
> URL: https://issues.apache.org/jira/browse/KAFKA-6971
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, producer 
>Affects Versions: 1.1.0
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: newbie
>
> {{kafka-console-consumer --help}} prints "help is not a recognized option" as 
> well as output of options
> {{kafka-console-producer --help}} prints "help is not a recognized option" 
> but no output of options
> Possible solutions:
> (a) Enhance {{kafka-console-producer}} to also print out all options when a 
> user passes in an unrecognized option
> (b) Enhance both {{kafka-console-producer}} and {{kafka-console-consumer}} to 
> legitimately accept the {{--help}} flag



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-20 Thread Konstantine Karantasis
Hi Boyang.

Thanks for preparing this KIP! It is making good progress and will be a
great improvement for stateful Kafka applications.

Apologies for my late reply, I was away for a while. Lots of great comments
so far, so I'll probably second most of them in what I suggest below at
this point.

When I first read the KIP, I wanted to start at the end with something that
wasn't highlighted a lot. That was the topic related to handling duplicate
members. I see now that the initial suggestion of handling this situation
during offset commit has been removed, and I agree with that. Issues
related to membership seem to be handled better when the member joins the
group rather than when it tries to commit offsets. This also simplifies how
many request types need to change in order to incorporate the new member
name field.

I also agree with what Jason and Guozhang have said regarding timeouts.
Although semantically, it's easier to think of every operation having its
own timeout, operationally this can become a burden. Thus, consolidation
seems preferable here. The definition of embedded protocols on top of the
base group membership protocol for rebalancing gives enough flexibility to
address such needs in each client component separately.

Finally, some minor comments:
In a few places the new/proposed changes are referred to as "current".
Which is a bit confusing considering that there is a protocol in place
already, and by "current" someone might understand the existing one. I'd
recommend using new/proposed or equivalent when referring to changes
introduced with KIP-345 and current/existing or equivalent when referring
to existing behavior.

There's the following sentence in the "Public Interfaces" section:
"Since for many stateful consumer/stream applications, the state shuffling
is more painful than short time partial unavailability."
However, my understanding is that the changes proposed with KIP-345 will
not exploit any partial availability. A suggestion for dealing with
temporary imbalances has been made in "Incremental Cooperative Rebalancing"
which can work well with KIP-345, but here I don't see proposed changes
that suggest that some resources (e.g. partitions) will keep being used
while others will not be utilized. Thus, you might want to adjust this
sentence. Correct me if I'm missing something related to that.

In the rejected alternatives, under point 2) I read "we can copy the member
id to the config files". I believe it means to say "member name" unless I'm
missing something about reusing member ids. Also below I read: "By allowing
consumers to optionally specifying a member id" which probably implies
"member name" again. In a sense this section highlights a potential
confusion between member name and member id. I wonder if we could come up
with a better term for the new field. StaticTag, StaticLabel, or even
StaticName are some suggestions that could potentially help with confusion
between MemberId and MemberName and what corresponds to what. But I
wouldn't like to disrupt the discussion with naming conventions too much at
this point. I just mention it here as a thought.

Looking forward to seeing the final details of this KIP. Great work so far!

Konstantine


On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen  wrote:

> Thanks Guozhang for the great summary here, and I have been following up
> the action items here.
>
>
>   1.  I already updated the KIP to remove the expansion timeout and
> registration timeout. Great to see them being addressed in client side!
>   2.  I double checked the design and I believe that it is ok to have both
> static member and dynamic member co-exist in the same group. So the upgrade
> shouldn't be destructive and we are removing the two membership protocol
> switching APIs.
>   3.  I only have question about this one. I'm still reading the KafkaApis
> code here. Should I just use the same authorization logic for
> ForceStaticRebalanceRequest as JoinGroupRequest?
>   4.  I'm very excited to see this work with K8! Like you suggested, this
> feature could be better addressed in a separate KIP because it is pretty
> independent. I could start drafting the KIP once the current proposal is
> approved.
>   5.  I believe that we don't need fencing in offset commit request, since
> duplicate member.name issue could be handled by join group request. We
> shall reject join group with known member name but no member id (which
> means we already have an active member using this identity).
>   6.  I agree to remove that internal config once we move forward with
> static membership. And I already removed the entire section from the KIP.
>
> Let me know if you have other concerns.
>
> Best,
> Boyang
> 
> From: Guozhang Wang 
> Sent: Tuesday, November 20, 2018 4:21 PM
> To: dev
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hello Boyang,
>
> Thanks a lot for the KIP! It is a great write-up and I appreciate 

Re: [VOTE] 2.1.0 RC1

2018-11-20 Thread Dong Lin
Thanks everyone for your test and vote!

We have removed those unnecessary files (as Ismael mentioned) from the
staging repository and this release has collected enough votes. I will
continue with the release process.

On Tue, Nov 20, 2018 at 12:08 AM Jason Gustafson  wrote:

> +1
>
> I verified the release and upgrade notes. I also went through the basic
> quickstart.
>
> Great job running the release, Dong! Thanks for all the effort.
>
> -Jason
>
> On Mon, Nov 19, 2018 at 10:50 AM, Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > I checked out a clean copy of Kafka and reuploaded artifacts for
> 2.1.0-rc1
> > without code change. There are still those new files in
> > https://repository.apache.org/content/groups/staging/org/
> > apache/kafka/kafka_2.12/2.1.0.
> > I compared 2.0 and 2.1 branch but did not find any suspicious change in
> > release.py and build.gradle.
> >
> > Since doing a new release could not address this right away and there is
> no
> > known impact on user due to these redundant files, I am inclined to still
> > release 2.1.0-rc1 so that user can start to use the new features soon.
> What
> > do you think?
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Nov 19, 2018 at 2:16 AM Dong Lin  wrote:
> >
> > > Hey Ismael,
> > >
> > > Thanks much for catching this! Sorry I didn't catch this issue before.
> > >
> > > These files were uploaded by the release.py script in the repo.
> > kafka_2.12-2.1.
> > > 0.mapping contains the following string and the other files are the
> > > signature and hash of the file kafka_2.12-2.1.0.mapping:
> > >
> > >
> > > /home/dolin/research/kafka/.release_work_dir/kafka/core/
> > build/libs/kafka_2.12-2.1.0.jar
> > >
> > >
> /home/dolin/research/kafka/.release_work_dir/kafka/core/build/tmp/scala/
> > compilerAnalysis/compileScala.analysis
> > >
> > > It is weird to have these files.. Let me generate another release
> > > candidate and try to fix this issue.
> > >
> > > Thanks,
> > > Dong
> > >
> >
>


[jira] [Created] (KAFKA-7664) Docs kafka.apache.org does not include CLI to re-assign replicas between disks of the same broker

2018-11-20 Thread Sudarshan Pathalam (JIRA)
Sudarshan Pathalam created KAFKA-7664:
-

 Summary: Docs kafka.apache.org does not include CLI to re-assign 
replicas between disks of the same broker
 Key: KAFKA-7664
 URL: https://issues.apache.org/jira/browse/KAFKA-7664
 Project: Kafka
  Issue Type: Improvement
  Components: website
Affects Versions: 2.0.1
Reporter: Sudarshan Pathalam


KIP-113 (Support replicas movement between log directories) / 
KAFKA-5163 is in resolved state, with the fix shipped in AK 1.1.

Please include the CLI and a description of how to re-assign replicas between 
disks of the same broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7616) MockConsumer can return ConsumerRecords objects with a non-empty map but no records

2018-11-20 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7616.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

Issue resolved by pull request 5901
[https://github.com/apache/kafka/pull/5901]

> MockConsumer can return ConsumerRecords objects with a non-empty map but no 
> records
> ---
>
> Key: KAFKA-7616
> URL: https://issues.apache.org/jira/browse/KAFKA-7616
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Trivial
> Fix For: 2.2.0
>
>
> The ConsumerRecords returned from MockConsumer.poll can return false for 
> isEmpty while not containing any records. This behavior is because 
> MockConsumer.poll eagerly adds entries to the returned Map<TopicPartition, List<ConsumerRecord>>, based on which partitions have been added. If no 
> records are returned for a partition, e.g. because the position was too far 
> ahead, the entry for that partition will still be there.
>  
> The MockConsumer should lazily add entries to the map as they are needed, 
> since it is more in line with how the real consumer behaves.
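
A minimal sketch of the scenario described above (topic and types are illustrative): the position is moved past the only buffered record, so poll() should come back empty.

{code:java}
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerEmptyPollSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("test-topic", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "k", "v"));
        consumer.seek(tp, 1L); // position is now past the only buffered record

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // Before the fix, isEmpty() could be false here even though count() was 0;
        // with a lazily-populated map both report an empty result.
        System.out.println(records.isEmpty() + " / " + records.count());
    }
}
{code}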



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic

2018-11-20 Thread Frederic Tardif (JIRA)
Frederic Tardif created KAFKA-7663:
--

 Summary: Custom Processor supplied on addGlobalStore is not used 
when restoring state from topic
 Key: KAFKA-7663
 URL: https://issues.apache.org/jira/browse/KAFKA-7663
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Frederic Tardif
 Attachments: image-2018-11-20-11-42-14-697.png

I have implemented a StreamsBuilder#{{addGlobalStore}} supplying a custom 
processor responsible for transforming a K,V record from the input stream into a 
V,K record. It works fine and my {{store.all()}} does print the correct 
persisted V,K records. However, if I clean the local store and restart the 
stream app, the global table is reloaded but without going through the 
processor supplied; instead, it calls {{GlobalStateManagerImpl#restoreState}}, 
which simply stores the input topic's K,V records into RocksDB (hence bypassing 
the mapping function of my custom processor). I believe this cannot be the 
expected result?
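
For illustration, a rough sketch of the setup described above (store name, topic and serdes are made up): a global store whose state-update processor inverts each K,V record into V,K before writing it.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class InvertingGlobalStoreSketch {
    public static void build(StreamsBuilder builder) {
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("inverted-store"),
                                        Serdes.String(), Serdes.String())
                  .withLoggingDisabled(); // global stores must have change logging disabled

        builder.addGlobalStore(storeBuilder, "input-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            () -> new AbstractProcessor<String, String>() {
                private KeyValueStore<String, String> store;

                @Override
                @SuppressWarnings("unchecked")
                public void init(ProcessorContext context) {
                    super.init(context);
                    store = (KeyValueStore<String, String>) context.getStateStore("inverted-store");
                }

                @Override
                public void process(String key, String value) {
                    // Invert K,V into V,K; the report is that this step is skipped on restore.
                    store.put(value, key);
                }
            });
    }
}
{code}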

 

This is a follow-up on a Stack Overflow discussion around storing a K,V topic as a 
global table with some stateless transformations based on a "custom" processor 
added on the global store:

[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7659) dummy test

2018-11-20 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7659.
--
Resolution: Invalid

> dummy test
> --
>
> Key: KAFKA-7659
> URL: https://issues.apache.org/jira/browse/KAFKA-7659
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: kaushik srinivas
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Need to subscribe to mail list and get access to contribute to jira tickets

2018-11-20 Thread Manikumar
Hi,

I have given JIRA permissions for "kaushik srinivas" JIRA username.

On Tue, Nov 20, 2018 at 3:29 PM KAUSHIK SRINIVAS <
kaushiksrinivas...@gmail.com> wrote:

> Hi,
>
> Need subscription to kafka mailing list.
>
> Also need to assign jira tickets to myself. Have worked on few pull
> requests and need to submit the code.
>
> Need support in getting the required permissions to assign the kafka jira
> ticket to myself.
>
> Thanks & Regards,
> kaushik
>


[jira] [Created] (KAFKA-7662) Avro schema upgrade not supported on globalTable

2018-11-20 Thread Frederic Tardif (JIRA)
Frederic Tardif created KAFKA-7662:
--

 Summary: Avro schema upgrade not supported on globalTable 
 Key: KAFKA-7662
 URL: https://issues.apache.org/jira/browse/KAFKA-7662
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.0.0
Reporter: Frederic Tardif


I did quite a bit of testing around Avro upgrades, and it did not behave as 
I would have expected when Avro is used as a key for a global table with a 
RocksDB store.

setup:
 * local confluent suite 4.0.2
 * test with stream app and producer (v 1.0.0)
 * schemas (key) :

schemas :
{code:java}
schema version @1
{
"namespace": "com.bell.cts.livecms.livemedia.topic",
"type" : "record",
"name" : "EventKey",
"fields" : [
{"name" : "keyAttribute1", "type" : "string"}
]
}
schema version @2
{
"namespace": "com.bell.cts.livecms.livemedia.topic",
"type" : "record",
"name" : "EventKey",
"fields" : [
{"name" : "keyAttribute1", "type" : "string"},
{"name" : "keyAttribute2", "type" : ["null", "string"], "default" : null}
]
}{code}
 
 * TEST1 (PASS)
 ** using schema version @1 
 ** produce record1=[k@1, v@1] 
 ** stream apps loads record1 in global table and store locally in rocksdb 
 ** asyncAssert that store.get(k@1)=v@1 : PASS
 * TEST2 (PASS)
 ** using schema version @1
 ** delete local store (and checkpoint)
 ** stream apps loads record1 in global table and store locally in rocksdb
 ** asyncAssert that store.get(k@1)=v@1 : PASS
 * TEST3 (FAIL)
 ** using schema version @2 
 ** keep local store
 ** stream apps does not reload record1 from topic because of local offset
 ** asyncAssert that store.get(k@1)=v@1 : FAIL
 ** however store.all().next().key.equals(k@2) , as built using schema version 2
 ** this would be explained by the fact that the rocksdb store has the record 
persisted with some magic byte based on schema version 1
 ** Not ideal, but I could consider it acceptable to delete the local store in 
this case.
 * TEST4 (FAIL)
 ** using schema version @2
 ** delete local store (and checkpoint)
 ** stream apps loads record1 (produced from schema @1) in global table and 
store locally in rocksdb
 ** asyncAssert that store.get(k@2)=v@2 : FAIL
 ** however store.all().next().key.equals(k@2) , as built using schema version 2
 ** I can't quite understand this one. I would have expected that the rocksdb 
store should now be provisioned with a serialized version of the record based 
on the schema v2 (as it went through the stream app underpinning the store 
materialization)
 * TEST5 (FAIL)
 ** using schema version @2 
 ** produce record2=[k@2, v@2] (meant to be backward compatible and logically 
equals to record1) 
 ** stream apps does the processing of record1(produced from schema @1) and 
record2 (produced from schema @2) and materialize the global table stored 
locally in rocksdb
 ** asyncAssert that store.get(k@2)=v@2 : PASS but the store now has 2 entries 
!!!
 ** it looks as if the stream.groupBy(key) of the topic underpinning the 
global table materialization did not group the 2 record keys together, although 
record1.key.equals(record2.key) is true in Java (verified by looping over the store)

reading from the upstream raw topic throughout the testing :
{code:java}
/tmp$ kafka-avro-console-consumer --topic topic-test-5 --bootstrap-server 
localhost:9092 --property schema.registry.url=http://127.0.0.1:8081 --property 
print.key=true --from-beginning 
{"keyAttribute1":"key-attribute-1"} {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"} {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"} {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1","keyAttribute2":null}
{"valueAttribute1":"value-1"}{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


"version" in kafka-reassign-partitions's JSON input file is not updated by KIP-113

2018-11-20 Thread Attila Sasvari
Hi there,

KIP-113 added a new, optional field to the input JSON file of
kafka-reassign-partitions:

{
  "version" : int,
  "partitions" : [
{
  "topic" : str,
  "partition" : int,
  "replicas" : [int],
  "log_dirs" : [str]<-- NEW. A log directory can be either "any",
or a valid absolute path that begins with '/'. This is an optional filed.
It is treated as an array of "any" if this field is not explicitly
specified in the json file.
},
...
  ]
}
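For reference, a reassignment file that uses the new field still declares version 1;
something like the following (topic name and paths are made up) illustrates the shape:

{
  "version": 1,
  "partitions": [
    {"topic": "my-topic", "partition": 0, "replicas": [1, 2],
     "log_dirs": ["/data/kafka-logs-1", "any"]}
  ]
}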

KIP-113 says:
This KIP is a pure addition. So there is no backward compatibility concern.

Is it intentional that "version" remained 1?

Regards,
Attila


[DISCUSS] KIP-375 Kafka Clients - make Metadata#TOPIC_EXPIRY_MS configurable

2018-11-20 Thread Pavel Moukhataev
Hello

https://cwiki.apache.org/confluence/display/KAFKA/KIP-375%3A+Kafka+Clients+-+make+Metadata%23TOPIC_EXPIRY_MS+configurable

I'd like to introduce a new feature for the Kafka client:
making org.apache.kafka.clients.Metadata#TOPIC_EXPIRY_MS configurable.
Here is the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-375%3A+Make+org.apache.kafka.clients.Metadata%23TOPIC_EXPIRY_MS+configurable

The problem is: if an application sends records to some topic only rarely, then
the topic metadata expires and the sending thread blocks waiting for the topic
metadata to be refreshed.

An easy fix is to make TOPIC_EXPIRY_MS configurable so that it can be set higher
than the interval between sends to the topic.
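To make the intent concrete, a rough sketch of the producer side (the config key below
is only a placeholder for whatever name the KIP settles on, so today it would simply be
ignored with an "unused config" warning; the hard-coded value it replaces is
Metadata#TOPIC_EXPIRY_MS, currently five minutes):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RareTopicProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // hypothetical, per this proposal: keep rarely-used topic metadata for one hour
        props.put("topic.expiry.ms", "3600000");
        // bounds how long send() may block while metadata is (re-)fetched
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.send(new ProducerRecord<>("rarely-used-topic", "key", "value"));
        }
    }
}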

-- 
Pavel
+7-903-258-5544
skype://pavel.moukhataev


[jira] [Reopened] (KAFKA-7565) NPE in KafkaConsumer

2018-11-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reopened KAFKA-7565:


> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find a minimal reproducer, but it happens quite often in our system. 
> We use the {{pause()}} and {{wakeup()}} methods quite extensively; maybe it is 
> somehow related.
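The description above mentions heavy use of pause() and wakeup(). Purely as an
illustration of that usage pattern on the 1.1.x consumer API (not a confirmed
reproducer; topic, group and the two flags are made up), the combination typically
looks like this:
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PauseWakeupSketch {
    private static final AtomicBoolean running = new AtomicBoolean(true);
    private static final AtomicBoolean backpressure = new AtomicBoolean(false);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "pause-wakeup-sketch");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("events"));
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(100); // 1.1.x API
                if (backpressure.get()) {
                    consumer.pause(consumer.assignment()); // stop fetching but keep group membership alive
                } else {
                    consumer.resume(consumer.paused());
                }
                // process(records) ...
            }
        } catch (WakeupException e) {
            // another thread called consumer.wakeup() to interrupt a blocking poll(), e.g. on shutdown
        } finally {
            consumer.close();
        }
    }
}
{code}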



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2018-11-20 Thread Stanislav Kozlovski
Hey there everybody,

Thanks for the introduction Boyang. I appreciate the effort you are putting
into improving consumer behavior in Kafka.

@Matt
I also believe the default value is high. In my opinion, we should aim for a
default cap of around 250. This is because in the current model any consumer
rebalance is disruptive to every consumer. The bigger the group, the longer
this period of disruption lasts.

If you have such a large consumer group, chances are that your client-side
logic could be structured better and that the high number of consumers is not
what is actually giving you high throughput.
250 can still be considered a high upper bound; I believe in practice
users should aim not to go over 100 consumers per consumer group.

In regards to the cap being global/per-broker, I think that we should
consider whether we want it to be global or *per-topic*. For the time
being, I believe that having it per-topic with a global default might be
the best situation. Having it global only seems a bit restricting to me and
it never hurts to support more fine-grained configurability (given it's the
same config, not a new one being introduced).

On Tue, Nov 20, 2018 at 11:32 AM Boyang Chen  wrote:

> Thanks Matt for the suggestion! I'm still open to any suggestion to change
> the default value. Meanwhile I just want to point out that this value is
> just a last line of defense, not a scenario we actually expect to hit.
>
>
> In the meantime, I discussed with Stanislav and he will be driving the
> KIP-389 effort from now on. Stanislav proposed the idea in the first place and
> had already come up with a draft design, while I will keep focusing on KIP-345
> to solve the edge case described in the JIRA<
> https://issues.apache.org/jira/browse/KAFKA-7610>.
>
>
> Thank you Stanislav for making this happen!
>
>
> Boyang
>
> 
> From: Matt Farmer 
> Sent: Tuesday, November 20, 2018 10:24 AM
> To: dev@kafka.apache.org
> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> metadata growth
>
> Thanks for the KIP.
>
> Will this cap be a global cap across the entire cluster or per broker?
>
> Either way the default value seems a bit high to me, but that could just be
> from my own usage patterns. I’d have probably started with 500 or 1k but
> could be easily convinced that’s wrong.
>
> Thanks,
> Matt
>
> On Mon, Nov 19, 2018 at 8:51 PM Boyang Chen  wrote:
>
> > Hey folks,
> >
> >
> > I would like to start a discussion on KIP-389:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Enforce+group.max.size+to+cap+member+metadata+growth
> >
> >
> > This is a pretty simple change to cap the consumer group size for broker
> > stability. Give me your valuable feedback when you got time.
> >
> >
> > Thank you!
> >
>


-- 
Best,
Stanislav


[jira] [Created] (KAFKA-7661) Upgrade compatibility matrix

2018-11-20 Thread Pavel Kozlov (JIRA)
Pavel Kozlov created KAFKA-7661:
---

 Summary: Upgrade compatibility matrix
 Key: KAFKA-7661
 URL: https://issues.apache.org/jira/browse/KAFKA-7661
 Project: Kafka
  Issue Type: Improvement
Reporter: Pavel Kozlov


There is a compatibility matrix 
[https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix] which 
doesn't have information about recent versions of Kafka (1.x.x, 2.x.x).

In particular, I am interested in how safe it is to use the latest kafka-clients 
version (2.0.1) with older brokers (1.1.0/1.1.1 or older).
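As an aside (not part of the original report): the api-versions tool shipped with
Kafka shows which request versions a given broker supports, which is what ultimately
determines how a newer client can talk to it, e.g.:

bin/kafka-broker-api-versions.sh --bootstrap-server broker-host:9092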



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-11-20 Thread Patrik Kleindl
Hi Adam

Sounds great, I was already planning to ask around if anyone had tackled
this.
We have a use case very similar to what you described in KAFKA-4212, only
with Global State Stores.
I have tried a few things with the normal DSL but was not really successful.
Schedule/Punctuate is not possible, supplying a windowed store is also not
allowed and the process method has no knowledge of the timestamp of the
record.
And anything loaded on startup is not filtered anyway.

Regarding 4212, wouldn't it be easier (although a little less
space-efficient) to track the timestamps in a separate store with
<key, timestamp> entries?
This would leave the original store intact and allow a migration of the
timestamps without touching the other data.
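For what it's worth, below is a rough sketch of that side-store idea for a regular
(non-global) store -- global stores cannot schedule punctuations, as you note above.
Store names, the TTL handling and the punctuation interval are all made up:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TtlTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private final long ttlMs;
    private KeyValueStore<K, V> store;         // the original <key, value> store, untouched
    private KeyValueStore<K, Long> timestamps; // side store: <key, insertion timestamp>

    public TtlTransformer(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<K, V>) context.getStateStore("main-store");
        timestamps = (KeyValueStore<K, Long>) context.getStateStore("timestamp-store");
        // expire old entries once a minute, based on wall-clock time
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, now -> {
            try (KeyValueIterator<K, Long> it = timestamps.all()) {
                while (it.hasNext()) {
                    KeyValue<K, Long> entry = it.next();
                    if (now - entry.value > ttlMs) {
                        store.delete(entry.key);      // a logged store also writes a changelog tombstone
                        timestamps.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        store.put(key, value);
        timestamps.put(key, System.currentTimeMillis());
        return null; // or forward downstream as needed
    }

    @Override
    public void close() {}
}

(Both stores would be added to the topology and connected via
stream.transform(() -> new TtlTransformer<>(ttlMs), "main-store", "timestamp-store").)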

So I am very interested in your PR :-)

best regards

Patrik

On Tue, 20 Nov 2018 at 04:46, Adam Bellemare 
wrote:

> Hi Matthias
>
> Thanks - I figured that it was probably a case of just too much to do and
> not enough time. I know how that can go. I am asking about this one in
> relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding a TTL
> to RocksDB. I have outlined a bit about my use-case within 4212, but for
> brevity here it is:
>
> My case:
> 1) I have a RocksDB with TTL implementation working where records are aged
> out using the TTL that comes with RocksDB (very simple).
> 2) We prevent records from loading from the changelog if recordTime + TTL <
> referenceTimeStamp (default = System.currentTimeInMillis() ).
>
> This assumes that the records are stored with the same time reference (say
> UTC) as the consumer materializing the RocksDB store.
>
> My questions about KIP-258 are as follows:
> 1) How does "we want to be able to store record timestamps in KTables"
> differ from inserting records into RocksDB with TTL at consumption time? I
> understand that it could be a difference of some seconds, minutes, hours,
> days etc between when the record was published and now, but given the
> nature of how RocksDB TTL works (eventual - based on compaction) I don't
> see how a precise TTL can be achieved, such as that which one can get with
> windowed stores.
>
> 2) Are you looking to change how records are inserted into a TTL RocksDB,
> such that the TTL would take effect from the record's published time? If
> not, what would be the ideal workflow here for a single record with TTL
> RocksDB?
> ie: Record Timestamp: 100
> TTL: 50
> Record inserted into rocksDB: 110
> Record to expire at 150?
>
> 3) I'm not sure I fully understand the importance of the upgrade path. I
> have read the link to (https://issues.apache.org/jira/browse/KAFKA-3522)
> in
> the KIP, and I can understand that a state-store on disk may not represent
> what the application is expecting. I don't think I have the full picture
> though, because that issue seems to be easy to fix with a simple versioned
> header or accompanying file, forcing the app to rebuild the state if the
> version is incompatible. Can you elaborate or add a scenario to the KIP
> that illustrates the need for the upgrade path?
>
> Thanks,
>
> Adam
>
>
>
>
> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax 
> wrote:
>
> > Adam,
> >
> > I am still working on it. Was pulled into a lot of other tasks lately so
> > this was delayed. Also had some discussions about simplifying the
> > upgrade path with some colleagues and I am prototyping this atm. Hope to
> > update the KIP accordingly soon.
> >
> > -Matthias
> >
> > On 11/10/18 7:41 AM, Adam Bellemare wrote:
> > > Hello Matthias
> > >
> > > I am curious as to the status of this KIP. TTL and expiry of records
> will
> > > be extremely useful for several of our business use-cases, as well as
> > > another KIP I had been working on.
> > >
> > > Thanks
> > >
> > >
> > >
> > > On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska 
> > > wrote:
> > >
> > >> Hi Matthias,
> > >>
> > >> Good stuff. Could you comment a bit on how future-proof is this
> change?
> > For
> > >> example, if we want to store both event timestamp "and" processing
> time
> > in
> > >> RocksDB will we then need another interface (e.g. called
> > >> KeyValueWithTwoTimestampsStore)?
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <
> matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for your input Guozhang and John.
> > >>>
> > >>> I see your point, that the upgrade API is not simple. If you don't
> > >>> thinks it's valuable to make generic store upgrades possible (atm),
> we
> > >>> can make the API internal, too. The impact is, that we only support a
> > >>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to
> > >>> WindowedWithTS etc) for which we implement the internal interfaces.
> > >>>
> > >>> We can keep the design generic, so if we decide to make it public, we
> > >>> don't need to re-invent it. This will also have the advantage, that
> we
> > >>> can add upgrade pattern for other stores later, too.
> > >>>
> > >>> I also agree, that the `StoreUpgradeBuilder` is a 

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-20 Thread Boyang Chen
Thanks Guozhang for the great summary, and I have been following up on the 
action items.


  1.  I already updated the KIP to remove the expansion timeout and 
registration timeout. Great to see them being addressed on the client side!
  2.  I double checked the design and I believe that it is OK to have both 
static members and dynamic members co-exist in the same group. So the upgrade 
shouldn't be destructive, and we are removing the two membership-protocol 
switching APIs.
  3.  I only have a question about this one. I'm still reading the KafkaApis 
code. Should I just use the same authorization logic for 
ForceStaticRebalanceRequest as for JoinGroupRequest?
  4.  I'm very excited to see this work with K8! Like you suggested, this 
feature could be better addressed in a separate KIP because it is pretty 
independent. I could start drafting the KIP once the current proposal is 
approved.
  5.  I believe that we don't need fencing in the OffsetCommit request, since the 
duplicate member.name issue can be handled by the JoinGroup request: we shall 
reject a JoinGroup with a known member name but no member id (which means we 
already have an active member using this identity).
  6.  I agree to remove that internal config once we move forward with static 
membership. And I already removed the entire section from the KIP.

Let me know if you have other concerns.

Best,
Boyang

From: Guozhang Wang 
Sent: Tuesday, November 20, 2018 4:21 PM
To: dev
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hello Boyang,

Thanks a lot for the KIP! It is a great write-up and I appreciate your
patience answering the feedback from the community. I'd like to add my
2 cents here:

1. By introducing another two timeout configs, registration_timeout and
expansion_timeout, we are effectively having four timeout configs: session
timeout, rebalance timeout (configured as "max.poll.interval.ms" on client
side), and these two. Juggling these timeout configs can be quite hard
for users given such complexity, and hence I'm wondering if we can simplify
the situation with as few timeout configs as possible. Here is a
concrete suggestion I'd like to propose:

1.a) Instead of introducing a registration_timeout in addition to the
session_timeout for static members, we can just reuse the session_timeout
and ask users to set it to a larger value when they are upgrading a dynamic
client to a static client by setting the "member.name" at the same time. By
default, the broker-side min.session.timeout is 6 seconds and
max.session.timeout is 5 minutes, which seems reasonable to me (we can of
course modify this broker config to enlarge the valid interval if we want
in practice). And then we should also consider removing the condition for
marking a client as failed if the rebalance timeout has reached while the
JoinGroup was not received, so that the semantics of session_timeout and
rebalance_timeout are totally separated: the former is only used to
determine if a consumer member of the group should be marked as failed and
kicked out of the group, and the latter is only used to determine the
longest time coordinator should wait for PREPARE_REBALANCE phase. In other
words if a member did not send the JoinGroup in time of the
rebalance_timeout, we still include it in the new generation of the group
and use its old subscription info to send to leader for assignment. Later
if the member came back with HeartBeat request, we can still follow the
normal path to bring it to the latest generation while checking that its
sent JoinGroup request contains the same subscription info as we used to
assign the partitions previously (which should be likely the case in
practice). In addition, we should let static members to not send the
LeaveGroup request when it is gracefully shutdown, so that a static member
can only be leaving the group if its session has timed out, OR it has been
indicated to not exist in the group any more (details below).

1.b) We have a parallel discussion about Incremental Cooperative
Rebalancing, in which we will encode the "when to rebalance" logic at the
application level, instead of at the protocol level. By doing this we can
also enable a few other optimizations, e.g. at the Streams level to first
build up the state store as standby tasks and then trigger a second
rebalance to actually migrate the active tasks while keeping the actual
rebalance latency and hence unavailability window to be small (
https://issues.apache.org/jira/browse/KAFKA-6145). I'd propose we align
KIP-345 along with this idea, and hence do not add the expansion_timeout as
part of the protocol layer, but only do that at the application's
coordinator / 

[jira] [Created] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-20 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-7660:
-

 Summary: Stream Metrics - Memory Analysis
 Key: KAFKA-7660
 URL: https://issues.apache.org/jira/browse/KAFKA-7660
 Project: Kafka
  Issue Type: Bug
  Components: metrics, streams
Affects Versions: 2.0.0
Reporter: Patrik Kleindl
 Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
Mem_References.jpeg

During an analysis of JVM memory, two possible issues showed up which I would 
like to bring to your attention:
1) Duplicate strings
Top findings: 
string_content="stream-processor-node-metrics" count="534,277"
string_content="processor-node-id" count="148,437"
string_content="stream-rocksdb-state-metrics" count="41,832"
string_content="punctuate-latency-avg" count="29,681" 
 
"stream-processor-node-metrics"  seems to be used in Sensors.java as a literal 
and not interned.
 
2) The HashMap parentSensors from 
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
 was reported multiple times as suspicious for potentially keeping alive a lot 
of objects. In our case the reported size was 40-50MB each.
I haven't looked too deeply into the code, but noticed that the class Sensor.java, 
which is used as a key in the HashMap, does not implement the equals or hashCode 
methods. Not sure whether this is a problem though.
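As a plain-Java illustration of that last point (not Kafka code; the key class is
made up): without equals()/hashCode() overrides a HashMap keys by identity, so
logically identical keys created as separate instances are never collapsed and can
only be removed by whoever still holds the exact key reference.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class IdentityKeySketch {

    // no equals()/hashCode() overrides, similar to Sensor
    static final class SensorLikeKey {
        final String name;
        SensorLikeKey(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Map<SensorLikeKey, String> parentSensors = new HashMap<>();
        parentSensors.put(new SensorLikeKey("processor-node-1"), "parent");
        parentSensors.put(new SensorLikeKey("processor-node-1"), "parent");
        System.out.println(parentSensors.size()); // prints 2: identity-based keys are never deduplicated
    }
}
{code}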
 
The analysis was done with Dynatrace 7.0
We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
 
Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk11 #106

2018-11-20 Thread Apache Jenkins Server
See 


Changes:

[manikumar.reddy] MINOR: Improve maven artifactory url in release.py (#5931)

--
[...truncated 2.25 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED


Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2018-11-20 Thread Boyang Chen
Thanks Matt for the suggestion! I'm still open to any suggestion to change the 
default value. Meanwhile I just want to point out that this value is just a 
last line of defense, not a scenario we actually expect to hit.


In the meantime, I discussed with Stanislav and he will be driving the KIP-389 
effort from now on. Stanislav proposed the idea in the first place and had 
already come up with a draft design, while I will keep focusing on the KIP-345 
effort to solve the edge case described in the 
JIRA <https://issues.apache.org/jira/browse/KAFKA-7610>.


Thank you Stanislav for making this happen!


Boyang


From: Matt Farmer 
Sent: Tuesday, November 20, 2018 10:24 AM
To: dev@kafka.apache.org
Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata 
growth

Thanks for the KIP.

Will this cap be a global cap across the entire cluster or per broker?

Either way the default value seems a bit high to me, but that could just be
from my own usage patterns. I’d have probably started with 500 or 1k but
could be easily convinced that’s wrong.

Thanks,
Matt

On Mon, Nov 19, 2018 at 8:51 PM Boyang Chen  wrote:

> Hey folks,
>
>
> I would like to start a discussion on KIP-389:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Enforce+group.max.size+to+cap+member+metadata+growth
>
>
> This is a pretty simple change to cap the consumer group size for broker
> stability. Give me your valuable feedback when you got time.
>
>
> Thank you!
>


Build failed in Jenkins: kafka-trunk-jdk8 #3209

2018-11-20 Thread Apache Jenkins Server
See 


Changes:

[manikumar.reddy] MINOR: Improve maven artifactory url in release.py (#5931)

--
[...truncated 2.63 MB...]
org.apache.kafka.connect.transforms.CastTest > castWholeRecordKeyWithSchema 
PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt8 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt8 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessBooleanFalse STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessBooleanFalse PASSED

org.apache.kafka.connect.transforms.CastTest > testConfigInvalidSchemaType 
STARTED

org.apache.kafka.connect.transforms.CastTest > testConfigInvalidSchemaType 
PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessString STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessString PASSED

org.apache.kafka.connect.transforms.CastTest > castFieldsWithSchema STARTED

org.apache.kafka.connect.transforms.CastTest > castFieldsWithSchema PASSED

org.apache.kafka.connect.transforms.HoistFieldTest > withSchema STARTED

org.apache.kafka.connect.transforms.HoistFieldTest > withSchema PASSED

org.apache.kafka.connect.transforms.HoistFieldTest > schemaless STARTED

org.apache.kafka.connect.transforms.HoistFieldTest > schemaless PASSED

org.apache.kafka.connect.transforms.MaskFieldTest > withSchema STARTED

org.apache.kafka.connect.transforms.MaskFieldTest > withSchema PASSED

org.apache.kafka.connect.transforms.MaskFieldTest > schemaless STARTED

org.apache.kafka.connect.transforms.MaskFieldTest > schemaless PASSED

org.apache.kafka.connect.transforms.ValueToKeyTest > withSchema STARTED

org.apache.kafka.connect.transforms.ValueToKeyTest > withSchema PASSED

org.apache.kafka.connect.transforms.ValueToKeyTest > schemaless STARTED

org.apache.kafka.connect.transforms.ValueToKeyTest > schemaless PASSED

org.apache.kafka.connect.transforms.TimestampRouterTest > defaultConfiguration 
STARTED

org.apache.kafka.connect.transforms.TimestampRouterTest > defaultConfiguration 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaNameUpdate 
STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaNameUpdate 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdateWithStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdateWithStruct PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfStruct PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdate STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdate PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > updateSchemaOfNull 
STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > updateSchemaOfNull 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaVersionUpdate 
STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaVersionUpdate 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfNonStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfNonStruct PASSED

org.apache.kafka.connect.transforms.ExtractFieldTest > withSchema STARTED

org.apache.kafka.connect.transforms.ExtractFieldTest > withSchema PASSED

org.apache.kafka.connect.transforms.ExtractFieldTest > testNullWithSchema 
STARTED

org.apache.kafka.connect.transforms.ExtractFieldTest > testNullWithSchema PASSED

org.apache.kafka.connect.transforms.ExtractFieldTest > schemaless STARTED

org.apache.kafka.connect.transforms.ExtractFieldTest > schemaless PASSED

org.apache.kafka.connect.transforms.ExtractFieldTest > testNullSchemaless 
STARTED

org.apache.kafka.connect.transforms.ExtractFieldTest > testNullSchemaless PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessFieldConversion STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessFieldConversion PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidTargetType STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidTargetType PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessStringToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessStringToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > testKey STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > testKey PASSED


Need to subscribe to mail list and get access to contribute to jira tickets

2018-11-20 Thread KAUSHIK SRINIVAS
Hi,

I need a subscription to the Kafka mailing list.

I also need to be able to assign JIRA tickets to myself. I have worked on a few
pull requests and need to submit the code.

I need support in getting the required permissions to assign the Kafka JIRA
ticket to myself.

Thanks & Regards,
kaushik


[jira] [Created] (KAFKA-7659) dummy test

2018-11-20 Thread kaushik srinivas (JIRA)
kaushik srinivas created KAFKA-7659:
---

 Summary: dummy test
 Key: KAFKA-7659
 URL: https://issues.apache.org/jira/browse/KAFKA-7659
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: kaushik srinivas






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Request for permissions to add KIP for user shnguyen

2018-11-20 Thread Guozhang Wang
Hello Shawn,

I should have already added you 4 days ago when replying to this thread,
could you try again?


Guozhang

On Mon, Nov 19, 2018 at 8:47 PM Shawn Nguyen 
wrote:

> Hey folks,
> Any chance I could get permissions to add a KIP
> <
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion
> >?
>
>
> Thanks a bunch,
> Shawn
>
> On Fri, Nov 16, 2018 at 2:54 PM Shawn Nguyen 
> wrote:
>
> > Hi there,
> > Can you grant me permissions for adding a KIP page? My username is
> > shnguyen and email is shavvnnngu...@gmail.com on Confluence.
> >
> > Thanks,
> > Shawn
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-20 Thread Guozhang Wang
Hello Boyang,

Thanks a lot for the KIP! It is a great write-up and I appreciate your
patience answering the feedback from the community. I'd like to add my
2 cents here:

1. By introducing another two timeout configs, registration_timeout and
expansion_timeout, we are effectively having four timeout configs: session
timeout, rebalance timeout (configured as "max.poll.interval.ms" on client
side), and these two. Juggling these timeout configs can be quite hard
for users given such complexity, and hence I'm wondering if we can simplify
the situation with as few timeout configs as possible. Here is a
concrete suggestion I'd like to propose:

1.a) Instead of introducing a registration_timeout in addition to the
session_timeout for static members, we can just reuse the session_timeout
and ask users to set it to a larger value when they are upgrading a dynamic
client to a static client by setting the "member.name" at the same time. By
default, the broker-side min.session.timeout is 6 seconds and
max.session.timeout is 5 minutes, which seems reasonable to me (we can of
course modify this broker config to enlarge the valid interval if we want
in practice). And then we should also consider removing the condition for
marking a client as failed if the rebalance timeout has reached while the
JoinGroup was not received, so that the semantics of session_timeout and
rebalance_timeout are totally separated: the former is only used to
determine if a consumer member of the group should be marked as failed and
kicked out of the group, and the latter is only used to determine the
longest time coordinator should wait for PREPARE_REBALANCE phase. In other
words if a member did not send the JoinGroup in time of the
rebalance_timeout, we still include it in the new generation of the group
and use its old subscription info to send to leader for assignment. Later
if the member came back with HeartBeat request, we can still follow the
normal path to bring it to the latest generation while checking that its
sent JoinGroup request contains the same subscription info as we used to
assign the partitions previously (which should be likely the case in
practice). In addition, we should let static members to not send the
LeaveGroup request when it is gracefully shutdown, so that a static member
can only be leaving the group if its session has timed out, OR it has been
indicated to not exist in the group any more (details below).

1.b) We have a parallel discussion about Incremental Cooperative
Rebalancing, in which we will encode the "when to rebalance" logic at the
application level, instead of at the protocol level. By doing this we can
also enable a few other optimizations, e.g. at the Streams level to first
build up the state store as standby tasks and then trigger a second
rebalance to actually migrate the active tasks while keeping the actual
rebalance latency and hence unavailability window to be small (
https://issues.apache.org/jira/browse/KAFKA-6145). I'd propose we align
KIP-345 along with this idea, and hence do not add the expansion_timeout as
part of the protocol layer, but only do that at the application's
coordinator / assignor layer (Connect, Streams, etc). We can still
deprecate "group.initial.rebalance.delay.ms" as part of this KIP, though,
since we have discussed its limitations and think it is actually not a very
good design and could be replaced with the client-side logic above.


2. I'd like to see your thoughts on the upgrade path for this KIP. More
specifically, let's say that after we have upgraded the broker version to be able to
recognize the new versions of the JoinGroup request and the admin requests, how
should we upgrade the clients and enable static groups? Off the top of my head,
if we do a rolling bounce in which we set the member.name config as well as
optionally increase the session.timeout config when we bounce each
instance, then during these rolling bounces we will have a group containing
both dynamic members and static members. It means that the group should
allow such a scenario (i.e. we cannot reject JoinGroup requests
from dynamic members), and hence the "member.name" -> "member.id" mapping
will only be partial in this scenario. Also, could you describe whether the
upgrade to the first version that supports this feature would ever get any
benefits, or whether only future upgrades done via rolling bounces could get
benefits out of this feature?
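Just to make the rolling-bounce step concrete, the per-instance client change under
this proposal might look like the sketch below (config names follow the current
KIP-345 draft and may well change; values are examples only):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class StaticMemberConfigSketch {
    public static Properties staticMemberProps(String instanceName) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-app");
        // draft KIP-345 config identifying this instance across restarts (name not final)
        props.put("member.name", instanceName);
        // per 1.a above: reuse session.timeout.ms (enlarged) instead of a separate registration timeout
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
        return props;
    }
}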

If that's the case and we do 1) as suggested above, do we still need
the enableStaticMembership and enableDynamicMembership admin requests any
more? It seems they are not necessary any more, as we will only have the notion of
"dynamic or static members" that can co-exist in a group, while there is no
notion of "dynamic or static groups"; hence these two requests are not
needed anymore.


3. We need to briefly talk about the implications for ACL as we introduce
new admin requests that are related to a specific group.id. 

Re: [VOTE] 2.1.0 RC1

2018-11-20 Thread Jason Gustafson
+1

I verified the release and upgrade notes. I also went through the basic
quickstart.

Great job running the release, Dong! Thanks for all the effort.

-Jason

On Mon, Nov 19, 2018 at 10:50 AM, Dong Lin  wrote:

> Hey Ismael,
>
> I checked out a clean copy of Kafka and reuploaded artifacts for 2.1.0-rc1
> without code change. There are still those new files in
> https://repository.apache.org/content/groups/staging/org/
> apache/kafka/kafka_2.12/2.1.0.
> I compared 2.0 and 2.1 branch but did not find any suspicious change in
> release.py and build.gradle.
>
> Since doing a new release could not address this right away and there is no
> known impact on users due to these redundant files, I am inclined to still
> release 2.1.0-rc1 so that users can start to use the new features soon. What
> do you think?
>
> Thanks,
> Dong
>
>
> On Mon, Nov 19, 2018 at 2:16 AM Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > Thanks much for catching this! Sorry I didn't catch this issue before.
> >
> > These files were uploaded by release.py scrip in the repo.
> kafka_2.12-2.1.
> > 0.mapping contains the following string and the other files are the
> > signature and hash of the file kafka_2.12-2.1.0.mapping:
> >
> >
> > /home/dolin/research/kafka/.release_work_dir/kafka/core/
> build/libs/kafka_2.12-2.1.0.jar
> >
> > /home/dolin/research/kafka/.release_work_dir/kafka/core/build/tmp/scala/
> compilerAnalysis/compileScala.analysis
> >
> > It is weird to have these files.. Let me generate another release
> > candidate and try to fix this issue.
> >
> > Thanks,
> > Dong
> >
>


Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-11-20 Thread Matthias J. Sax
I did not think about how TTL should be integrated exactly. However, it
seems reasonable to provide TTL not based on RocksDB's built-in TTL,
because we want to synchronize deletes in RocksDB with deletes in the
changelog topic (to avoid the filtering you implemented).

Thus, on each put() we could check the stored timestamp instead, and if
the record is expired already, we do a delete() on RocksDB, write a
tombstone to the changelog topic and return `null` to the user.
Additionally, we could have a punctuation() running that expires old
records in the background, too.

These are just initial thoughts though.
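To sketch the put()/get() part of that idea (purely illustrative; ValueAndTimestamp
here is a made-up holder, not an existing Streams class, and the background
punctuation mentioned above is omitted):

import org.apache.kafka.streams.state.KeyValueStore;

public class TtlAwareStore<K, V> {

    // made-up holder pairing the value with the record timestamp, as proposed by the KIP
    public static final class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;
        ValueAndTimestamp(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    private final KeyValueStore<K, ValueAndTimestamp<V>> inner;
    private final long ttlMs;

    public TtlAwareStore(KeyValueStore<K, ValueAndTimestamp<V>> inner, long ttlMs) {
        this.inner = inner;
        this.ttlMs = ttlMs;
    }

    public void put(K key, V value, long recordTimestamp, long nowMs) {
        if (nowMs - recordTimestamp > ttlMs) {
            // already expired: delete instead of inserting; a logged store also emits a changelog tombstone
            inner.delete(key);
        } else {
            inner.put(key, new ValueAndTimestamp<>(value, recordTimestamp));
        }
    }

    public V get(K key, long nowMs) {
        ValueAndTimestamp<V> vt = inner.get(key);
        if (vt == null) {
            return null;
        }
        if (nowMs - vt.timestamp > ttlMs) {
            inner.delete(key); // lazily expire on read as well
            return null;
        }
        return vt.value;
    }
}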

Additionally, we want to use the store timestamp for other purposes. For
example, timestamps of output records in aggregations and joins are
non-deterministic atm. With the stored timestamps, we can provide better
semantics for the timestamp of output records.

Additionally, if a topic is read into a table directly  via
`builder.table()`, we can reject out-of-order data based on the store
timestamps (atm, we update the table in offset order and cannot detect
out-of-order data).

For the data format, we change the format from `<key, value>` to
`<key, <value, timestamp>>` -- thus, this change is transparent to RocksDB.

About the upgrade path: we don't want to wipe out the store to recreate
it in the new format, because this will imply long reload times (what is
effectively "downtime" for an application as no data is processed), but
we want to provide an upgrade path with zero downtime.


-Matthias

On 11/19/18 7:45 PM, Adam Bellemare wrote:
> Hi Matthias
> 
> Thanks - I figured that it was probably a case of just too much to do and
> not enough time. I know how that can go. I am asking about this one in
> relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding a TTL
> to RocksDB. I have outlined a bit about my use-case within 4212, but for
> brevity here it is:
> 
> My case:
> 1) I have a RocksDB with TTL implementation working where records are aged
> out using the TTL that comes with RocksDB (very simple).
> 2) We prevent records from loading from the changelog if recordTime + TTL <
> referenceTimeStamp (default = System.currentTimeInMillis() ).
> 
> This assumes that the records are stored with the same time reference (say
> UTC) as the consumer materializing the RocksDB store.
> 
> My questions about KIP-258 are as follows:
> 1) How does "we want to be able to store record timestamps in KTables"
> differ from inserting records into RocksDB with TTL at consumption time? I
> understand that it could be a difference of some seconds, minutes, hours,
> days etc between when the record was published and now, but given the
> nature of how RocksDB TTL works (eventual - based on compaction) I don't
> see how a precise TTL can be achieved, such as that which one can get with
> windowed stores.
> 
> 2) Are you looking to change how records are inserted into a TTL RocksDB,
> such that the TTL would take effect from the record's published time? If
> not, what would be the ideal workflow here for a single record with TTL
> RocksDB?
> ie: Record Timestamp: 100
> TTL: 50
> Record inserted into rocksDB: 110
> Record to expire at 150?
> 
> 3) I'm not sure I fully understand the importance of the upgrade path. I
> have read the link to (https://issues.apache.org/jira/browse/KAFKA-3522) in
> the KIP, and I can understand that a state-store on disk may not represent
> what the application is expecting. I don't think I have the full picture
> though, because that issue seems to be easy to fix with a simple versioned
> header or accompanying file, forcing the app to rebuild the state if the
> version is incompatible. Can you elaborate or add a scenario to the KIP
> that illustrates the need for the upgrade path?
> 
> Thanks,
> 
> Adam
> 
> 
> 
> 
> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax 
> wrote:
> 
>> Adam,
>>
>> I am still working on it. Was pulled into a lot of other tasks lately so
>> this was delayed. Also had some discussions about simplifying the
>> upgrade path with some colleagues and I am prototyping this atm. Hope to
>> update the KIP accordingly soon.
>>
>> -Matthias
>>
>> On 11/10/18 7:41 AM, Adam Bellemare wrote:
>>> Hello Matthias
>>>
>>> I am curious as to the status of this KIP. TTL and expiry of records will
>>> be extremely useful for several of our business use-cases, as well as
>>> another KIP I had been working on.
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska 
>>> wrote:
>>>
 Hi Matthias,

 Good stuff. Could you comment a bit on how future-proof is this change?
>> For
 example, if we want to store both event timestamp "and" processing time
>> in
 RocksDB will we then need another interface (e.g. called
 KeyValueWithTwoTimestampsStore)?

 Thanks
 Eno

 On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax 
 wrote:

> Thanks for your input Guozhang and John.
>
> I see your point, that the upgrade API is not simple. If you don't