[jira] [Commented] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-09 Thread Jun He (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653249#comment-15653249
 ] 

Jun He commented on KAFKA-4384:
---

Hi [~becket_qin], I have attached my patch, including a unit test. Please let 
me know if there is anything else I should do. Thanks.

> ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted 
> message
> 
>
> Key: KAFKA-4384
> URL: https://issues.apache.org/jira/browse/KAFKA-4384
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Ubuntu 12.04, AWS D2 instance
>Reporter: Jun He
> Fix For: 0.10.1.1
>
> Attachments: KAFKA-4384.patch
>
>
> We recently discovered an issue in Kafka 0.9.0.1, where 
> ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted 
> message. As the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they 
> may have a similar issue.
> Here are the system logs related to this issue.
> 2016-10-30 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.apply - Found invalid messages during fetch for 
> partition [logs,41] offset 39021512238 error Message is corrupt (stored crc = 
> 2028421553, computed crc = 3577227678)
> 2016-10-30 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error due 
> to kafka.common.KafkaException: error processing data for partition 
> [logs,41] offset 39021512301 Caused by: java.lang.RuntimeException: Offset 
> mismatch: fetched offset = 39021512301, log end offset = 39021512238.
> First, ReplicaFetcherThread got a corrupted message (offset 39021512238) due 
> to some blip.
> Line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
>  threw an exception.
> Then, line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
>  caught it and logged this error.
> However, line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
>  had already updated the topic partition offset in partitionMap to the 
> latest fetched offset, so ReplicaFetcherThread skipped the batch with the 
> corrupted messages.
> Based on 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
>  the ReplicaFetcherThread then directly fetched the next batch of messages 
> (with offset 39021512301).
> Next, ReplicaFetcherThread stopped because the log end offset (still 
> 39021512238) didn't match the fetched message (offset 39021512301).
> A quick fix is to move line 134 after line 138.
> It would be great to have your comments, and please let me know if a Jira 
> issue is needed. Thanks.
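
For illustration only, here is a minimal, self-contained sketch of the reordering 
described above (not the attached patch; all names are illustrative stand-ins for 
the AbstractFetcherThread logic): the fetch offset in partitionMap is advanced only 
after the fetched data has been processed successfully, so a corrupted batch is 
re-fetched instead of being silently skipped.

{code}
import scala.collection.mutable

case class TopicPartition(topic: String, partition: Int)
case class FetchedBatch(nextOffset: Long, valid: Boolean)
class CorruptMessageException(msg: String) extends RuntimeException(msg)

object FetcherSketch {
  // topic-partition -> next fetch offset
  private val partitionMap = mutable.Map[TopicPartition, Long]()

  // stand-in for processPartitionData(); throws on a CRC mismatch
  private def processPartitionData(tp: TopicPartition, batch: FetchedBatch): Unit =
    if (!batch.valid) throw new CorruptMessageException(s"corrupt batch for $tp")

  def handleFetchedBatch(tp: TopicPartition, batch: FetchedBatch): Unit =
    try {
      processPartitionData(tp, batch)      // may throw on corrupted messages
      partitionMap(tp) = batch.nextOffset  // advance only after successful processing
    } catch {
      case e: CorruptMessageException =>
        // the stored offset is untouched, so the same batch is fetched again
        println(s"Found invalid messages during fetch for $tp: ${e.getMessage}")
    }
}
{code}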



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-09 Thread Jun He (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun He updated KAFKA-4384:
--
Comment: was deleted

(was: Patch for KAFKA-4384 (ReplicaFetcherThread stopped after 
ReplicaFetcherThread received a corrupted message)
)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-09 Thread Jun He (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun He updated KAFKA-4384:
--
 Reviewer: Jiangjie Qin
Affects Version/s: 0.10.1.0
   Status: Patch Available  (was: Open)

Patch for KAFKA-4384 (ReplicaFetcherThread stopped after ReplicaFetcherThread 
received a corrupted message)





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-09 Thread Jun He (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun He updated KAFKA-4384:
--
Attachment: KAFKA-4384.patch

Patch for KAFKA-4384 (ReplicaFetcherThread stopped after ReplicaFetcherThread 
received a corrupted message)





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653148#comment-15653148
 ] 

Justin Miller commented on KAFKA-4396:
--

Hi huxi, thanks for responding.

I do have that set to false as I'm doing a

{code}
 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
{code}

with a custom OffsetCommitCallback to verify that the offsets are committed 
with no exception. I haven't tried running it on a new consumer group, but I 
can try that tomorrow (though I would note that the problem only seems to 
manifest itself after it has processed a number of time batches). I do save 
all the parquet files that are generated for the time batch before I commit 
the offsets; this process can take up to 8 minutes. Should I perhaps just 
commit the offsets and deal with potential data loss if retried puts to S3 fail?
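
For context, here is a minimal sketch of this commit-with-callback pattern 
(illustrative only, not my exact code), assuming stream is the direct stream 
created via KafkaUtils.createDirectStream from spark-streaming-kafka-0-10:

{code}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... write the batch out (parquet to S3) before committing ...

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                            exception: Exception): Unit =
      if (exception != null) println(s"offset commit failed: ${exception.getMessage}")
      else println(s"committed offsets: $offsets")
  })
}
{code}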

We're getting really close to putting this system in production. I've tweaked 
quite a few settings on the Kafka consumer (I can provide the ConsumerConfigs 
if that would help). Streaming Kafka 0.10 has been very impressive so far!



> Seeing offsets not resetting even when reset policy is configured explicitly
> 
>
> Key: KAFKA-4396
> URL: https://issues.apache.org/jira/browse/KAFKA-4396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justin Miller
>
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPo

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-09 Thread radai
My personal opinion: a log-compacted topic is basically a KV store, so a map
API. map.put(key, null) is not the same as map.remove(key), which to me means
a null value should not represent a delete; a delete should be explicit
(meaning a flag).
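
To make the analogy concrete, a rough sketch (plain Scala collections, not
Kafka code) of the two operations a tombstone flag would let us distinguish:

{code}
import scala.collection.mutable

// a compacted topic behaves like a KV store keyed by message key
val store = mutable.Map[String, String]()

store.put("key", null) // key still present, value is null (not a delete)
store.remove("key")    // key is gone (a delete)

// With null-as-tombstone only, Kafka cannot express the first case;
// an explicit tombstone attribute flag makes the delete intent explicit.
{code}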


On Wed, Nov 9, 2016 at 11:01 AM, Mayuresh Gharat  wrote:

> I see the reasoning and might be inclined to agree a bit :
> If we go to stage 2, the only difference is that we can theoretically
> support a null value non-tombstone message in a log compacted topic, but I
> am not sure if that has any use case.
>
> But as an end goal I see that kafka should clearly specify what it means by
> a tombstone : is it the attribute flag OR is it the null value. If we just
> do stage 1, I don't think we are defining the end-goal completely.
> Again this is more about semantics of correctness of end state.
>
> Thanks,
>
> Mayuresh
>
> On Wed, Nov 9, 2016 at 10:49 AM, Becket Qin  wrote:
>
> > I am not sure if we need the second stage. Wouldn't it be enough to say
> > that a message is a tombstone if one of the following is true?
> > 1. tombstone flag is set.
> > 2. value is null.
> >
> > If we go to stage 2, the only difference is that we can theoretically
> > support a null value non-tombstone message in a log compacted topic, but
> I
> > am not sure if that has any use case.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Wed, Nov 9, 2016 at 9:23 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com>
> > wrote:
> >
> > > I think it will be a good idea. +1
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Wed, Nov 9, 2016 at 9:13 AM, Michael Pearce 
> > > wrote:
> > >
> > > > +1 Mayuresh, I think this is a good solution/strategy.
> > > >
> > > > Shall we update the KIP with this? Becket/Jun/Joel any comments to
> add
> > > > before we do?
> > > >
> > > > On 08/11/2016, 17:29, "Mayuresh Gharat" 
> > > > wrote:
> > > >
> > > > I think the migration can be done in 2 stages :
> > > >
> > > > 1) In first stage the broker should understand the attribute flag
> > as
> > > > well
> > > > as Null for the value for log compaction.
> > > > 2) In second stage we move on to supporting only the attribute
> flag
> > > > for log
> > > > compaction.
> > > >
> > > > I agree with Becket that for older clients (consumers) the broker
> > > might
> > > > have to down convert a message that has the attribute flag set
> for
> > > log
> > > > compacting but has a non null value. But this should be in first
> > > stage.
> > > > Once all the clients have upgraded (clients start recognizing the
> > > > attribute
> > > > flag), we can move the broker to stage 2.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Tue, Nov 8, 2016 at 12:06 AM, Michael Pearce <
> > > michael.pea...@ig.com
> > > > >
> > > > wrote:
> > > >
> > > > > Also we can add further guidance:
> > > > >
> > > > > To  avoid the below caveat to organisations by promoting of
> > > > upgrading all
> > > > > consumers first before relying on producing tombstone messages
> > with
> > > > data
> > > > >
> > > > > Sent using OWA for iPhone
> > > > > 
> > > > > From: Michael Pearce
> > > > > Sent: Tuesday, November 8, 2016 8:03:32 AM
> > > > > To: dev@kafka.apache.org
> > > > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > > > >
> > > > > Thanks Jun on the feedback, I think I understand the
> issue/point
> > > now.
> > > > >
> > > > > We def can add that on older client version if tombstone marker
> > > make
> > > > the
> > > > > value null to preserve behaviour.
> > > > >
> > > > > There is one caveats to this:
> > > > >
> > > > > * we have to be clear that data is lost if reading via old
> > > > client/message
> > > > > format - I don't think this is a big issue as mostly the
> idea/use
> > > > case is
> > > > > around meta data transport as such would only be as bad as
> > current
> > > > situation
> > > > >
> > > > > Re having configurable broker this was to handle cases like you
> > > > described
> > > > > but in another way by allowing organisation choose the
> behaviour
> > of
> > > > the
> > > > > compaction per broker or per topic so they could manage their
> > > > transition to
> > > > > using tombstone markers.
> > > > >
> > > > > On hind sight it maybe easier to just upgrade and downgrade the
> > > > messages
> > > > > on version as you propose.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Sent using OWA for iPhone
> > > > > 
> > > > > From: Jun Rao 
> > > > > Sent: Tuesday, November 8, 2016 12:34:41 AM
> > > > > To: dev@kafka.apache.org
> > > > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > > > >
> > > > > For the use case, one po

[jira] [Resolved] (KAFKA-4328) The parameters for creating the ZkUtils object is reverse

2016-11-09 Thread Matt Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang resolved KAFKA-4328.
--
Resolution: Fixed

> The parameters for creating the ZkUtils object is reverse
> -
>
> Key: KAFKA-4328
> URL: https://issues.apache.org/jira/browse/KAFKA-4328
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1
> Environment: software platform
>Reporter: Matt Wang
>  Labels: patch
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When creating the ZkUtils object, the zkSessionTimeOutMs and 
> zkConnectionTimeoutMs parameters are reversed. Although the default values of 
> both parameters are 6000, this can cause problems, especially when we want to 
> change these values.
> The pull request address is:
> https://github.com/apache/kafka/pull/1646
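
For illustration, a minimal sketch of the call site in question, assuming the 
0.10.x factory signature ZkUtils(zkUrl, sessionTimeout, connectionTimeout, 
isZkSecurityEnabled); the values here are arbitrary:

{code}
import kafka.utils.ZkUtils

val zkUrl = "localhost:2181"
val zkSessionTimeoutMs = 6000
val zkConnectionTimeoutMs = 10000

// Buggy order described in the issue (session and connection timeouts swapped):
// val zkUtils = ZkUtils(zkUrl, zkConnectionTimeoutMs, zkSessionTimeoutMs, false /* isZkSecurityEnabled */)

// Intended order:
val zkUtils = ZkUtils(zkUrl, zkSessionTimeoutMs, zkConnectionTimeoutMs, false /* isZkSecurityEnabled */)
{code}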



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-09 Thread radai
Selectively reading from sockets achieves memory control (up to, and not
including, talk of (de)compression).

This is exactly what I (also, even mostly) did for KIP-72, which I hope in
itself should be a reason to think about both KIPs at the same time, because
the changes will be similar (at least in intent) and might otherwise result in
duplicated effort.

A pool API is a way to "scale" all the way from just maintaining a variable
holding the amount of available memory (which is what my current KIP-72 code
does and what this KIP proposes, IIUC) all the way up to actually re-using
buffers without any changes to the code using the pool - just drop in a
different pool impl.

For "edge nodes" (producer/consumer) the performance gain from actually
pooling large buffers may be arguable, but I suspect that for brokers
regularly operating on 1MB-sized requests (which is the norm at LinkedIn) the
resulting memory fragmentation is an actual bottleneck (I have initial
micro-benchmark results to back this up but have not had the time to do a
full profiling run).

So basically I'm saying we may be doing (very) similar things in mostly the
same areas of code.
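
For the sake of discussion, a minimal sketch of what such a pool abstraction
could look like (names are illustrative, not the actual KIP-72/KIP-81
interfaces): the simplest implementation just tracks available bytes, and a
buffer-reusing or leak-detecting pool could slot in behind the same trait
without touching callers.

{code}
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong

trait MemoryPool {
  def tryAllocate(sizeBytes: Int): ByteBuffer // returns null when the pool is exhausted
  def release(buffer: ByteBuffer): Unit
}

// Simplest possible impl: only a counter of available memory, plain heap allocation.
class SimpleMemoryPool(capacityBytes: Long) extends MemoryPool {
  private val available = new AtomicLong(capacityBytes)

  override def tryAllocate(sizeBytes: Int): ByteBuffer =
    if (available.addAndGet(-sizeBytes) < 0) { available.addAndGet(sizeBytes); null }
    else ByteBuffer.allocate(sizeBytes)

  override def release(buffer: ByteBuffer): Unit =
    available.addAndGet(buffer.capacity())
}
{code}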

On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison 
wrote:

> Selectively reading from the socket should enable us to
> control the memory usage without impacting performance. I've had a look
> at that today and I can see how that would work.
> I'll update the KIP accordingly tomorrow.
>


Build failed in Jenkins: kafka-trunk-jdk8 #1032

2016-11-09 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: add upgrade guide for Kafka Streams API

--
[...truncated 12292 lines...]
org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMissingUsernameSaslPlain PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testUnauthenticatedApiVersionsRequestOverPlaintext STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testUnauthenticatedApiVersionsRequestOverPlaintext PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMechanismPluggability STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMechanismPluggability PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testApiVersionsRequestWithUnsupportedVersion STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testApiVersionsRequestWithUnsupportedVersion PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMissingPasswordSaslPlain STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMissingPasswordSaslPlain PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidLoginModule STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidLoginModule PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidMechanism STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidMechanism PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testDisabledMechanism STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testDisabledMechanism PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testPacketSizeTooBig STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testPacketSizeTooBig PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidUsernameSaslPlain STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidUsernameSaslPlain PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMultipleServerMechanisms STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMultipleServerMechanisms PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testValidSaslPlainOverPlaintext STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testValidSaslPlainOverPlaintext PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testValidSaslPlainOverSsl STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testValidSaslPlainOverSsl PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidApiVersionsRequestSequence STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidApiVersionsRequestSequence PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testDisallowedKafkaRequestsBeforeAuthentication STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testDisallowedKafkaRequestsBeforeAuthentication PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testUnauthenticatedApiVersionsRequestOverSsl STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testUnauthenticatedApiVersionsRequestOverSsl PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidPasswordSaslPlain STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidPasswordSaslPlain PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidSaslPacket STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidSaslPacket PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testSaslHandshakeRequestWithUnsupportedVersion STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testSaslHandshakeRequestWithUnsupportedVersion PASSED
:clients:determineCommitId UP-TO-DATE
:clients:createVersionFile
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala
:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-09 Thread radai
Hi Jun,

Thank you for taking the time to review this.

1. Short version - yes, the concern is bugs, but the cost is tiny and worth
it, and it's a common pattern. Long version:
   1.1 detecting these types of bugs (leaks) cannot easily be done with
simple testing, but requires stress/stability tests that run for a long
time (long enough to hit OOM, depending on leak size and available memory).
This is why some sort of leak detector is "standard practice". For example,
look at Netty
(http://netty.io/wiki/reference-counted-objects.html#leak-detection-levels) -
they have far more complicated built-in leak detection enabled by default.
As a concrete example - during development I did not properly dispose of an
in-progress KafkaChannel.receive when a connection was abruptly closed, and
I only found it because of the log msg printed by the pool.
   1.2 I have a benchmark suite showing that the performance cost of the GC
pool is absolutely negligible -
https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks
   1.3 as for the complexity of the impl - it's just ~150 lines and pretty
straightforward. I think the main issue is that not many people are
familiar with weak refs and ref queues.

   How about making the pool impl class a config param (generally good
going forward), making the default the simple pool, and keeping the GC one as
a dev/debug/triage aid?
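
To illustrate the weak-ref / ref-queue idea for anyone unfamiliar with it, here
is a minimal, self-contained sketch (not the actual GarbageCollectedMemoryPool;
names and structure are simplified): every handed-out buffer is tracked through
a WeakReference, and a buffer that is garbage collected without release()
surfaces on the reference queue, so its bytes are reclaimed and the leak logged.

{code}
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.nio.ByteBuffer
import scala.collection.mutable

class LeakDetectingPoolSketch(capacityBytes: Long) {

  private final class Tracked(buf: ByteBuffer, q: ReferenceQueue[ByteBuffer])
      extends WeakReference[ByteBuffer](buf, q) {
    val size: Int = buf.capacity()
  }

  private val refQueue = new ReferenceQueue[ByteBuffer]()
  private val outstanding = mutable.Set[Tracked]()
  private var available = capacityBytes

  def tryAllocate(size: Int): ByteBuffer = synchronized {
    reclaimLeaked()
    if (available < size) null
    else {
      available -= size
      val buf = ByteBuffer.allocate(size)
      outstanding += new Tracked(buf, refQueue)
      buf
    }
  }

  def release(buf: ByteBuffer): Unit = synchronized {
    // O(n) lookup is fine for a sketch; a real impl would index by buffer identity
    outstanding.find(_.get eq buf).foreach { t =>
      outstanding -= t
      available += t.size
    }
  }

  private def reclaimLeaked(): Unit = {
    var ref = refQueue.poll()
    while (ref != null) {
      val t = ref.asInstanceOf[Tracked]
      if (outstanding.remove(t)) {
        available += t.size
        println(s"memory pool leak detected: ${t.size} bytes were never release()d")
      }
      ref = refQueue.poll()
    }
  }
}
{code}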

2. The KIP itself doesn't specifically treat SSL at all - it's an
implementation detail. As for my current patch, it has some minimal
treatment of SSL - just enough to not mute SSL sockets mid-handshake - but
the code in SslTransportLayer still allocates buffers itself. It is my
understanding that netReadBuffer/appReadBuffer shouldn't grow beyond 2 x
sslEngine.getSession().getPacketBufferSize(), which I assume to be small.
They are also long-lived (they live for the duration of the connection),
which makes them a poor fit for pooling. The bigger fish to fry, I think, is
decompression - you could read a 1MB blob into a pool-provided buffer and
then decompress it into 10MB of heap allocated on the spot :-) Also, the
SSL code is extremely tricky.
   2.2 Just to make sure, you're talking about Selector.java: while
((networkReceive = channel.read()) != null) addToStagedReceives(channel,
networkReceive); ? If so you're right, and I'll fix that (probably by
something similar to immediatelyConnectedKeys, not sure yet).

3. isOutOfMemory is self-explanatory (and I'll add javadocs and update the
wiki). isLowOnMem is basically the point where I start randomizing the
selection key handling order to avoid potential starvation. It's rather
arbitrary, and now that I think of it, it should probably not exist and be
entirely contained in Selector (where the shuffling takes place). Will fix.

4. Will do.

5. I prefer -1 or 0 as an explicit "OFF" (or basically anything <= 0).
Long.MAX_VALUE would still create a pool, which would still waste time
tracking resources. I don't really mind though if you have a preferred magic
value for off.





On Wed, Nov 9, 2016 at 9:28 AM, Jun Rao  wrote:

> Hi, Radai,
>
> Thanks for the KIP. Some comments below.
>
> 1. The KIP says "to facilitate faster implementation (as a safety net) the
> pool will be implemented in such a way that memory that was not release()ed
> (but still garbage collected) would be detected and "reclaimed". this is to
> prevent "leaks" in case of code paths that fail to release() properly.".
> What are the cases that could cause memory leaks? If we are concerned about
> bugs, it seems that it's better to just do more testing to make sure the
> usage of the simple implementation (SimpleMemoryPool) is solid instead of
> adding more complicated logic (GarbageCollectedMemoryPool) to hide the
> potential bugs.
>
> 2. I am wondering how much this KIP covers the SSL channel implementation.
> 2.1 SslTransportLayer maintains netReadBuffer, netWriteBuffer,
> appReadBuffer per socket. Should those memory be accounted for in memory
> pool?
> 2.2 One tricky thing with SSL is that during a KafkaChannel.read(), it's
> possible for multiple NetworkReceives to be returned since multiple
> requests' data could be encrypted together by SSL. To deal with this, we
> stash those NetworkReceives in Selector.stagedReceives and give it back to
> the poll() call one NetworkReceive at a time. What this means is that, if
> we stop reading from KafkaChannel in the middle because memory pool is
> full, this channel's key may never get selected for reads (even after the
> read interest is turned on), but there are still pending data for the
> channel, which will never get processed.
>
> 3. The code has the following two methods in MemoryPool, which are not
> described in the KIP. Could you explain how they are used in the wiki?
> isLowOnMemory()
> isOutOfMemory()
>
> 4. Could you also describe in the KIP at the high level, how the read
> interest bit for the socket is turned on/o

Upgrading from kafka-0.8.1.1 to kafka-0.9.0.1

2016-11-09 Thread Divyajothi Baskaran
Hi,
For the past 6 months I have been the dev for our solution written on top of 
kafka-0.8.1.1. It has been stable for us. We thought we would upgrade to 
kafka-0.9.0.1. With the server upgrade, we did not face any issues.

We have our own solution built to extract the messages and write them to 
different destinations; messages are also read by Storm. For our unit tests we 
were using the following Maven artifact:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.1.1</version>
</dependency>


I could not find a 0.9.0.1 version of kafka_2.9.2, so I moved to kafka_2.11 
first. This is the artifact used:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
</dependency>


I was running into the following issues:

  *   scala.ScalaObject not found
  *   java.lang.NoSuchMethodError: 
scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
  *   KafkaConfig issue with NoSuchMethodError 
(Ljava/util/Map;)Ljava/util/Map

Also, most of the time I would run into a KafkaServerStartable hang (both in 
kafka_2.10-0.9.0.1 and kafka_2.11-0.9.0.1). With the same unit tests, I never 
ran into the Kafka server hang with kafka_2.9.2.


Could you please help me with my problem?

Am I missing anything?


Thanks,

Divya



[jira] [Commented] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652596#comment-15652596
 ] 

huxi commented on KAFKA-4396:
-

I notice that you set "enable.auto.commit" to false, meaning you might want to 
commit offsets manually. Would you still run into this problem if you first 
launched the consumer program with a new group.id?

> Seeing offsets not resetting even when reset policy is configured explicitly
> 
>
> Key: KAFKA-4396
> URL: https://issues.apache.org/jira/browse/KAFKA-4396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justin Miller
>
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
> 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
> 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 
> 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not 
> safe for multi-threaded access
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at 
> org.apache.kafka.clients.consumer.K

Build failed in Jenkins: kafka-0.10.1-jdk7 #84

2016-11-09 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: add upgrade guide for Kafka Streams API

--
[...truncated 12159 lines...]
org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testSet STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testSet PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testClear STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testClear PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testRemove STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testRemove PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testMoveToEnd STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testMoveToEnd PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testUpdateAndMoveToEnd 
STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testUpdateAndMoveToEnd 
PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testPartitionValues 
STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testPartitionValues 
PASSED

org.apache.kafka.common.utils.UtilsTest > testAbs STARTED

org.apache.kafka.common.utils.UtilsTest > testAbs PASSED

org.apache.kafka.common.utils.UtilsTest > testMin STARTED

org.apache.kafka.common.utils.UtilsTest > testMin PASSED

org.apache.kafka.common.utils.UtilsTest > testJoin STARTED

org.apache.kafka.common.utils.UtilsTest > testJoin PASSED

org.apache.kafka.common.utils.UtilsTest > testReadBytes STARTED

org.apache.kafka.common.utils.UtilsTest > testReadBytes PASSED

org.apache.kafka.common.utils.UtilsTest > testGetHost STARTED

org.apache.kafka.common.utils.UtilsTest > testGetHost PASSED

org.apache.kafka.common.utils.UtilsTest > testGetPort STARTED

org.apache.kafka.common.utils.UtilsTest > testGetPort PASSED

org.apache.kafka.common.utils.UtilsTest > testCloseAll STARTED

org.apache.kafka.common.utils.UtilsTest > testCloseAll PASSED

org.apache.kafka.common.utils.UtilsTest > testFormatAddress STARTED

org.apache.kafka.common.utils.UtilsTest > testFormatAddress PASSED

org.apache.kafka.common.utils.CrcTest > testUpdateInt STARTED

org.apache.kafka.common.utils.CrcTest > testUpdateInt PASSED

org.apache.kafka.common.utils.CrcTest > testUpdate STARTED

org.apache.kafka.common.utils.CrcTest > testUpdate PASSED

org.apache.kafka.common.utils.AbstractIteratorTest > testEmptyIterator STARTED

org.apache.kafka.common.utils.AbstractIteratorTest > testEmptyIterator PASSED

org.apache.kafka.common.utils.AbstractIteratorTest > testIterator STARTED

org.apache.kafka.common.utils.AbstractIteratorTest > testIterator PASSED
:clients:determineCommitId UP-TO-DATE
:clients:createVersionFile
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala
:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

  ^


Build failed in Jenkins: kafka-trunk-jdk7 #1681

2016-11-09 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-4364: Remove secrets from DEBUG logging

[wangguoz] MINOR: add upgrade guide for Kafka Streams API

--
[...truncated 12251 lines...]
org.apache.kafka.clients.NetworkClientTest > testSimpleRequestResponse STARTED

org.apache.kafka.clients.NetworkClientTest > testSimpleRequestResponse PASSED

org.apache.kafka.clients.NetworkClientTest > 
testDisconnectDuringUserMetadataRequest STARTED

org.apache.kafka.clients.NetworkClientTest > 
testDisconnectDuringUserMetadataRequest PASSED

org.apache.kafka.clients.NetworkClientTest > testClose STARTED

org.apache.kafka.clients.NetworkClientTest > testClose PASSED

org.apache.kafka.clients.NetworkClientTest > testLeastLoadedNode STARTED

org.apache.kafka.clients.NetworkClientTest > testLeastLoadedNode PASSED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelayConnected 
STARTED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelayConnected PASSED

org.apache.kafka.clients.NetworkClientTest > testRequestTimeout STARTED

org.apache.kafka.clients.NetworkClientTest > testRequestTimeout PASSED

org.apache.kafka.clients.NetworkClientTest > 
testSimpleRequestResponseWithStaticNodes STARTED

org.apache.kafka.clients.NetworkClientTest > 
testSimpleRequestResponseWithStaticNodes PASSED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelay STARTED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelay PASSED

org.apache.kafka.clients.NetworkClientTest > testSendToUnreadyNode STARTED

org.apache.kafka.clients.NetworkClientTest > testSendToUnreadyNode PASSED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelayDisconnected 
STARTED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelayDisconnected 
PASSED

org.apache.kafka.clients.MetadataTest > testListenerCanUnregister STARTED

org.apache.kafka.clients.MetadataTest > testListenerCanUnregister PASSED

org.apache.kafka.clients.MetadataTest > testTopicExpiry STARTED

org.apache.kafka.clients.MetadataTest > testTopicExpiry PASSED

org.apache.kafka.clients.MetadataTest > testFailedUpdate STARTED

org.apache.kafka.clients.MetadataTest > testFailedUpdate PASSED

org.apache.kafka.clients.MetadataTest > testMetadataUpdateWaitTime STARTED

org.apache.kafka.clients.MetadataTest > testMetadataUpdateWaitTime PASSED

org.apache.kafka.clients.MetadataTest > testUpdateWithNeedMetadataForAllTopics 
STARTED

org.apache.kafka.clients.MetadataTest > testUpdateWithNeedMetadataForAllTopics 
PASSED

org.apache.kafka.clients.MetadataTest > testClusterListenerGetsNotifiedOfUpdate 
STARTED

org.apache.kafka.clients.MetadataTest > testClusterListenerGetsNotifiedOfUpdate 
PASSED

org.apache.kafka.clients.MetadataTest > testTimeToNextUpdate_RetryBackoff 
STARTED

org.apache.kafka.clients.MetadataTest > testTimeToNextUpdate_RetryBackoff PASSED

org.apache.kafka.clients.MetadataTest > testMetadata STARTED

org.apache.kafka.clients.MetadataTest > testMetadata PASSED

org.apache.kafka.clients.MetadataTest > testTimeToNextUpdate_OverwriteBackoff 
STARTED

org.apache.kafka.clients.MetadataTest > testTimeToNextUpdate_OverwriteBackoff 
PASSED

org.apache.kafka.clients.MetadataTest > testTimeToNextUpdate STARTED

org.apache.kafka.clients.MetadataTest > testTimeToNextUpdate PASSED

org.apache.kafka.clients.MetadataTest > testListenerGetsNotifiedOfUpdate STARTED

org.apache.kafka.clients.MetadataTest > testListenerGetsNotifiedOfUpdate PASSED

org.apache.kafka.clients.MetadataTest > testNonExpiringMetadata STARTED

org.apache.kafka.clients.MetadataTest > testNonExpiringMetadata PASSED
:clients:determineCommitId UP-TO-DATE
:clients:createVersionFile
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala
:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 
org.apache.kafka.common.requests.Of

Build failed in Jenkins: kafka-trunk-jdk8 #1031

2016-11-09 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-4364: Remove secrets from DEBUG logging

--
[...truncated 14356 lines...]

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup PASSED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose STARTED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] FAILED
java.lang.AssertionError: Condition not met within timeout 3. waiting 
for store count-by-key
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:279)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable(QueryableStateIntegrationTest.java:489)

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[1] PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted STARTED

org.apache.kafka.streams.integration.Reg

[jira] [Work started] (KAFKA-4161) Decouple flush and offset commits

2016-11-09 Thread Shikhar Bhushan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4161 started by Shikhar Bhushan.
--
> Decouple flush and offset commits
> -
>
> Key: KAFKA-4161
> URL: https://issues.apache.org/jira/browse/KAFKA-4161
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Shikhar Bhushan
>  Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval, volume- 
> or size-based commits. E.g. a sink connector which is buffering in terms of 
> number of records may want to request a flush when the buffer is full, or 
> when a sufficient amount of data has been buffered in a file.
> Having a method like, say, {{requestFlush()}} on the {{SinkTaskContext}} would 
> allow connectors to have flexible policies around flushes. This would be 
> in addition to the time-interval-based flushes that are controlled with 
> {{offset.flush.interval.ms}}, for which the clock should be reset when any 
> kind of flush happens.
> We should probably also support requesting flushes via the 
> {{SourceTaskContext}} for consistency, though a use case doesn't come to mind 
> off the bat.
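
As a purely hypothetical illustration of how the proposed (not yet existing) 
{{requestFlush()}} might be used by a buffering sink connector; the call is 
commented out because it is not part of the current {{SinkTaskContext}} API:

{code}
import java.util
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}

class BufferingSinkTaskSketch extends SinkTask {
  private val maxBufferedRecords = 10000
  private val buffer = new util.ArrayList[SinkRecord]()

  override def put(records: util.Collection[SinkRecord]): Unit = {
    buffer.addAll(records)
    if (buffer.size() >= maxBufferedRecords) {
      // context.requestFlush()  // hypothetical: ask the framework to flush and commit now
    }
  }

  override def flush(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    // write `buffer` out here, then clear it; the framework commits `offsets` afterwards
    buffer.clear()
  }

  override def start(props: util.Map[String, String]): Unit = ()
  override def stop(): Unit = ()
  override def version(): String = "sketch"
}
{code}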



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Miller updated KAFKA-4396:
-
Description: 
I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

{code}
val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)
{code}

{code}
16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on xyz (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
Offsets out of range with no configured reset policy for partitions: 
{topic=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on xyz (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on xyz (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
xyz): java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

{code}
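
The second stack trace above is the client's own guard rather than a broker-side
problem: KafkaConsumer is not safe for multi-threaded access, so every call
except wakeup() has to come from a single thread (or be externally
synchronized). The "no configured reset policy" wording in the first trace is
the message the consumer produces when its effective auto.offset.reset is
"none", which suggests the consumer instance that threw it was not running with
the kafkaParams shown above; whether the Spark executor-side consumer is
configured differently is an open question here, not an established fact. Below
is a minimal, thread-confined sketch against the plain Java consumer for
comparison; the broker address, group id and topic name are placeholders.

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SingleThreadedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "example-group");             // placeholder group
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "latest");           // reset to the log end on out-of-range offsets
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Every consumer call stays on this one thread; other threads may only call wakeup().
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));  // placeholder topic
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // process record ...
                }
                consumer.commitSync();
            }
        }
    }
}
{code}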

  was:
I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing 

[jira] [Updated] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Miller updated KAFKA-4396:
-
Description: 
I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

{code}
val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)
{code}

{code}
16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, ip-172-20-212-51.int.protectwise.net): 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: 
{observation.http-final-main-0-0=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
ip-172-20-212-52.int.protectwise.net): 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

{code}

  was:
I'

[jira] [Created] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)
Justin Miller created KAFKA-4396:


 Summary: Seeing offsets not resetting even when reset policy is 
configured explicitly
 Key: KAFKA-4396
 URL: https://issues.apache.org/jira/browse/KAFKA-4396
 Project: Kafka
  Issue Type: Bug
Reporter: Justin Miller


I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)

16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, ip-172-20-212-51.int.protectwise.net): 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: 
{observation.http-final-main-0-0=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
ip-172-20-212-52.int.protectwise.net): 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$K

[jira] [Updated] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Miller updated KAFKA-4396:
-
Description: 
I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

```
val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)
```

16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, ip-172-20-212-51.int.protectwise.net): 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: 
{observation.http-final-main-0-0=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
ip-172-20-212-52.int.protectwise.net): 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

  was:
I've been seeing a curi

[GitHub] kafka-site pull request #30: New meetup links and kafka summit to events pag...

2016-11-09 Thread derrickdoo
Github user derrickdoo closed the pull request at:

https://github.com/apache/kafka-site/pull/30


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site pull request #30: New meetup links and kafka summit to events pag...

2016-11-09 Thread derrickdoo
GitHub user derrickdoo opened a pull request:

https://github.com/apache/kafka-site/pull/30

New meetup links and kafka summit to events page

Added new meetup locations and a link to the Kafka Summit conference page

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/derrickdoo/kafka-site eventPageUpdates

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/30.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #30


commit 9143eaaa0360bdcd9e4b1ad50251173d8f151584
Author: Derrick Or 
Date:   2016-11-09T23:35:51Z

added new meetup links and kafka summit to events page




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2114: MINOR: add upgrade guide for Kafka Streams API

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2114


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4364) Sink tasks expose secrets in DEBUG logging

2016-11-09 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-4364.
--
   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 2115
[https://github.com/apache/kafka/pull/2115]

> Sink tasks expose secrets in DEBUG logging
> --
>
> Key: KAFKA-4364
> URL: https://issues.apache.org/jira/browse/KAFKA-4364
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ryan P
>Assignee: Ryan P
> Fix For: 0.10.2.0
>
>
> As it stands today, worker tasks print secrets such as key/trust store 
> passwords to their respective logs (a redaction sketch follows below). 
> https://github.com/confluentinc/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L213-L214
> i.e.
> [2016-11-01 12:50:59,254] DEBUG Initializing connector test-sink with config 
> {consumer.ssl.truststore.password=password, 
> connector.class=io.confluent.connect.jdbc.JdbcSinkConnector, 
> connection.password=password, producer.security.protocol=SSL, 
> producer.ssl.truststore.password=password, topics=orders, tasks.max=1, 
> consumer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> producer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> connection.user=connect, name=test-sink, auto.create=true, 
> consumer.security.protocol=SSL, 
> connection.url=jdbc:postgresql://localhost/test} 
> (org.apache.kafka.connect.runtime.WorkerConnector:71)
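
A common mitigation, sketched here for illustration and not necessarily the
exact change merged for this issue, is to redact password-like values before
the config map ever reaches the logger. The ConfigRedactor class and the
"password"/"secret" key patterns below are assumptions, not Connect API.

{code}
import java.util.LinkedHashMap;
import java.util.Map;

public final class ConfigRedactor {
    private ConfigRedactor() { }

    /** Returns a copy of the config with password-like values replaced by a placeholder. */
    public static Map<String, String> redact(Map<String, String> config) {
        Map<String, String> safe = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey().toLowerCase();
            boolean secret = key.contains("password") || key.contains("secret");
            safe.put(entry.getKey(), secret ? "[hidden]" : entry.getValue());
        }
        return safe;
    }
}
{code}

A DEBUG statement would then log {{ConfigRedactor.redact(config)}} instead of the raw map.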



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4364) Sink tasks expose secrets in DEBUG logging

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652309#comment-15652309
 ] 

ASF GitHub Bot commented on KAFKA-4364:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2115


> Sink tasks expose secrets in DEBUG logging
> --
>
> Key: KAFKA-4364
> URL: https://issues.apache.org/jira/browse/KAFKA-4364
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ryan P
>Assignee: Ryan P
> Fix For: 0.10.2.0
>
>
> As it stands today worker tasks print secrets such as Key/Trust store 
> passwords to their respective logs. 
> https://github.com/confluentinc/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L213-L214
> i.e.
> [2016-11-01 12:50:59,254] DEBUG Initializing connector test-sink with config 
> {consumer.ssl.truststore.password=password, 
> connector.class=io.confluent.connect.jdbc.JdbcSinkConnector, 
> connection.password=password, producer.security.protocol=SSL, 
> producer.ssl.truststore.password=password, topics=orders, tasks.max=1, 
> consumer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> producer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> connection.user=connect, name=test-sink, auto.create=true, 
> consumer.security.protocol=SSL, 
> connection.url=jdbc:postgresql://localhost/test} 
> (org.apache.kafka.connect.runtime.WorkerConnector:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2115: KAFKA-4364: Remove secrets from DEBUG logging

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2115


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[VOTE] KIP-89: Allow sink connectors to decouple flush and offset commit

2016-11-09 Thread Shikhar Bhushan
Hi,

I would like to initiate a vote on KIP-89

https://cwiki.apache.org/confluence/display/KAFKA/KIP-89%3A+Allow+sink+connectors+to+decouple+flush+and+offset+commit

Best,

Shikhar


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread Roger Hoover
Sorry for going a little into the weeds, but thanks for the replies regarding
varints.

Agreed that a prefix and {int, int} can be the same.  It doesn't look like
that's what the KIP is saying in the "Open" section.   The example shows 211
for New Relic and 210002 for App Dynamics, implying that the New Relic
organization will have only a single header id to work with.  Or is 211
a prefix?  The main point of a namespace or prefix is to reduce the
overhead of config mapping or registration, depending on how
namespaces/prefixes are managed.

Would love to hear more feedback on the higher-level questions though...

Cheers,

Roger


On Wed, Nov 9, 2016 at 11:38 AM, radai  wrote:

> I think this discussion is getting a bit into the weeds on technical
> implementation details.
> I'd liek to step back a minute and try and establish where we are in the
> larger picture:
>
> (re-wording nacho's last paragraph)
> 1. are we all in agreement that headers are a worthwhile and useful
> addition to have? this was contested early on
> 2. are we all in agreement on headers as top level entity vs headers
> squirreled-away in V?
>
> if there are still concerns around these #2 points (#jay? #jun?)?
>
> (and now back to our normal programming ...)
>
> varints are nice. having said that, its adding complexity (see
> https://github.com/addthis/stream-lib/blob/master/src/
> main/java/com/clearspring/analytics/util/Varint.java
> as 1st google result) and would require anyone writing other clients (C?
> Python? Go? Bash? ;-) ) to get/implement the same, and for relatively
> little gain (int vs string is order of magnitude, this isnt).
>
> int namespacing vs {int, int} namespacing are basically the same thing -
> youre just namespacing an int64 and giving people while 2^32 ranges at a
> time. the part i like about this is letting people have a large swath of
> numbers with one registration so they dont have to come back for every
> single plugin/header they want to "reserve".
>
>
> On Wed, Nov 9, 2016 at 11:01 AM, Roger Hoover 
> wrote:
>
> > Since some of the debate has been about overhead + performance, I'm
> > wondering if we have considered a varint encoding (
> > https://developers.google.com/protocol-buffers/docs/encoding#varints)
> for
> > the header length field (int32 in the proposal) and for header ids?  If
> you
> > don't use headers, the overhead would be a single byte and for each
> header
> > id < 128 would also need only a single byte?
> >
> >
> >
> > On Wed, Nov 9, 2016 at 6:43 AM, radai 
> wrote:
> >
> > > @magnus - and very dangerous (youre essentially downloading and
> executing
> > > arbitrary code off the internet on your servers ... bad idea without a
> > > sandbox, even with)
> > >
> > > as for it being a purely administrative task - i disagree.
> > >
> > > i wish it would, really, because then my earlier point on the
> complexity
> > of
> > > the remapping process would be invalid, but at linkedin, for example,
> we
> > > (the team im in) run kafka as a service. we dont really know what our
> > users
> > > (developing applications that use kafka) are up to at any given moment.
> > it
> > > is very possible (given the existance of headers and a corresponding
> > plugin
> > > ecosystem) for some application to "equip" their producers and
> consumers
> > > with the required plugin without us knowing. i dont mean to imply thats
> > > bad, i just want to make the point that its not as simple keeping it in
> > > sync across a large-enough organization.
> > >
> > >
> > > On Wed, Nov 9, 2016 at 6:17 AM, Magnus Edenhill 
> > > wrote:
> > >
> > > > I think there is a piece missing in the Strings discussion, where
> > > > pro-Stringers
> > > > reason that by providing unique string identifiers for each header
> > > > everything will just
> > > > magically work for all parts of the stream pipeline.
> > > >
> > > > But the strings dont mean anything by themselves, and while we could
> > > > probably envision
> > > > some auto plugin loader that downloads, compiles, links and runs
> > plugins
> > > > on-demand
> > > > as soon as they're seen by a consumer, I dont really see a use-case
> for
> > > > something
> > > > so dynamic (and fragile) in practice.
> > > >
> > > > In the real world an application will be configured with a set of
> > plugins
> > > > to either add (producer)
> > > > or read (consumer) headers.
> > > > This is an administrative task based on what features a client
> > > > needs/provides and results in
> > > > some sort of configuration to enable and configure the desired
> plugins.
> > > >
> > > > Since this needs to be kept somewhat in sync across an organisation
> > > (there
> > > > is no point in having producers
> > > > add headers no consumers will read, and vice versa), the added
> > complexity
> > > > of assigning an id namespace
> > > > for each plugin as it is being configured should be tolerable.
> > > >
> > > >
> > > > /Magnus
> > > >
> > > > 2016-11-09 13:06 GMT+01:00 Michael Pearce :
> > > >

Build failed in Jenkins: kafka-trunk-jdk8 #1030

2016-11-09 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: remove unused fields from KTableImpl

[wangguoz] MINOR: fix incorrect logging in StreamThread

--
[...truncated 3875 lines...]

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr STARTED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart STARTED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler STARTED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing STARTED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 STARTED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue STARTED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic STARTED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired STARTED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents STARTED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize STARTED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents STARTED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize STARTED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner STARTED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration STARTED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition STARTED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker STARTED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed STARTED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testJavaProducer STARTED

kafka.

Build failed in Jenkins: kafka-trunk-jdk8 #1029

2016-11-09 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4360:Controller may deadLock when autoLeaderRebalance encounter

[wangguoz] MINOR: fix typos and incorrect docs

[wangguoz] KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect

--
[...truncated 3877 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLa

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread Nacho Solis
- Transposition
Within one Kafka ecosystem (a set of clusters, mirror makers, replicators,
clients) header keys should be constant. There is no strong benefit for
allowing header-key transposition in transit. Yes, it's doable, and maybe
the only solution in some edge cases, but not worth the trouble.  Also,
this problem should be a completely separate discussion since it also
applies to strings.

- Namespace
Namespaces are very useful when trying to work in a federated system with
multiple namespaces in play.  This is not the case for Kafka.  In
networking we have interoperation between various vendors of routers,
switches and firewalls and they may need to carry information specific to
the vendor but not the general protocol.  One example is Cisco routers
sending control data to other Cisco routers but having to
traverse/interoperate with Juniper routers.  In this situation, protocols
offer a vendor space. This vendor space is then keyed by organization.
There are multiple established identification systems for this.

In our case, the namespacing is more role-based, which is what was
reflected in the proposed key space.  It's unlikely LinkedIn will carry
traffic with Uber-set keys.

Nacho


On Wed, Nov 9, 2016 at 11:37 AM, Nacho Solis  wrote:

> varint encoding was considered, but it's not as beneficial as it looks.
>
> Varints are useful when you're trying to encode numbers from a very large
> range. They provide some space savings, but it's not as much as people
> expect.
>
> Varints have a problem of checking for validity. Most varints can
> potentially encode the same number in multiple ways. This leads to
> confusion and problems when trying to do exact matching. This means that
> they have to be parsed and assembled first, which may lead to many memory
> moves. Also, if they don't allow for various encodings, then there has to
> be a validity check (was 0 allowed to be encoded in 2 bytes?). And, on top
> of that, they require more code.
>
> At the speed that kafka works varints would be feasible (even with the
> memory copies and the validity check), but since the savings are not that
> big then I don't believe they're worth it.
>
> Nacho
>
>
> On Wed, Nov 9, 2016 at 11:01 AM, Roger Hoover 
> wrote:
>
>> Since some of the debate has been about overhead + performance, I'm
>> wondering if we have considered a varint encoding (
>> https://developers.google.com/protocol-buffers/docs/encoding#varints) for
>> the header length field (int32 in the proposal) and for header ids?  If
>> you
>> don't use headers, the overhead would be a single byte and for each header
>> id < 128 would also need only a single byte?
>>
>>
>>
>> On Wed, Nov 9, 2016 at 6:43 AM, radai  wrote:
>>
>> > @magnus - and very dangerous (youre essentially downloading and
>> executing
>> > arbitrary code off the internet on your servers ... bad idea without a
>> > sandbox, even with)
>> >
>> > as for it being a purely administrative task - i disagree.
>> >
>> > i wish it would, really, because then my earlier point on the
>> complexity of
>> > the remapping process would be invalid, but at linkedin, for example, we
>> > (the team im in) run kafka as a service. we dont really know what our
>> users
>> > (developing applications that use kafka) are up to at any given moment.
>> it
>> > is very possible (given the existance of headers and a corresponding
>> plugin
>> > ecosystem) for some application to "equip" their producers and consumers
>> > with the required plugin without us knowing. i dont mean to imply thats
>> > bad, i just want to make the point that its not as simple keeping it in
>> > sync across a large-enough organization.
>> >
>> >
>> > On Wed, Nov 9, 2016 at 6:17 AM, Magnus Edenhill 
>> > wrote:
>> >
>> > > I think there is a piece missing in the Strings discussion, where
>> > > pro-Stringers
>> > > reason that by providing unique string identifiers for each header
>> > > everything will just
>> > > magically work for all parts of the stream pipeline.
>> > >
>> > > But the strings dont mean anything by themselves, and while we could
>> > > probably envision
>> > > some auto plugin loader that downloads, compiles, links and runs
>> plugins
>> > > on-demand
>> > > as soon as they're seen by a consumer, I dont really see a use-case
>> for
>> > > something
>> > > so dynamic (and fragile) in practice.
>> > >
>> > > In the real world an application will be configured with a set of
>> plugins
>> > > to either add (producer)
>> > > or read (consumer) headers.
>> > > This is an administrative task based on what features a client
>> > > needs/provides and results in
>> > > some sort of configuration to enable and configure the desired
>> plugins.
>> > >
>> > > Since this needs to be kept somewhat in sync across an organisation
>> > (there
>> > > is no point in having producers
>> > > add headers no consumers will read, and vice versa), the added
>> complexity
>> > > of assigning an id namespace
>> > > for each plugin as it is being con

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread radai
I think this discussion is getting a bit into the weeds on technical
implementation details.
I'd like to step back a minute and try to establish where we are in the
larger picture:

(re-wording nacho's last paragraph)
1. are we all in agreement that headers are a worthwhile and useful
addition to have? this was contested early on
2. are we all in agreement on headers as a top-level entity vs headers
squirreled away in V?

are there still concerns around these 2 points (#jay? #jun?)?

(and now back to our normal programming ...)

varints are nice. having said that, it's adding complexity (see
https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java
as the 1st google result) and would require anyone writing other clients (C?
Python? Go? Bash? ;-) ) to get/implement the same, for relatively
little gain (int vs string is an order of magnitude; this isn't).

int namespacing vs {int, int} namespacing are basically the same thing -
you're just namespacing an int64 and giving people whole 2^32 ranges at a
time. the part i like about this is letting people have a large swath of
numbers with one registration so they don't have to come back for every
single plugin/header they want to "reserve" (see the sketch below).
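
As a concrete illustration of the {int, int} point (purely a sketch, not
anything specified in the KIP; the HeaderIds class and method names are made
up), packing a registered 32-bit namespace and a locally assigned 32-bit id
into a single 64-bit header key could look like this:

{code}
public final class HeaderIds {
    private HeaderIds() { }

    /** Combines a 32-bit namespace (registered once per organization or plugin
     *  vendor) with a locally assigned 32-bit id into one 64-bit header key. */
    public static long headerId(int namespace, int localId) {
        return ((long) namespace << 32) | (localId & 0xFFFFFFFFL);
    }

    public static int namespaceOf(long headerId) {
        return (int) (headerId >>> 32);   // high 32 bits
    }

    public static int localIdOf(long headerId) {
        return (int) headerId;            // low 32 bits
    }
}
{code}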


On Wed, Nov 9, 2016 at 11:01 AM, Roger Hoover 
wrote:

> Since some of the debate has been about overhead + performance, I'm
> wondering if we have considered a varint encoding (
> https://developers.google.com/protocol-buffers/docs/encoding#varints) for
> the header length field (int32 in the proposal) and for header ids?  If you
> don't use headers, the overhead would be a single byte and for each header
> id < 128 would also need only a single byte?
>
>
>
> On Wed, Nov 9, 2016 at 6:43 AM, radai  wrote:
>
> > @magnus - and very dangerous (youre essentially downloading and executing
> > arbitrary code off the internet on your servers ... bad idea without a
> > sandbox, even with)
> >
> > as for it being a purely administrative task - i disagree.
> >
> > i wish it would, really, because then my earlier point on the complexity
> of
> > the remapping process would be invalid, but at linkedin, for example, we
> > (the team im in) run kafka as a service. we dont really know what our
> users
> > (developing applications that use kafka) are up to at any given moment.
> it
> > is very possible (given the existance of headers and a corresponding
> plugin
> > ecosystem) for some application to "equip" their producers and consumers
> > with the required plugin without us knowing. i dont mean to imply thats
> > bad, i just want to make the point that its not as simple keeping it in
> > sync across a large-enough organization.
> >
> >
> > On Wed, Nov 9, 2016 at 6:17 AM, Magnus Edenhill 
> > wrote:
> >
> > > I think there is a piece missing in the Strings discussion, where
> > > pro-Stringers
> > > reason that by providing unique string identifiers for each header
> > > everything will just
> > > magically work for all parts of the stream pipeline.
> > >
> > > But the strings dont mean anything by themselves, and while we could
> > > probably envision
> > > some auto plugin loader that downloads, compiles, links and runs
> plugins
> > > on-demand
> > > as soon as they're seen by a consumer, I dont really see a use-case for
> > > something
> > > so dynamic (and fragile) in practice.
> > >
> > > In the real world an application will be configured with a set of
> plugins
> > > to either add (producer)
> > > or read (consumer) headers.
> > > This is an administrative task based on what features a client
> > > needs/provides and results in
> > > some sort of configuration to enable and configure the desired plugins.
> > >
> > > Since this needs to be kept somewhat in sync across an organisation
> > (there
> > > is no point in having producers
> > > add headers no consumers will read, and vice versa), the added
> complexity
> > > of assigning an id namespace
> > > for each plugin as it is being configured should be tolerable.
> > >
> > >
> > > /Magnus
> > >
> > > 2016-11-09 13:06 GMT+01:00 Michael Pearce :
> > >
> > > > Just following/catching up on what seems to be an active night :)
> > > >
> > > > @Radai sorry if it may seem obvious but what does MD stand for?
> > > >
> > > > My take on String vs Int:
> > > >
> > > > I will state first I am pro Int (16 or 32).
> > > >
> > > > I do though playing devils advocate see a big plus with the argument
> of
> > > > String keys, this is around integrating into an existing eco-system.
> > > >
> > > > As many other systems use String based headers (Flume, JMS)  it makes
> > it
> > > > much easier for these to be incorporated/integrated into.
> > > >
> > > > How with Int based headers could we provide a way/guidence to make
> this
> > > > integration simple / easy with transition flows over to kafka?
> > > >
> > > > * tough luck buddy you're on your own
> > > > * simply hash the string into int code and hope for no collisions
> (how
> > to
> > > > convert ba

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread Nacho Solis
varint encoding was considered, but it's not as beneficial as it looks.

Varints are useful when you're trying to encode numbers from a very large
range. They provide some space savings, but it's not as much as people
expect.

Varints have a problem of checking for validity. Most varints can
potentially encode the same number in multiple ways. This leads to
confusion and problems when trying to do exact matching. This means that
they have to be parsed and assembled first, which may lead to many memory
moves. Also, if they don't allow for various encodings, then there has to
be a validity check (was 0 allowed to be encoded in 2 bytes?). And, on top
of that, they require more code.

At the speed that kafka works, varints would be feasible (even with the
memory copies and the validity check), but since the savings are not that
big, I don't believe they're worth it (a small encode/decode sketch follows below).

Nacho
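
For reference, here is a minimal, unoptimized sketch of the base-128 varint
scheme under discussion (the Varint class and method names are illustrative
only). The comment on decode() points at the over-long-encoding ambiguity
described above.

{code}
import java.io.ByteArrayOutputStream;

public final class Varint {
    private Varint() { }

    /** Protobuf-style base-128 encoding: 7 payload bits per byte, MSB set means "more bytes follow". */
    public static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    /** Decodes a varint starting at offset. Note that an over-long encoding such as
     *  {0x80, 0x00} also decodes to 0, the same-number-multiple-ways issue noted above. */
    public static int decode(byte[] buf, int offset) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf[offset++];
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
        }
    }
}
{code}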


On Wed, Nov 9, 2016 at 11:01 AM, Roger Hoover 
wrote:

> Since some of the debate has been about overhead + performance, I'm
> wondering if we have considered a varint encoding (
> https://developers.google.com/protocol-buffers/docs/encoding#varints) for
> the header length field (int32 in the proposal) and for header ids?  If you
> don't use headers, the overhead would be a single byte and for each header
> id < 128 would also need only a single byte?
>
>
>
> On Wed, Nov 9, 2016 at 6:43 AM, radai  wrote:
>
> > @magnus - and very dangerous (youre essentially downloading and executing
> > arbitrary code off the internet on your servers ... bad idea without a
> > sandbox, even with)
> >
> > as for it being a purely administrative task - i disagree.
> >
> > i wish it would, really, because then my earlier point on the complexity
> of
> > the remapping process would be invalid, but at linkedin, for example, we
> > (the team im in) run kafka as a service. we dont really know what our
> users
> > (developing applications that use kafka) are up to at any given moment.
> it
> > is very possible (given the existance of headers and a corresponding
> plugin
> > ecosystem) for some application to "equip" their producers and consumers
> > with the required plugin without us knowing. i dont mean to imply thats
> > bad, i just want to make the point that its not as simple keeping it in
> > sync across a large-enough organization.
> >
> >
> > On Wed, Nov 9, 2016 at 6:17 AM, Magnus Edenhill 
> > wrote:
> >
> > > I think there is a piece missing in the Strings discussion, where
> > > pro-Stringers
> > > reason that by providing unique string identifiers for each header
> > > everything will just
> > > magically work for all parts of the stream pipeline.
> > >
> > > But the strings dont mean anything by themselves, and while we could
> > > probably envision
> > > some auto plugin loader that downloads, compiles, links and runs
> plugins
> > > on-demand
> > > as soon as they're seen by a consumer, I dont really see a use-case for
> > > something
> > > so dynamic (and fragile) in practice.
> > >
> > > In the real world an application will be configured with a set of
> plugins
> > > to either add (producer)
> > > or read (consumer) headers.
> > > This is an administrative task based on what features a client
> > > needs/provides and results in
> > > some sort of configuration to enable and configure the desired plugins.
> > >
> > > Since this needs to be kept somewhat in sync across an organisation
> > (there
> > > is no point in having producers
> > > add headers no consumers will read, and vice versa), the added
> complexity
> > > of assigning an id namespace
> > > for each plugin as it is being configured should be tolerable.
> > >
> > >
> > > /Magnus
> > >
> > > 2016-11-09 13:06 GMT+01:00 Michael Pearce :
> > >
> > > > Just following/catching up on what seems to be an active night :)
> > > >
> > > > @Radai sorry if it may seem obvious but what does MD stand for?
> > > >
> > > > My take on String vs Int:
> > > >
> > > > I will state first I am pro Int (16 or 32).
> > > >
> > > > I do though playing devils advocate see a big plus with the argument
> of
> > > > String keys, this is around integrating into an existing eco-system.
> > > >
> > > > As many other systems use String based headers (Flume, JMS)  it makes
> > it
> > > > much easier for these to be incorporated/integrated into.
> > > >
> > > > How with Int based headers could we provide a way/guidence to make
> this
> > > > integration simple / easy with transition flows over to kafka?
> > > >
> > > > * tough luck buddy you're on your own
> > > > * simply hash the string into int code and hope for no collisions
> (how
> > to
> > > > convert back though?)
> > > > * http2 style as mentioned by nacho.
> > > >
> > > > cheers,
> > > > Mike
> > > >
> > > >
> > > > 
> > > > From: radai 
> > > > Sent: Wednesday, November 9, 2016 8:12 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> > > >
> > > > thinking about it some mor

[jira] [Comment Edited] (KAFKA-4391) On Windows, Kafka server stops with uncaught exception after coming back from sleep

2016-11-09 Thread Yiquan Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651828#comment-15651828
 ] 

Yiquan Zhou edited comment on KAFKA-4391 at 11/9/16 7:32 PM:
-

All the "Power options" settings are set to default values. In the real use 
case of my application, I know that this exception occurs on many other Windows 
machines.

Even if the disk is deactivated during the sleep, the exception is thrown after 
coming back from the sleep mode so I think that the disk and network are all 
running again. What could prevent the Kafka server from renaming a temporary 
file?


was (Author: yqzhou):
All the "Power options" settings are set to default values. In the real use 
case of my application, I know that this exception occurs on many other Windows 
machines.

Even if the disk is deactivated during the sleep, the exception is thrown after 
coming back from the sleep mode so I think that the disk and network are all 
running again. Why the Kafka server fails to rename a temporary file?

> On Windows, Kafka server stops with uncaught exception after coming back from 
> sleep
> ---
>
> Key: KAFKA-4391
> URL: https://issues.apache.org/jira/browse/KAFKA-4391
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Windows 10, jdk1.8.0_111
>Reporter: Yiquan Zhou
>
> Steps to reproduce:
> 1. start the zookeeper
> $ bin\windows\zookeeper-server-start.bat config/zookeeper.properties
> 2. start the Kafka server with the default properties
> $ bin\windows\kafka-server-start.bat config/server.properties
> 3. put Windows into sleep mode for 1-2 hours
> 4. activate Windows again, an exception occurs in Kafka server console and 
> the server is stopped:
> {code:title=kafka console log}
> [2016-11-08 21:45:35,185] INFO Client session timed out, have not heard from 
> server in 10081379ms for sessionid 0x1584514da47, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:40,698] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,029] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,044] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1584514da47 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,236] INFO Initiating client connection, 
> connectString=localhost:2181 sessionTimeout=6000 
> watcher=org.I0Itec.zkclient.ZkClient@11ca437b (org.apache.zookeeper.ZooKeeper)
> [2016-11-08 21:45:43,280] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> log4j:ERROR Failed to rename [/controller.log] to 
> [/controller.log.2016-11-08-18].
> [2016-11-08 21:45:43,421] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,483] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,811] INFO Session establishment complete on server 
> 127.0.0.1/127.0.0.1:2181, sessionid = 0x1584514da470001, negotiated timeout = 
> 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,827] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> log4j:ERROR Failed to rename [/server.log] to [/server.log.2016-11-08-18].
> [2016-11-08 21:45:43,827] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO 0 successfully elected as leader 
> (kafka.server.ZookeeperLeaderElector)
> log4j:ERROR Failed to rename [/state-change.log] to 
> [/state-change.log.2016-11-08-18].
> [2016-11-08 21:45:44,421] INFO re-registering broker info in ZK for broker 0 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,436] INFO Creating /brokers/ids/0 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Registered broker 0 at path /brokers/ids/0 
> with addresses: PLAINTEXT -> EndPoint(192.168.0.15,9092,PLAINTEXT) 
> (kafka.utils.ZkUtils)
> [2016-11-08 21:45:4

[jira] [Commented] (KAFKA-4391) On Windows, Kafka server stops with uncaught exception after coming back from sleep

2016-11-09 Thread Yiquan Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651828#comment-15651828
 ] 

Yiquan Zhou commented on KAFKA-4391:


All the "Power options" settings are set to default values. In the real use 
case of my application, I know that this exception occurs on many other Windows 
machines.

Even if the disk is deactivated during the sleep, the exception is thrown after 
coming back from the sleep mode so I think that the disk and network are all 
running again. Why the Kafka server fails to rename a temporary file?

> On Windows, Kafka server stops with uncaught exception after coming back from 
> sleep
> ---
>
> Key: KAFKA-4391
> URL: https://issues.apache.org/jira/browse/KAFKA-4391
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Windows 10, jdk1.8.0_111
>Reporter: Yiquan Zhou
>
> Steps to reproduce:
> 1. start the zookeeper
> $ bin\windows\zookeeper-server-start.bat config/zookeeper.properties
> 2. start the Kafka server with the default properties
> $ bin\windows\kafka-server-start.bat config/server.properties
> 3. put Windows into sleep mode for 1-2 hours
> 4. activate Windows again, an exception occurs in Kafka server console and 
> the server is stopped:
> {code:title=kafka console log}
> [2016-11-08 21:45:35,185] INFO Client session timed out, have not heard from 
> server in 10081379ms for sessionid 0x1584514da47, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:40,698] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,029] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,044] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1584514da47 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,236] INFO Initiating client connection, 
> connectString=localhost:2181 sessionTimeout=6000 
> watcher=org.I0Itec.zkclient.ZkClient@11ca437b (org.apache.zookeeper.ZooKeeper)
> [2016-11-08 21:45:43,280] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> log4j:ERROR Failed to rename [/controller.log] to 
> [/controller.log.2016-11-08-18].
> [2016-11-08 21:45:43,421] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,483] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,811] INFO Session establishment complete on server 
> 127.0.0.1/127.0.0.1:2181, sessionid = 0x1584514da470001, negotiated timeout = 
> 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,827] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> log4j:ERROR Failed to rename [/server.log] to [/server.log.2016-11-08-18].
> [2016-11-08 21:45:43,827] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO 0 successfully elected as leader 
> (kafka.server.ZookeeperLeaderElector)
> log4j:ERROR Failed to rename [/state-change.log] to 
> [/state-change.log.2016-11-08-18].
> [2016-11-08 21:45:44,421] INFO re-registering broker info in ZK for broker 0 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,436] INFO Creating /brokers/ids/0 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Registered broker 0 at path /brokers/ids/0 
> with addresses: PLAINTEXT -> EndPoint(192.168.0.15,9092,PLAINTEXT) 
> (kafka.utils.ZkUtils)
> [2016-11-08 21:45:44,686] INFO done re-registering broker 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,686] INFO Subscribing to /brokers/topics path to watch 
> for new topics (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:45,046] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
> [2016-11-08 21:45:45,061] INFO New leader is 0 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-11-08 21:45:47,

[GitHub] kafka pull request #2109: MINOR: fix incorrect logging in StreamThread

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2109


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP 88: OffsetFetch Protocol Update

2016-11-09 Thread Vahid S Hashemian
Jason,
For some reason I did not receive your earlier response to the thread.
I just saw it when I went to 
https://www.mail-archive.com/dev@kafka.apache.org/msg59608.html
In the updated KIP I exposed the capability via KafkaConsumer (your first 
suggestion), but would be happy to look into adding it to AdminClient in 
the next round if you think that's the better approach.
Thanks.
--Vahid



From:   Vahid S Hashemian/Silicon Valley/IBM@IBMUS
To: dev@kafka.apache.org
Date:   11/09/2016 11:12 AM
Subject:Re: [DISCUSS] KIP 88: OffsetFetch Protocol Update



Hi Jason (et al.),

I modified the KIP towards a change in OffsetFetch protocol, as per your 
suggestion.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update


I'd appreciate it if you could take another look and share your thoughts.
Thanks.
--Vahid



From:   Vahid S Hashemian/Silicon Valley/IBM@IBMUS
To: dev@kafka.apache.org
Date:   11/07/2016 03:28 PM
Subject:Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update



Hi Jason,

Thanks for your feedback.

Yes, the intent of the KIP is to make the existing offsets of the group 
available even when there are no active consumers in the group consuming 
from one or more topic partitions.
Your suggestion should also work. I'm not yet sure how to obtain all of the 
group's topic partitions and need to look more closely at your 'null=all' 
suggestion, as I couldn't immediately see an obvious way to extract that 
info in the OffsetFetch handler. I'll dig deeper.
Just a quick note that using the OffsetFetch API would not address the 
second (but minor) problem described in the current KIP (the dummy group 
member). I assume that is not a big concern.

Thanks.
--Vahid




From:   Jason Gustafson 
To: dev@kafka.apache.org
Date:   11/07/2016 09:19 AM
Subject:Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update



Hey Vahid,

Thanks for the KIP. If I understand correctly, the problem is how to fetch
existing offsets for a group which has no active members, right? I'm not
totally clear why we need to modify the DescribeGroups API in order to
achieve this since we already have the OffsetFetch API. I think the
limitation currently is that you need to know the partitions to fetch
offsets for, but perhaps we could modify it to support the "null=all"
semantics that we used for the TopicMetadata API?

Thanks,
Jason
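To make the current limitation concrete: with the existing OffsetFetch path the
client has to enumerate the partitions itself, e.g. via KafkaConsumer#committed.
A small sketch (the bootstrap server, group, topic and partition count below are
made up):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class FetchCommittedOffsets {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // The caller must already know which partitions to ask about --
                // exactly the limitation the "null = all partitions" change removes.
                for (int p = 0; p < 3; p++) {
                    TopicPartition tp = new TopicPartition("my-topic", p);
                    OffsetAndMetadata committed = consumer.committed(tp);
                    System.out.println(tp + " -> " + (committed == null ? "none" : committed.offset()));
                }
            }
        }
    }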

On Thu, Nov 3, 2016 at 11:09 AM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi all,
>
> I started a new KIP under
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 88%3A+DescribeGroups+Protocol+Update
> .
>
> The KIP is a proposal to update the DescribeGroups protocol to address
> KAFKA-3853 (https://issues.apache.org/jira/browse/KAFKA-3853).
>
> I appreciate your feedback.
>
> Thanks.
> --Vahid
>
>














[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-11-09 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651799#comment-15651799
 ] 

Shixiong Zhu commented on KAFKA-1894:
-

Hit the same issue in Spark Structured Streaming Kafka Source. These loops 
should check the current thread's interrupted status.

> Avoid long or infinite blocking in the consumer
> ---
>
> Key: KAFKA-1894
> URL: https://issues.apache.org/jira/browse/KAFKA-1894
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> The new consumer has a lot of loops that look something like
> {code}
>   while(!isThingComplete())
> client.poll();
> {code}
> This occurs both in KafkaConsumer and in NetworkClient.completeAll. 
> These retry loops are actually mostly the behavior we want but there are 
> several cases where they may cause problems:
>  - In the case of a hard failure we may hang for a long time or indefinitely 
> before realizing the connection is lost.
>  - In the case where the cluster is malfunctioning or down we may retry 
> forever.
> It would probably be better to give a timeout to these. The proposed approach 
> would be to add something like retry.time.ms=6 and only continue retrying 
> for that period of time.
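A minimal sketch of the proposed bounded retry (the helper below is illustrative, not the actual consumer code), which also checks the thread's interrupted status as suggested in the comment above:

{code:java}
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class BoundedRetry {
    // Keep polling until the condition holds, the deadline passes, or the
    // calling thread is interrupted.
    static void pollUntil(BooleanSupplier isComplete, Runnable pollOnce, long retryTimeMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + retryTimeMs;
        while (!isComplete.getAsBoolean()) {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("interrupted while waiting");
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException("condition not met within " + retryTimeMs + " ms");
            }
            pollOnce.run();  // stands in for client.poll(remainingMs)
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        pollUntil(() -> System.currentTimeMillis() - start > 200, () -> { }, 1000);
        System.out.println("condition met before the deadline");
    }
}
{code}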



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2119: MINOR: remove unused fields from KTableImpl

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2119


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP 88: OffsetFetch Protocol Update

2016-11-09 Thread Vahid S Hashemian
Hi Jason (et al.),

I modified the KIP towards a change in OffsetFetch protocol, as per your 
suggestion.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update

I'd appreciate it if you could take another look and share your thoughts.
Thanks.
--Vahid



From:   Vahid S Hashemian/Silicon Valley/IBM@IBMUS
To: dev@kafka.apache.org
Date:   11/07/2016 03:28 PM
Subject:Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update



Hi Jason,

Thanks for your feedback.

Yes, the intent of the KIP is to make the existing offsets of the group 
available even when there are no active consumers in the group consuming 
from one or more topic partitions.
Your suggestion should also work. I'm not yet sure how to obtain all of the 
group's topic partitions and need to look more closely at your 'null=all' 
suggestion, as I couldn't immediately see an obvious way to extract that 
info in the OffsetFetch handler. I'll dig deeper.
Just a quick note that using the OffsetFetch API would not address the 
second (but minor) problem described in the current KIP (the dummy group 
member). I assume that is not a big concern.

Thanks.
--Vahid




From:   Jason Gustafson 
To: dev@kafka.apache.org
Date:   11/07/2016 09:19 AM
Subject:Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update



Hey Vahid,

Thanks for the KIP. If I understand correctly, the problem is how to fetch
existing offsets for a group which has no active members, right? I'm not
totally clear why we need to modify the DescribeGroups API in order to
achieve this since we already have the OffsetFetch API. I think the
limitation currently is that you need to know the partitions to fetch
offsets for, but perhaps we could modify it to support the "null=all"
semantics that we used for the TopicMetadata API?

Thanks,
Jason

On Thu, Nov 3, 2016 at 11:09 AM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi all,
>
> I started a new KIP under
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 88%3A+DescribeGroups+Protocol+Update
> .
>
> The KIP is a proposal to update the DescribeGroups protocol to address
> KAFKA-3853 (https://issues.apache.org/jira/browse/KAFKA-3853).
>
> I appreciate your feedback.
>
> Thanks.
> --Vahid
>
>










[jira] [Commented] (KAFKA-3901) KStreamTransformValues$KStreamTransformValuesProcessor#process() forwards null values

2016-11-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651775#comment-15651775
 ] 

Guozhang Wang commented on KAFKA-3901:
--

[~dminkovsky] The motivation of KAFKA-3519 is that, if the user's 
`transform` or `punctuate` function returns {{null}}, it indicates the user does not 
want to forward any data downstream. In other words, {{null}} is not used as a 
special value but as a signal to indicate "no output".

Could you describe your scenario where you would need {{null}} as a 
special record?
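For illustration, a sketch of a transformer that relies on that contract (the class name and threshold are made up); returning {{null}} simply means nothing is forwarded for that input:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DropSmallValues implements Transformer<String, Long, KeyValue<String, Long>> {
    @Override
    public void init(final ProcessorContext context) { }

    @Override
    public KeyValue<String, Long> transform(final String key, final Long value) {
        // Returning null here means: forward no record for this input.
        return (value == null || value < 10L) ? null : KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<String, Long> punctuate(final long timestamp) {
        return null; // nothing to emit on punctuate either
    }

    @Override
    public void close() { }
}
{code}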

> KStreamTransformValues$KStreamTransformValuesProcessor#process() forwards 
> null values
> -
>
> Key: KAFKA-3901
> URL: https://issues.apache.org/jira/browse/KAFKA-3901
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
>
> Did this get missed in [KAFKA-3519: Refactor Transformer's transform / 
> punctuate to return nullable 
> value|https://github.com/apache/kafka/commit/40fd456649b5df29d030da46865b5e7e0ca6db15#diff-338c230fd5a15d98550230007651a224]?
>  I think it may have, because that processor's #punctuate() does not forward 
> null. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-09 Thread Mayuresh Gharat
I see the reasoning and might be inclined to agree a bit :
If we go to stage 2, the only difference is that we can theoretically
support a null value non-tombstone message in a log compacted topic, but I
am not sure if that has any use case.

But as an end goal I see that Kafka should clearly specify what it means by
a tombstone: is it the attribute flag, or is it the null value? If we just
do stage 1, I don't think we are defining the end goal completely.
Again, this is more about the semantics of the correct end state.

Thanks,

Mayuresh

On Wed, Nov 9, 2016 at 10:49 AM, Becket Qin  wrote:

> I am not sure if we need the second stage. Wouldn't it be enough to say
> that a message is a tombstone if one of the following is true?
> 1. tombstone flag is set.
> 2. value is null.
>
> If we go to stage 2, the only difference is that we can theoretically
> support a null value non-tombstone message in a log compacted topic, but I
> am not sure if that has any use case.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Nov 9, 2016 at 9:23 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > I think it will be a good idea. +1
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Wed, Nov 9, 2016 at 9:13 AM, Michael Pearce 
> > wrote:
> >
> > > +1 Mayuresh, I think this is a good solution/strategy.
> > >
> > > Shall we update the KIP with this? Becket/Jun/Joel any comments to add
> > > before we do?
> > >
> > > On 08/11/2016, 17:29, "Mayuresh Gharat" 
> > > wrote:
> > >
> > > I think the migration can be done in 2 stages :
> > >
> > > 1) In first stage the broker should understand the attribute flag
> as
> > > well
> > > as Null for the value for log compaction.
> > > 2) In second stage we move on to supporting only the attribute flag
> > > for log
> > > compaction.
> > >
> > > I agree with Becket that for older clients (consumers) the broker
> > might
> > > have to down convert a message that has the attribute flag set for
> > log
> > > compacting but has a non null value. But this should be in first
> > stage.
> > > Once all the clients have upgraded (clients start recognizing the
> > > attribute
> > > flag), we can move the broker to stage 2.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Nov 8, 2016 at 12:06 AM, Michael Pearce <
> > michael.pea...@ig.com
> > > >
> > > wrote:
> > >
> > > > Also we can add further guidance:
> > > >
> > > > To  avoid the below caveat to organisations by promoting of
> > > upgrading all
> > > > consumers first before relying on producing tombstone messages
> with
> > > data
> > > >
> > > > Sent using OWA for iPhone
> > > > 
> > > > From: Michael Pearce
> > > > Sent: Tuesday, November 8, 2016 8:03:32 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > > >
> > > > Thanks Jun on the feedback, I think I understand the issue/point
> > now.
> > > >
> > > > We def can add that on older client version if tombstone marker
> > make
> > > the
> > > > value null to preserve behaviour.
> > > >
> > > > There is one caveats to this:
> > > >
> > > > * we have to be clear that data is lost if reading via old
> > > client/message
> > > > format - I don't think this is a big issue as mostly the idea/use
> > > case is
> > > > around meta data transport as such would only be as bad as
> current
> > > situation
> > > >
> > > > Re having configurable broker this was to handle cases like you
> > > described
> > > > but in another way by allowing organisation choose the behaviour
> of
> > > the
> > > > compaction per broker or per topic so they could manage their
> > > transition to
> > > > using tombstone markers.
> > > >
> > > > On hind sight it maybe easier to just upgrade and downgrade the
> > > messages
> > > > on version as you propose.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Sent using OWA for iPhone
> > > > 
> > > > From: Jun Rao 
> > > > Sent: Tuesday, November 8, 2016 12:34:41 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > > >
> > > > For the use case, one potential use case is for schema
> > registration.
> > > For
> > > > example, in Avro, a null value corresponds to a Null schema. So,
> if
> > > you
> > > > want to be able to keep the schema id in a delete message, the
> > value
> > > can't
> > > > be null. We could get around this issue by specializing null
> value
> > > during
> > > > schema registration though.
> > > >
> > > > Now for the proposed changes. We probably should preserve client
> > > > compatibility. If a client application is sending a null value
> to a
> > > > compacted topic, ideally, 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread Roger Hoover
Since some of the debate has been about overhead + performance, I'm
wondering if we have considered a varint encoding (
https://developers.google.com/protocol-buffers/docs/encoding#varints) for
the header length field (int32 in the proposal) and for header ids? If you
don't use headers, the overhead would be a single byte, and each header
id < 128 would also need only a single byte?
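For reference, a minimal sketch of the protobuf-style unsigned varint (illustrative
only, not a proposed wire format):

    import java.nio.ByteBuffer;

    public class VarintSketch {
        // Write an unsigned varint: 7 bits per byte, high bit = "more bytes follow".
        static void writeUnsignedVarint(int value, ByteBuffer out) {
            while ((value & 0xFFFFFF80) != 0) {
                out.put((byte) ((value & 0x7F) | 0x80));
                value >>>= 7;
            }
            out.put((byte) value);
        }

        static int readUnsignedVarint(ByteBuffer in) {
            int value = 0;
            int shift = 0;
            byte b;
            do {
                b = in.get();
                value |= (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return value;
        }

        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(5);
            writeUnsignedVarint(127, buf);
            System.out.println("bytes used for 127: " + buf.position());  // 1
            buf.clear();
            writeUnsignedVarint(300, buf);
            System.out.println("bytes used for 300: " + buf.position());  // 2
            buf.flip();
            System.out.println("decoded: " + readUnsignedVarint(buf));    // 300
        }
    }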



On Wed, Nov 9, 2016 at 6:43 AM, radai  wrote:

> @magnus - and very dangerous (youre essentially downloading and executing
> arbitrary code off the internet on your servers ... bad idea without a
> sandbox, even with)
>
> as for it being a purely administrative task - i disagree.
>
> i wish it would, really, because then my earlier point on the complexity of
> the remapping process would be invalid, but at linkedin, for example, we
> (the team im in) run kafka as a service. we dont really know what our users
> (developing applications that use kafka) are up to at any given moment. it
> is very possible (given the existance of headers and a corresponding plugin
> ecosystem) for some application to "equip" their producers and consumers
> with the required plugin without us knowing. i dont mean to imply thats
> bad, i just want to make the point that its not as simple keeping it in
> sync across a large-enough organization.
>
>
> On Wed, Nov 9, 2016 at 6:17 AM, Magnus Edenhill 
> wrote:
>
> > I think there is a piece missing in the Strings discussion, where
> > pro-Stringers
> > reason that by providing unique string identifiers for each header
> > everything will just
> > magically work for all parts of the stream pipeline.
> >
> > But the strings dont mean anything by themselves, and while we could
> > probably envision
> > some auto plugin loader that downloads, compiles, links and runs plugins
> > on-demand
> > as soon as they're seen by a consumer, I dont really see a use-case for
> > something
> > so dynamic (and fragile) in practice.
> >
> > In the real world an application will be configured with a set of plugins
> > to either add (producer)
> > or read (consumer) headers.
> > This is an administrative task based on what features a client
> > needs/provides and results in
> > some sort of configuration to enable and configure the desired plugins.
> >
> > Since this needs to be kept somewhat in sync across an organisation
> (there
> > is no point in having producers
> > add headers no consumers will read, and vice versa), the added complexity
> > of assigning an id namespace
> > for each plugin as it is being configured should be tolerable.
> >
> >
> > /Magnus
> >
> > 2016-11-09 13:06 GMT+01:00 Michael Pearce :
> >
> > > Just following/catching up on what seems to be an active night :)
> > >
> > > @Radai sorry if it may seem obvious but what does MD stand for?
> > >
> > > My take on String vs Int:
> > >
> > > I will state first I am pro Int (16 or 32).
> > >
> > > I do though playing devils advocate see a big plus with the argument of
> > > String keys, this is around integrating into an existing eco-system.
> > >
> > > As many other systems use String based headers (Flume, JMS)  it makes
> it
> > > much easier for these to be incorporated/integrated into.
> > >
> > > How with Int based headers could we provide a way/guidance to make this
> > > integration simple / easy with transition flows over to kafka?
> > >
> > > * tough luck buddy you're on your own
> > > * simply hash the string into int code and hope for no collisions (how
> to
> > > convert back though?)
> > > * http2 style as mentioned by nacho.
> > >
> > > cheers,
> > > Mike
> > >
> > >
> > > 
> > > From: radai 
> > > Sent: Wednesday, November 9, 2016 8:12 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> > >
> > > thinking about it some more, the best way to transmit the header
> > remapping
> > > data to consumers would be to put it in the MD response payload, so
> maybe
> > > it should be discussed now.
> > >
> > >
> > > On Wed, Nov 9, 2016 at 12:09 AM, radai 
> > wrote:
> > >
> > > > im not opposed to the idea of namespace mapping. all im saying is
> that
> > > its
> > > > not part of the "mvp" and, since it requires no wire format change,
> can
> > > > always be added later.
> > > > also, its not as simple as just configuring MM to do the transform:
> > lets
> > > > say i've implemented large message support as {666,1} and on some
> > mirror
> > > > target cluster its been remapped to {999,1}. the consumer plugin code
> > > would
> > > > also need to be told to look for the large message "part X of Y"
> header
> > > > under {999,1}. doable, but tricky.
> > > >
> > > > On Tue, Nov 8, 2016 at 10:29 PM, Gwen Shapira 
> > wrote:
> > > >
> > > >> While you can do whatever you want with a namespace and your code,
> > > >> what I'd expect is for each app to namespaces configurable...
> > > >>
> > > >> So if I accidentally used 666 for my HR department, and still want
> to
> > 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread Nacho Solis
What's the main idea behind the namespaceId?  What are we segmenting?

The KIP does not suggest a global registry. It does suggest segmentation
via a "prefix".  The proposal in the current KIP has 4 proposed sections,
Kafka open source, Local, Open and Testing.  Each of these is subdivided.
There is no coordination needed between your org and any other org to use
the Local space.  This obviously does not have global scope, only local
scope.

From this perspective numbers that start with 0x0010 are only used by the
local organization's infrastructure/kafka team.

The actual assignment of ints is separate from the segmentation. Configs as
suggested above are a perfectly reasonable way to do it.  The KIP proposes
a single file for Kafka open source where it assigns it's keys (in the
current proposal from the prefix of 0x).  Locally at your org you could
do whatever you desired.

While I like the idea of decentralized numbers, I would really like to make
sure we do it within subspaces. The reason is that I want to be able to
check from a client (or for that matter, at any layer in the stack),
whether a certain set of keys are allowed to be added or are reserved for
the lower layers.
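As a sketch of that kind of check (the prefix value and mask below are made-up
placeholders, not the KIP's actual allocation):

    public class HeaderNamespaceCheck {
        static final int LOCAL_PREFIX = 0x0010_0000;  // hypothetical "local org" prefix
        static final int PREFIX_MASK  = 0xFFF0_0000;  // hypothetical prefix width

        static boolean isLocalKey(int headerKey) {
            return (headerKey & PREFIX_MASK) == LOCAL_PREFIX;
        }

        public static void main(String[] args) {
            System.out.println(isLocalKey(0x0010_0042));  // true  -> app may set this header
            System.out.println(isLocalKey(0x0000_0007));  // false -> reserved for a lower layer
        }
    }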

Finally, the current proposal for key allocation has a lot of reserved
space: 0x004 - 0xFFF.

What would an alternate proposal with namespace look like?


One thing to note is that the way the key space is handled (whether
ints or strings) is partially separate from the question of having headers and
where to put them.

- Headers yes or no?
- Native or as V-wrapper?
- Inside Apache open source?
- Ints vs Strings for keys?

Nacho


On Tue, Nov 8, 2016 at 5:54 PM, Gwen Shapira  wrote:

> Thank you so much for this clear and fair summary of the arguments.
>
> I'm in favor of ints. Not a deal-breaker, but in favor.
>
> Even more in favor of Magnus's decentralized suggestion with Roger's
> tweak: add a namespace for headers. This will allow each app to just
> use whatever IDs it wants internally, and then let the admin deploying
> the app figure out an available namespace ID for the app to live in.
> So io.confluent.schema-registry can be namespace 0x01 on my deployment
> and 0x57 on yours, and the poor guys developing the app don't need to
> worry about that.
>
> Gwen
>
> On Tue, Nov 8, 2016 at 4:23 PM, radai  wrote:
> > +1 for sean's document. it covers pretty much all the trade-offs and
> > provides concrete figures to argue about :-)
> > (nit-picking - used the same xkcd twice, also trove has been superseded
> for
> > purposes of high performance collections: look at
> > https://github.com/leventov/Koloboke)
> >
> > so to sum up the string vs int debate:
> >
> > performance - you can do 140k ops/sec _per thread_ with string headers.
> you
> > could do x2-3 better with ints. there's no arguing the relative diff
> > between the two, there's only the question of whether or not _the rest of
> > kafka_ operates fast enough to care. if we want to make choices solely
> > based on performance we need ints. if we are willing to settle/compromise
> > for a nicer (to some) API than strings are good enough for the current
> > state of affairs.
> >
> > message size - with batching and compression it comes down to a ~5%
> > difference (internal testing, not in the doc. maybe would help adding if
> > this becomes a point of contention?). this means it wont really affect
> > kafka in "throughput mode" (large, compressed batches). in "low latency"
> > mode (meaning less/no batching and compression) the difference can be
> > extreme (it'll easily be an order of magnitude with small payloads like
> > stock ticks and header keys of the form
> > "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such
> topics at
> > linkedin where actual payloads are ~2 ints and are eclipsed by our
> in-house
> > audit "header" which is why we liked ints to begin with.
> >
> > "ease of use" - strings would probably still require _some_ degree of
> > partitioning by convention (imagine if everyone used the key "infra"...)
> > but its very intuitive for java devs to do anyway (reverse-domain is
> > ingrained into java developers at a young age :-) ). also most java devs
> > find Map more intuitive than Map -
> > probably because of other text-based protocols like http. ints would
> > require a number registry. if you think number registries are hard just
> > look at the wiki page for KIPs (specifically the number for next
> available
> > KIP) and think again - we are probably talking about the same volume of
> > requests. also this would only be "required" (good citizenship, more
> like)
> > if you want to publish your plugin for others to use. within your org do
> > whatever you want - just know that if you use [some "reserved" range]
> and a
> > future kafka update breaks it its your problem. RTFM.
> >
> > personally im in favor of ints.
> >
> > having said that (and like nacho) I will settle if int vs string remains
> > the only obstacle to this.
>

[GitHub] kafka-site issue #29: Update the website repo link in code.html to point to ...

2016-11-09 Thread becketqin
Github user becketqin commented on the issue:

https://github.com/apache/kafka-site/pull/29
  
@ijuma Thanks for the review. Merged to the asf-site.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site pull request #29: Update the website repo link in code.html to po...

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka-site/pull/29


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-09 Thread Becket Qin
I am not sure if we need the second stage. Wouldn't it be enough to say
that a message is a tombstone if one of the following is true?
1. tombstone flag is set.
2. value is null.

If we go to stage 2, the only difference is that we can theoretically
support a null value non-tombstone message in a log compacted topic, but I
am not sure if that has any use case.

Thanks,

Jiangjie (Becket) Qin
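A tiny sketch of that rule, for illustration only:

    public class TombstoneRule {
        // Either signal marks a record as a tombstone for log compaction.
        static boolean isTombstone(boolean tombstoneFlagSet, byte[] value) {
            return tombstoneFlagSet || value == null;
        }

        public static void main(String[] args) {
            System.out.println(isTombstone(true, "v".getBytes()));  // true: flag set, value kept
            System.out.println(isTombstone(false, null));           // true: old-style null value
            System.out.println(isTombstone(false, "v".getBytes())); // false: ordinary record
        }
    }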


On Wed, Nov 9, 2016 at 9:23 AM, Mayuresh Gharat 
wrote:

> I think it will be a good idea. +1
>
> Thanks,
>
> Mayuresh
>
> On Wed, Nov 9, 2016 at 9:13 AM, Michael Pearce 
> wrote:
>
> > +1 Mayuresh, I think this is a good solution/strategy.
> >
> > Shall we update the KIP with this? Becket/Jun/Joel any comments to add
> > before we do?
> >
> > On 08/11/2016, 17:29, "Mayuresh Gharat" 
> > wrote:
> >
> > I think the migration can be done in 2 stages :
> >
> > 1) In first stage the broker should understand the attribute flag as
> > well
> > as Null for the value for log compaction.
> > 2) In second stage we move on to supporting only the attribute flag
> > for log
> > compaction.
> >
> > I agree with Becket that for older clients (consumers) the broker
> might
> > have to down convert a message that has the attribute flag set for
> log
> > compacting but has a non null value. But this should be in first
> stage.
> > Once all the clients have upgraded (clients start recognizing the
> > attribute
> > flag), we can move the broker to stage 2.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Nov 8, 2016 at 12:06 AM, Michael Pearce <
> michael.pea...@ig.com
> > >
> > wrote:
> >
> > > Also we can add further guidance:
> > >
> > > To  avoid the below caveat to organisations by promoting of
> > upgrading all
> > > consumers first before relying on producing tombstone messages with
> > data
> > >
> > > Sent using OWA for iPhone
> > > 
> > > From: Michael Pearce
> > > Sent: Tuesday, November 8, 2016 8:03:32 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > Thanks Jun on the feedback, I think I understand the issue/point
> now.
> > >
> > > We def can add that on older client version if tombstone marker
> make
> > the
> > > value null to preserve behaviour.
> > >
> > > There is one caveats to this:
> > >
> > > * we have to be clear that data is lost if reading via old
> > client/message
> > > format - I don't think this is a big issue as mostly the idea/use
> > case is
> > > around meta data transport as such would only be as bad as current
> > situation
> > >
> > > Re having configurable broker this was to handle cases like you
> > described
> > > but in another way by allowing organisation choose the behaviour of
> > the
> > > compaction per broker or per topic so they could manage their
> > transition to
> > > using tombstone markers.
> > >
> > > On hind sight it maybe easier to just upgrade and downgrade the
> > messages
> > > on version as you propose.
> > >
> > >
> > >
> > >
> > >
> > >
> > > Sent using OWA for iPhone
> > > 
> > > From: Jun Rao 
> > > Sent: Tuesday, November 8, 2016 12:34:41 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > For the use case, one potential use case is for schema
> registration.
> > For
> > > example, in Avro, a null value corresponds to a Null schema. So, if
> > you
> > > want to be able to keep the schema id in a delete message, the
> value
> > can't
> > > be null. We could get around this issue by specializing null value
> > during
> > > schema registration though.
> > >
> > > Now for the proposed changes. We probably should preserve client
> > > compatibility. If a client application is sending a null value to a
> > > compacted topic, ideally, it should work the same after the client
> > > upgrades.
> > >
> > > I am not sure about making the tombstone marker configurable,
> > especially at
> > > the topic level. Should we allow users to change the config values
> > back and
> > > forth, and what would be the implication?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Nov 7, 2016 at 10:48 AM, Becket Qin 
> > wrote:
> > >
> > > > Hi Michael,
> > > >
> > > > Yes, changing the logic in the log cleaner makes sense. There
> > could be
> > > some
> > > > other thing worth thinking (e.g. the message size change after
> > > conversion),
> > > > though.
> > > >
> > > > The scenario I was thinking is the following:
> > > > Imagine a distributed caching system built on top of Kafka. A
> user
> > is
> > > > consuming from a topic and it is g

[jira] [Commented] (KAFKA-4311) Multi layer cache eviction causes forwarding to incorrect ProcessorNode

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651702#comment-15651702
 ] 

ASF GitHub Bot commented on KAFKA-4311:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2051


> Multi layer cache eviction causes forwarding to incorrect ProcessorNode 
> 
>
> Key: KAFKA-4311
> URL: https://issues.apache.org/jira/browse/KAFKA-4311
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> The two exceptions below were reported by Frank on the dev mailing list. 
> After investigation, the root cause is multiple cache evictions happening in 
> the same topology. 
> Given a topology like the one below, if a record arriving in `tableOne` 
> causes a cache eviction, it will trigger the `leftJoin` that will do a `get` 
> from `reducer-store`. If the key is not currently cached in `reducer-store`, 
> but is in the backing store, it will be put into the cache, and it may also 
> trigger an eviction. If it does trigger an eviction and the eldest entry is 
> dirty it will flush the dirty keys. It is at this point that the exception in 
> the comment happens (ClassCastException). This occurs because the 
> ProcessorContext is still set to the context of the `leftJoin` and the next 
> child in the topology is `mapValues`.
> We need to set the correct `ProcessorNode` on the context in the 
> `ForwardingCacheFlushListener` prior to calling `context.forward`. We also 
> need to remember to reset the `ProcessorNode` to the previous node once 
> `context.forward` has completed.
> {code}
> final KTable<String, String> one = builder.table(Serdes.String(), 
> Serdes.String(), tableOne, tableOne);
> final KTable<Long, String> two = builder.table(Serdes.Long(), 
> Serdes.String(), tableTwo, tableTwo);
> final KTable<String, Long> reduce = two.groupBy(new 
> KeyValueMapper<Long, String, KeyValue<String, Long>>() {
> @Override
> public KeyValue<String, Long> apply(final Long key, final String 
> value) {
> return new KeyValue<>(value, key);
> }
> }, Serdes.String(), Serdes.Long())
> .reduce(new Reducer<Long>() {
> @Override
> public Long apply(final Long value1, final Long value2) {
> return value1 + value2;
> }
> }, new Reducer<Long>() {
> @Override
> public Long apply(final Long value1, final Long value2) {
> return value1 - value2;
> }
> }, "reducer-store");
> one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
> @Override
> public String apply(final String value1, final Long value2) {
> return value1 + ":" + value2;
> }
> })
> .mapValues(new ValueMapper<String, String>() {
> @Override
> public String apply(final String value) {
> return value;
> }
> });
> {code}
> This exception is actually a symptom of the exception reported below in the 
> comment. After the first exception is thrown, the StreamThread triggers a 
> shutdown that then throws this exception.
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> at
> org.apache.kafka.streams.state
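A self-contained sketch of the set/forward/restore pattern the description calls for; the context and node types below are simplified stand-ins, not Kafka's internal ProcessorContext classes:

{code:java}
import java.util.ArrayList;
import java.util.List;

class FakeContext {
    private String currentNode;
    final List<String> forwards = new ArrayList<>();
    String currentNode() { return currentNode; }
    void setCurrentNode(String node) { currentNode = node; }
    void forward(String key, String value) { forwards.add(currentNode + ":" + key + "=" + value); }
}

public class CacheFlushForwardDemo {
    // Mirrors what ForwardingCacheFlushListener needs to do: pin the context to
    // the node that owns the cached store before forwarding, then restore it.
    static void flushEntry(FakeContext ctx, String owningNode, String key, String value) {
        final String previous = ctx.currentNode();
        try {
            ctx.setCurrentNode(owningNode);   // forward to the owning node's children
            ctx.forward(key, value);
        } finally {
            ctx.setCurrentNode(previous);     // restore so in-flight processing routes correctly
        }
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext();
        ctx.setCurrentNode("leftJoin");        // an eviction fires while leftJoin is processing
        flushEntry(ctx, "KTABLE-REDUCE", "k", "42");
        System.out.println(ctx.forwards);      // [KTABLE-REDUCE:k=42]
        System.out.println(ctx.currentNode()); // leftJoin (restored)
    }
}
{code}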

[jira] [Updated] (KAFKA-4311) Multi layer cache eviction causes forwarding to incorrect ProcessorNode

2016-11-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4311:
-
   Resolution: Fixed
Fix Version/s: (was: 0.10.1.1)
   0.10.2.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2051
[https://github.com/apache/kafka/pull/2051]

> Multi layer cache eviction causes forwarding to incorrect ProcessorNode 
> 
>
> Key: KAFKA-4311
> URL: https://issues.apache.org/jira/browse/KAFKA-4311
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> The two exceptions below were reported by Frank on the dev mailing list. 
> After investigation, the root cause is multiple cache evictions happening in 
> the same topology. 
> Given a topology like the one below, if a record arriving in `tableOne` 
> causes a cache eviction, it will trigger the `leftJoin` that will do a `get` 
> from `reducer-store`. If the key is not currently cached in `reducer-store`, 
> but is in the backing store, it will be put into the cache, and it may also 
> trigger an eviction. If it does trigger an eviction and the eldest entry is 
> dirty it will flush the dirty keys. It is at this point that the exception in 
> the comment happens (ClassCastException). This occurs because the 
> ProcessorContext is still set to the context of the `leftJoin` and the next 
> child in the topology is `mapValues`.
> We need to set the correct `ProcessorNode` on the context in the 
> `ForwardingCacheFlushListener` prior to calling `context.forward`. We also 
> need to remember to reset the `ProcessorNode` to the previous node once 
> `context.forward` has completed.
> {code}
> final KTable<String, String> one = builder.table(Serdes.String(), 
> Serdes.String(), tableOne, tableOne);
> final KTable<Long, String> two = builder.table(Serdes.Long(), 
> Serdes.String(), tableTwo, tableTwo);
> final KTable<String, Long> reduce = two.groupBy(new 
> KeyValueMapper<Long, String, KeyValue<String, Long>>() {
> @Override
> public KeyValue<String, Long> apply(final Long key, final String 
> value) {
> return new KeyValue<>(value, key);
> }
> }, Serdes.String(), Serdes.Long())
> .reduce(new Reducer<Long>() {
> @Override
> public Long apply(final Long value1, final Long value2) {
> return value1 + value2;
> }
> }, new Reducer<Long>() {
> @Override
> public Long apply(final Long value1, final Long value2) {
> return value1 - value2;
> }
> }, "reducer-store");
> one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
> @Override
> public String apply(final String value1, final Long value2) {
> return value1 + ":" + value2;
> }
> })
> .mapValues(new ValueMapper<String, String>() {
> @Override
> public String apply(final String value) {
> return value;
> }
> });
> {code}
> This exception is actually a symptom of the exception reported below in the 
> comment. After the first exception is thrown, the StreamThread triggers a 
> shutdown that then throws this exception.
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
> org.apache.kafka.streams.state.internals.ThreadC

[GitHub] kafka pull request #2051: KAFKA-4311: Multi layer cache eviction causes forw...

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2051


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2112: MINOR: fix typos and incorrect docs

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2112


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-09 Thread Guozhang Wang
Hello Jun,

Thanks for reporting this issue. I looked through the code and I agree that the
logic you found in 0.9.0.1 also exists in 0.10.0+. However, I think the
process is designed intentionally like this to skip the message set that is
causing such a fatal error, since otherwise the thread would keep fetching
this message set and throwing the same exceptions. It will cause the
follower to have a "hole" in its log, but this is arguably better than
leaving the replica fetcher thread in a bad state.

As for the out-of-range issue that follows the skipping of the message set,
I think it should be handled by the "handleOffsetOutOfRange" logic of
the ReplicaFetcherThread class, which resets the fetch offset to the log end
offset of the leader and then retries. So I do not know why you observed that
the thread actually stopped because of this issue. Could you check the
source code as well as the stack trace to see why this happens?


Guozhang


On Wed, Nov 2, 2016 at 4:38 AM, Jun H.  wrote:

> Hi all,
>
> We recently discovered an issue in Kafka 0.9.0.1 (), where
> ReplicaFetcherThread stopped after ReplicaFetcherThread received a
> corrupted message. As the same logic exists also in Kafka 0.10.0.0 and
> 0.10.0.1, they may have the similar issue.
>
> Here are system logs related to this issue.
>
> > 2016-10-30 - 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474
> > ReplicaFetcherThread.apply - Found invalid messages during fetch for
> > partition [logs,41] offset 39021512238 error Message is corrupt (stored
> crc
> > = 2028421553, computed crc = 3577227678)
> > 2016-10-30 - 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474
> > ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error
> due
> > to kafka.common.KafkaException: - error processing data for partition
> > [logs,41] offset 39021512301 Caused - by: java.lang.RuntimeException:
> > Offset mismatch: fetched offset = 39021512301, log end offset =
> 39021512238.
>
>
> First, ReplicaFetcherThread got a corrupted message (offset 39021512238)
> due to some blip.
>
> Line
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/
> main/scala/kafka/server/AbstractFetcherThread.scala#L138
> threw exception
>
> Then, Line
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/
> main/scala/kafka/server/AbstractFetcherThread.scala#L145
> caught it and logged this error.
>
> Because
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/
> main/scala/kafka/server/AbstractFetcherThread.scala#L134
> updated the topic partition offset to the fetched latest one in
> partitionMap. So ReplicaFetcherThread skipped the batch with corrupted
> messages.
>
> Based on
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/
> main/scala/kafka/server/AbstractFetcherThread.scala#L84,
> the ReplicaFetcherThread then directly fetched the next batch of messages
> (with offset 39021512301)
>
> Next, ReplicaFetcherThread stopped because the log end offset (still
> 39021512238) didn't match the fetched message (offset 39021512301).
>
> A quick fix is to move line 134 to be after line 138.
>
> Would be great to have your comments and please let me know if a Jira issue
> is needed. Thanks.
>
> Best,
>
> Jun
>



-- 
-- Guozhang


[jira] [Resolved] (KAFKA-4360) Controller may deadLock when autoLeaderRebalance encounter zk expired

2016-11-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4360.
--
   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 2094
[https://github.com/apache/kafka/pull/2094]

> Controller may deadLock when autoLeaderRebalance encounter zk expired
> -
>
> Key: KAFKA-4360
> URL: https://issues.apache.org/jira/browse/KAFKA-4360
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Json Tu
>  Labels: bugfix
> Fix For: 0.10.2.0
>
> Attachments: deadlock_patch, yf-mafka2-common02_jstack.txt
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> when the controller has a checkAndTriggerPartitionRebalance task in 
> autoRebalanceScheduler and the zk session expires at that time, it can
> run into a deadlock.
> We can reconstruct the scenario as follows: when the zk session expires, the zk thread calls 
> handleNewSession, which is defined in SessionExpirationListener. It acquires 
> controllerContext.controllerLock and then calls 
> autoRebalanceScheduler.shutdown(), which has to wait for all tasks in the 
> autoRebalanceScheduler to complete. But those tasks also need to acquire 
> controllerContext.controllerLock, which is already owned by the zk callback 
> thread, so it runs into a deadlock.
> Because of that, it causes at least two problems: first, the broker's id 
> cannot be registered in zookeeper, so the broker will be considered dead by the new 
> controller; second, the process cannot be stopped by kafka-server-stop.sh, 
> because the shutdown function
> also cannot get controllerContext.controllerLock, so we cannot shut down kafka 
> except by using kill -9.
> In my attachment, I upload a jstack file, which was created when my kafka 
> process could not be shut down by kafka-server-stop.sh.
> I have run into this scenario several times; I think this may be a bug that is not 
> yet solved in kafka.
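The deadlock shape described above can be reproduced in isolation; a self-contained sketch (the lock and the scheduled task are stand-ins for controllerContext.controllerLock and checkAndTriggerPartitionRebalance):

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ControllerShutdownDeadlockDemo {
    private static final Object controllerLock = new Object();

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Stand-in for checkAndTriggerPartitionRebalance: it needs controllerLock.
        scheduler.schedule(() -> {
            synchronized (controllerLock) {
                System.out.println("rebalance task ran");
            }
        }, 1, TimeUnit.SECONDS);

        // Stand-in for handleNewSession: it grabs controllerLock and then waits
        // for the scheduler to drain, but the queued task needs the same lock.
        synchronized (controllerLock) {
            Thread.sleep(1500);  // let the task start and block on the lock
            scheduler.shutdown();
            boolean drained = scheduler.awaitTermination(3, TimeUnit.SECONDS);
            System.out.println("scheduler drained: " + drained);  // false: the deadlock shape
        }
        // Only after the lock is released here can the scheduled task complete.
    }
}
{code}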



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2094: KAFKA-4360:Controller may deadLock when autoLead...

2016-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2094


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: replica fetch error and shuabing

2016-11-09 Thread Guozhang Wang
Which version of Kafka are you using?

On Mon, Nov 7, 2016 at 10:35 AM, Json Tu  wrote:

> Hi, when I move __consumer_offsets from an old broker to a new broker, we
> encounter the following error, and it keeps flooding the log (shuabing).
> server.log.2016-11-07-19:[2016-11-07 19:17:15,392] ERROR Found invalid
> messages during fetch for partition [__consumer_offsets,10] offset 13973569
> error Message found with corrupt size (0) in shallow iterator (kafka.server.
> ReplicaFetcherThread)
> server.log.2016-11-07-19:[2016-11-07 19:17:15,476] ERROR Found invalid
> messages during fetch for partition [__consumer_offsets,10] offset 13973569
> error Message found with corrupt size (0) in shallow iterator (kafka.server.
> ReplicaFetcherThread)
> server.log.2016-11-07-19:[2016-11-07 19:17:15,573] ERROR Found invalid
> messages during fetch for partition [__consumer_offsets,10] offset 13973569
> error Message found with corrupt size (0) in shallow iterator (kafka.server.
> ReplicaFetcherThread)
> server.log.2016-11-07-19:[2016-11-07 19:17:15,640] ERROR Found invalid
> messages during fetch for partition [__consumer_offsets,10] offset 13973569
> error Message found with corrupt size (0) in shallow iterator (kafka.server.
> ReplicaFetcherThread)
> server.log.2016-11-07-19:[2016-11-07 19:17:15,697] ERROR Found invalid
> messages during fetch for partition [__consumer_offsets,10] offset 13973569
> error Message found with corrupt size (0) in shallow iterator (kafka.server.
> ReplicaFetcherThread)
> server.log.2016-11-07-19:[2016-11-07 19:17:15,770] ERROR Found invalid
> messages during fetch for partition [__consumer_offsets,10] offset 13973569
> error Message found with corrupt size (0) in shallow iterator (kafka.server.
> ReplicaFetcherThread)
>
> anyone can help solve it,thanks.
>



-- 
-- Guozhang


[jira] [Commented] (KAFKA-4395) KafkaConfig and LogConfig should not have static initialization order dependencies

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651674#comment-15651674
 ] 

ASF GitHub Bot commented on KAFKA-4395:
---

GitHub user sutambe opened a pull request:

https://github.com/apache/kafka/pull/2120

KAFKA-4395: Break static initialization order dependency between 
KafkaConfig and Logconfig

Fixes static initialization order dependency between KafkaConfig and 
LogConfig.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sutambe/kafka logconfig-static-init

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2120.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2120


commit 0a9690981aeaf6a2c50d32a504815d1b041e3089
Author: Sumant Tambe 
Date:   2016-11-09T18:19:40Z

KAFKA-4395 fixes static initialization order depedency between KafkaConfig 
and LogConfig

commit 7f7a9ec820e3fe65bcb9f2bd544fb2322a897e11
Author: Sumant Tambe 
Date:   2016-11-09T18:27:00Z

removed commented line




> KafkaConfig and LogConfig should not have static initialization order 
> dependencies
> --
>
> Key: KAFKA-4395
> URL: https://issues.apache.org/jira/browse/KAFKA-4395
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
> Fix For: 0.10.1.1
>
>
> LogConfig.configDef.serverDefaultConfigNames is not initialized properly 
> due to static initialization order dependencies between KafkaConfig and 
> LogConfig. The map ends up inserting null values where string literals were expected. 
> Consider the following.
> 1. KafkaConfig begins initialization at first because KafkaServer constructor 
> needs KafkaConfig. 
> 2. at KafkaConfig.LogMessageFormatVersionProp it needs LogConfig. 
> 3. LogConfig begins initialization 
> 4. LogConfig.configDef begins initialization 
> 5. .define(UncleanLeaderElectionEnableProp) needs 
> KafkaConfig.UncleanLeaderElectionEnableProp, which is defined below  
> KafkaConfig.LogMessageFormatVersionProp so it's null 
> 6. Can't start another initialization of KafkaConfig 
> 7. So .define inserts null. This is applicable to all three 
> MinInSyncReplicasProp, UncleanLeaderElectionEnableProp, and 
> CompressionTypeProp



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2120: KAFKA-4395: Break static initialization order depe...

2016-11-09 Thread sutambe
GitHub user sutambe opened a pull request:

https://github.com/apache/kafka/pull/2120

KAFKA-4395: Break static initialization order dependency between 
KafkaConfig and Logconfig

Fixes static initialization order dependency between KafkaConfig and 
LogConfig.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sutambe/kafka logconfig-static-init

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2120.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2120


commit 0a9690981aeaf6a2c50d32a504815d1b041e3089
Author: Sumant Tambe 
Date:   2016-11-09T18:19:40Z

KAFKA-4395 fixes static initialization order depedency between KafkaConfig 
and LogConfig

commit 7f7a9ec820e3fe65bcb9f2bd544fb2322a897e11
Author: Sumant Tambe 
Date:   2016-11-09T18:27:00Z

removed commented line




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site issue #29: Update the website repo link in code.html to point to ...

2016-11-09 Thread ijuma
Github user ijuma commented on the issue:

https://github.com/apache/kafka-site/pull/29
  
LGTM, thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4395) KafkaConfig and LogConfig should not have static initialization order dependencies

2016-11-09 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4395:
---
Fix Version/s: 0.10.1.1

> KafkaConfig and LogConfig should not have static initialization order 
> dependencies
> --
>
> Key: KAFKA-4395
> URL: https://issues.apache.org/jira/browse/KAFKA-4395
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
> Fix For: 0.10.1.1
>
>
> LogConfig.configDef.serverDefaultConfigNames is not initialized properly 
> due to static initialization order dependencies between KafkaConfig and 
> LogConfig. The map ends up inserting null values, even though the intended 
> values are all string literals. 
> Consider the following.
> 1. KafkaConfig begins initialization at first because KafkaServer constructor 
> needs KafkaConfig. 
> 2. at KafkaConfig.LogMessageFormatVersionProp it needs LogConfig. 
> 3. LogConfig begins initialization 
> 4. LogConfig.configDef begins initialization 
> 5. .define(UncleanLeaderElectionEnableProp) needs 
> KafkaConfig.UncleanLeaderElectionEnableProp, which is defined below  
> KafkaConfig.LogMessageFormatVersionProp so it's null 
> 6. Can't start another initialization of KafkaConfig 
> 7. So .define inserts null. This is applicable to all three 
> MinInSyncReplicasProp, UncleanLeaderElectionEnableProp, and 
> CompressionTypeProp



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4395) KafkaConfig and LogConfig should not have static initialization order dependencies

2016-11-09 Thread Sumant Tambe (JIRA)
Sumant Tambe created KAFKA-4395:
---

 Summary: KafkaConfig and LogConfig should not have static 
initialization order dependencies
 Key: KAFKA-4395
 URL: https://issues.apache.org/jira/browse/KAFKA-4395
 Project: Kafka
  Issue Type: Bug
Reporter: Sumant Tambe
Assignee: Sumant Tambe


LogConfig.configDef.serverDefaultConfigNames is not initialized properly due 
to static initialization order dependencies between KafkaConfig and LogConfig. 
The map ends up inserting null values, even though the intended values are all 
string literals. Consider the following.

1. KafkaConfig begins initialization at first because KafkaServer constructor 
needs KafkaConfig. 
2. at KafkaConfig.LogMessageFormatVersionProp it needs LogConfig. 
3. LogConfig begins initialization 
4. LogConfig.configDef begins initialization 
5. .define(UncleanLeaderElectionEnableProp) needs 
KafkaConfig.UncleanLeaderElectionEnableProp, which is defined below  
KafkaConfig.LogMessageFormatVersionProp so it's null 
6. Can't start another initialization of KafkaConfig 
7. So .define inserts null. This is applicable to all three 
MinInSyncReplicasProp, UncleanLeaderElectionEnableProp, and CompressionTypeProp
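
A minimal Java sketch of the same kind of static-initialization cycle 
(hypothetical class and field names; the real KafkaConfig/LogConfig are Scala 
objects, but the ordering hazard is the same):

public class StaticInitOrderSketch {

    static String id(String s) { return s; }  // defeats compile-time constant folding

    static class ServerConfig {                      // stands in for KafkaConfig
        static final String FORMAT_VERSION_PROP = id("log.message.format.version");
        // Touching TopicConfig here forces its static init while ServerConfig
        // is still only partially initialized (steps 2-4 above).
        static final String FORMAT_DEFAULT = TopicConfig.DEFAULT_FORMAT;
        static final String UNCLEAN_ELECTION_PROP = id("unclean.leader.election.enable");
    }

    static class TopicConfig {                       // stands in for LogConfig
        // Reads ServerConfig.UNCLEAN_ELECTION_PROP before it has been assigned,
        // so it observes null (step 5 above).
        static final String SERVER_DEFAULT = ServerConfig.UNCLEAN_ELECTION_PROP;
        static final String DEFAULT_FORMAT = id("0.10.1");
    }

    public static void main(String[] args) {
        System.out.println(ServerConfig.FORMAT_DEFAULT);  // prints 0.10.1
        System.out.println(TopicConfig.SERVER_DEFAULT);   // prints null
    }
}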



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4395) KafkaConfig and LogConfig should not have static initialization order dependencies

2016-11-09 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe updated KAFKA-4395:

Affects Version/s: 0.10.0.1

> KafkaConfig and LogConfig should not have static initialization order 
> dependencies
> --
>
> Key: KAFKA-4395
> URL: https://issues.apache.org/jira/browse/KAFKA-4395
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>
> LogConfig.configDef.serverDefaultConfigNames is not initialized properly 
> due to static initialization order dependencies between KafkaConfig and 
> LogConfig. The map ends up inserting null values, even though the intended 
> values are all string literals. 
> Consider the following.
> 1. KafkaConfig begins initialization at first because KafkaServer constructor 
> needs KafkaConfig. 
> 2. at KafkaConfig.LogMessageFormatVersionProp it needs LogConfig. 
> 3. LogConfig begins initialization 
> 4. LogConfig.configDef begins initialization 
> 5. .define(UncleanLeaderElectionEnableProp) needs 
> KafkaConfig.UncleanLeaderElectionEnableProp, which is defined below  
> KafkaConfig.LogMessageFormatVersionProp so it's null 
> 6. Can't start another initialization of KafkaConfig 
> 7. So .define inserts null. This is applicable to all three 
> MinInSyncReplicasProp, UncleanLeaderElectionEnableProp, and 
> CompressionTypeProp



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update

2016-11-09 Thread Jason Gustafson
Hey Vahid,

What I meant was that the OffsetFetch request includes an array of topic
partitions, just like the TopicMetadata request. So similar to that API, we
could use a null array in order to fetch all offsets for a given groupId.
Then there's the matter of how to expose the capability. Adding the API to
KafkaConsumer as you've suggested seems reasonable, but I could also see it
exposed on the AdminClient, which would eliminate the need to create the
"dummy" consumer. What do you think?

Thanks,
Jason
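
For context, a minimal sketch of the "dummy" consumer workaround mentioned 
above, i.e. what a caller has to do today to read a group's committed offsets 
through the existing OffsetFetch path; the broker address, group name and 
partition list are assumptions for illustration only:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption
        props.put("group.id", "my-group");                   // the group being inspected
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // The caller already has to know which partitions to ask about --
            // exactly the limitation the "null = all" semantics would remove.
            for (TopicPartition tp : Arrays.asList(new TopicPartition("logs", 0),
                                                   new TopicPartition("logs", 1))) {
                OffsetAndMetadata committed = consumer.committed(tp);
                System.out.println(tp + " -> "
                        + (committed == null ? "no committed offset" : committed.offset()));
            }
        }
    }
}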

On Mon, Nov 7, 2016 at 3:28 PM, Vahid S Hashemian  wrote:

> Hi Jason,
>
> Thanks for your feedback.
>
> Yes, the intent of the KIP is to make existing offsets of the group
> available even when there are no active consumers in the group consuming
> from one or more topic partitions.
> Your suggestion should also work. I'm not yet sure how to obtain all of a
> group's topic partitions and need to look more closely at your 'null=all'
> suggestion, as I couldn't immediately see an obvious way to extract that
> info in the OffsetFetch handler. I'll dig deeper.
> Just a quick note that using the OffsetFetch API would not address the
> second (but minor) problem described in the current KIP (the dummy group
> member). I assume that is not a big concern.
>
> Thanks.
> --Vahid
>
>
>
>
> From:   Jason Gustafson 
> To: dev@kafka.apache.org
> Date:   11/07/2016 09:19 AM
> Subject:Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update
>
>
>
> Hey Vahid,
>
> Thanks for the KIP. If I understand correctly, the problem is how to fetch
> existing offsets for a group which has no active members, right? I'm not
> totally clear why we need to modify the DescribeGroups API in order to
> achieve this since we already have the OffsetFetch API. I think the
> limitation currently is that you need to know the partitions to fetch
> offsets for, but perhaps we could modify it to support the "null=all"
> semantics that we used for the TopicMetadata API?
>
> Thanks,
> Jason
>
> On Thu, Nov 3, 2016 at 11:09 AM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Hi all,
> >
> > I started a new KIP under
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 88%3A+DescribeGroups+Protocol+Update
> > .
> >
> > The KIP is a proposal to update the DescribeGroups protocol to address
> > KAFKA-3853 (https://issues.apache.org/jira/browse/KAFKA-3853).
> >
> > I appreciate your feedback.
> >
> > Thanks.
> > --Vahid
> >
> >
>
>
>
>
>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-09 Thread Jun Rao
Hi, Radai,

Thanks for the KIP. Some comments below.

1. The KIP says "to facilitate faster implementation (as a safety net) the
pool will be implemented in such a way that memory that was not release()ed
(but still garbage collected) would be detected and "reclaimed". this is to
prevent "leaks" in case of code paths that fail to release() properly.".
What are the cases that could cause memory leaks? If we are concerned about
bugs, it seems that it's better to just do more testing to make sure the
usage of the simple implementation (SimpleMemoryPool) is solid instead of
adding more complicated logic (GarbageCollectedMemoryPool) to hide the
potential bugs.

2. I am wondering how much this KIP covers the SSL channel implementation.
2.1 SslTransportLayer maintains netReadBuffer, netWriteBuffer,
appReadBuffer per socket. Should those memory be accounted for in memory
pool?
2.2 One tricky thing with SSL is that during a KafkaChannel.read(), it's
possible for multiple NetworkReceives to be returned since multiple
requests' data could be encrypted together by SSL. To deal with this, we
stash those NetworkReceives in Selector.stagedReceives and give it back to
the poll() call one NetworkReceive at a time. What this means is that, if
we stop reading from KafkaChannel in the middle because memory pool is
full, this channel's key may never get selected for reads (even after the
read interest is turned on), but there are still pending data for the
channel, which will never get processed.

3. The code has the following two methods in MemoryPool, which are not
described in the KIP. Could you explain how they are used in the wiki?
isLowOnMemory()
isOutOfMemory()

4. Could you also describe in the KIP at the high level, how the read
interest bit for the socket is turned on/off with respect to MemoryPool?

5. Should queued.max.bytes default to -1 or Long.MAX_VALUE?

Thanks,

Jun

On Mon, Nov 7, 2016 at 1:08 PM, radai  wrote:

> Hi,
>
> I would like to initiate a vote on KIP-72:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+
> Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>
> The kip allows specifying a limit on the amount of memory allocated for
> reading incoming requests into. This is useful for "sizing" a broker and
> avoiding OOMEs under heavy load (as actually happens occasionally at
> linkedin).
>
> I believe I've addressed most (all?) concerns brought up during the
> discussion.
>
> To the best of my understanding this vote is about the goal and
> public-facing changes related to the new proposed behavior, but as for
> implementation, i have the code up here:
>
> https://github.com/radai-rosenblatt/kafka/tree/broker-memory
> -pool-with-muting
>
> and I've stress-tested it to work properly (meaning it chugs along and
> throttles under loads that would DOS 10.0.1.0 code).
>
> I also believe that the primitives and "pattern"s introduced in this KIP
> (namely the notion of a buffer pool and retrieving from / releasing to said
> pool instead of allocating memory) are generally useful beyond the scope of
> this KIP for both performance issues (allocating lots of short-lived large
> buffers is a performance bottleneck) and other areas where memory limits
> are a problem (KIP-81)
>
> Thank you,
>
> Radai.
>
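
A minimal sketch of the allocate/release pattern described above (a bounded 
byte budget that reads draw from and release back to); this is an illustration 
of the idea only, not the SimpleMemoryPool code in the branch, and the 
low-memory threshold is an arbitrary assumption:

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Illustration of a bounded buffer pool: a read is only started if the pool
// can cover it, and the buffer is returned to the budget once processed.
class BoundedPoolSketch {
    private final long capacity;
    private final AtomicLong available;

    BoundedPoolSketch(long capacityBytes) {
        this.capacity = capacityBytes;
        this.available = new AtomicLong(capacityBytes);
    }

    // Returns a buffer of the requested size, or null if the budget is
    // exhausted (the caller would then mute the channel / skip the read).
    ByteBuffer tryAllocate(int size) {
        while (true) {
            long avail = available.get();
            if (avail < size)
                return null;
            if (available.compareAndSet(avail, avail - size))
                return ByteBuffer.allocate(size);
        }
    }

    // Returns the buffer's capacity to the budget after the request is processed.
    void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    boolean isLowOnMemory() { return available.get() < capacity / 10; } // arbitrary threshold
    boolean isOutOfMemory() { return available.get() <= 0; }
}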


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-09 Thread Mayuresh Gharat
I think it will be a good idea. +1

Thanks,

Mayuresh

On Wed, Nov 9, 2016 at 9:13 AM, Michael Pearce 
wrote:

> +1 Mayuresh, I think this is a good solution/strategy.
>
> Shall we update the KIP with this? Becket/Jun/Joel any comments to add
> before we do?
>
> On 08/11/2016, 17:29, "Mayuresh Gharat" 
> wrote:
>
> I think the migration can be done in 2 stages :
>
> 1) In first stage the broker should understand the attribute flag as
> well
> as Null for the value for log compaction.
> 2) In second stage we move on to supporting only the attribute flag
> for log
> compaction.
>
> I agree with Becket that for older clients (consumers) the broker might
> have to down convert a message that has the attribute flag set for log
> compacting but has a non null value. But this should be in first stage.
> Once all the clients have upgraded (clients start recognizing the
> attribute
> flag), we can move the broker to stage 2.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Nov 8, 2016 at 12:06 AM, Michael Pearce  >
> wrote:
>
> > Also we can add further guidance:
> >
> > To avoid the below caveat, we can guide organisations to upgrade all
> > consumers first before relying on producing tombstone messages with
> > data
> >
> > Sent using OWA for iPhone
> > 
> > From: Michael Pearce
> > Sent: Tuesday, November 8, 2016 8:03:32 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> >
> > Thanks Jun on the feedback, I think I understand the issue/point now.
> >
> > We can definitely add that: on older client versions, if the tombstone
> > marker is set, make the value null to preserve behaviour.
> >
> > There is one caveats to this:
> >
> > * we have to be clear that data is lost if reading via old
> client/message
> > format - I don't think this is a big issue as mostly the idea/use
> case is
> > around meta data transport as such would only be as bad as current
> situation
> >
> > Re having configurable broker this was to handle cases like you
> described
> > but in another way by allowing organisation choose the behaviour of
> the
> > compaction per broker or per topic so they could manage their
> transition to
> > using tombstone markers.
> >
> > In hindsight it may be easier to just upgrade and downgrade the messages
> > based on version as you propose.
> >
> >
> >
> >
> >
> >
> > Sent using OWA for iPhone
> > 
> > From: Jun Rao 
> > Sent: Tuesday, November 8, 2016 12:34:41 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> >
> > For the use case, one potential use case is for schema registration.
> For
> > example, in Avro, a null value corresponds to a Null schema. So, if
> you
> > want to be able to keep the schema id in a delete message, the value
> can't
> > be null. We could get around this issue by specializing null value
> during
> > schema registration though.
> >
> > Now for the proposed changes. We probably should preserve client
> > compatibility. If a client application is sending a null value to a
> > compacted topic, ideally, it should work the same after the client
> > upgrades.
> >
> > I am not sure about making the tombstone marker configurable,
> especially at
> > the topic level. Should we allow users to change the config values
> back and
> > forth, and what would be the implication?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Nov 7, 2016 at 10:48 AM, Becket Qin 
> wrote:
> >
> > > Hi Michael,
> > >
> > > Yes, changing the logic in the log cleaner makes sense. There
> could be
> > some
> > > other thing worth thinking (e.g. the message size change after
> > conversion),
> > > though.
> > >
> > > The scenario I was thinking is the following:
> > > Imagine a distributed caching system built on top of Kafka. A user
> is
> > > consuming from a topic and it is guaranteed that if the user
> consume to
> > the
> > > log end it will get the latest value for all the keys. Currently
> if the
> > > consumer sees a null value it knows the key has been removed. Now
> let's
> > say
> > > we rolled out this change. And the producer applies a message with
> the
> > > tombstone flag set, but the value was not null. When we append that
> > message
> > > to the log I suppose we will not do the down conversion if the
> broker has
> > > set the message.format.version to the latest. Because the log
> cleaner
> > won't
> > > touch the active log segment, so that message will be sitting in
> the
> > active
> > > segment as is. Now when a consumer that hasn't upgraded yet
> cons

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-09 Thread Michael Pearce
+1 Mayuresh, I think this is a good solution/strategy.

Shall we update the KIP with this? Becket/Jun/Joel any comments to add before 
we do?

On 08/11/2016, 17:29, "Mayuresh Gharat"  wrote:

I think the migration can be done in 2 stages :

1) In first stage the broker should understand the attribute flag as well
as Null for the value for log compaction.
2) In second stage we move on to supporting only the attribute flag for log
compaction.

I agree with Becket that for older clients (consumers) the broker might
have to down convert a message that has the attribute flag set for log
compacting but has a non null value. But this should be in first stage.
Once all the clients have upgraded (clients start recognizing the attribute
flag), we can move the broker to stage 2.

Thanks,

Mayuresh

On Tue, Nov 8, 2016 at 12:06 AM, Michael Pearce 
wrote:

> Also we can add further guidance:
>
> To avoid the below caveat, we can guide organisations to upgrade all
> consumers first before relying on producing tombstone messages with data
>
> Sent using OWA for iPhone
> 
> From: Michael Pearce
> Sent: Tuesday, November 8, 2016 8:03:32 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> Thanks Jun on the feedback, I think I understand the issue/point now.
>
> We can definitely add that: on older client versions, if the tombstone marker
> is set, make the value null to preserve behaviour.
>
> There is one caveats to this:
>
> * we have to be clear that data is lost if reading via old client/message
> format - I don't think this is a big issue as mostly the idea/use case is
> around meta data transport as such would only be as bad as current 
situation
>
> Re having configurable broker this was to handle cases like you described
> but in another way by allowing organisation choose the behaviour of the
> compaction per broker or per topic so they could manage their transition 
to
> using tombstone markers.
>
> In hindsight it may be easier to just upgrade and downgrade the messages
> based on version as you propose.
>
>
>
>
>
>
> Sent using OWA for iPhone
> 
> From: Jun Rao 
> Sent: Tuesday, November 8, 2016 12:34:41 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> For the use case, one potential use case is for schema registration. For
> example, in Avro, a null value corresponds to a Null schema. So, if you
> want to be able to keep the schema id in a delete message, the value can't
> be null. We could get around this issue by specializing null value during
> schema registration though.
>
> Now for the proposed changes. We probably should preserve client
> compatibility. If a client application is sending a null value to a
> compacted topic, ideally, it should work the same after the client
> upgrades.
>
> I am not sure about making the tombstone marker configurable, especially 
at
> the topic level. Should we allow users to change the config values back 
and
> forth, and what would be the implication?
>
> Thanks,
>
> Jun
>
> On Mon, Nov 7, 2016 at 10:48 AM, Becket Qin  wrote:
>
> > Hi Michael,
> >
> > Yes, changing the logic in the log cleaner makes sense. There could be
> some
> > other thing worth thinking (e.g. the message size change after
> conversion),
> > though.
> >
> > The scenario I was thinking is the following:
> > Imagine a distributed caching system built on top of Kafka. A user is
> > consuming from a topic and it is guaranteed that if the user consume to
> the
> > log end it will get the latest value for all the keys. Currently if the
> > consumer sees a null value it knows the key has been removed. Now let's
> say
> > we rolled out this change. And the producer applies a message with the
> > tombstone flag set, but the value was not null. When we append that
> message
> > to the log I suppose we will not do the down conversion if the broker 
has
> > set the message.format.version to the latest. Because the log cleaner
> won't
> > touch the active log segment, so that message will be sitting in the
> active
> > segment as is. Now when a consumer that hasn't upgraded yet consumes 
that
> > tombstone message in the active segment, it seems that the broker will
> need
> > to down convert that message to remove the value, right? In this case, 
we
> > cannot wait for the log cleaner to do the down conversion because that
> > message may have already been consumed before the log compaction 
happens.
> >
> > Thanks,
> >
> > Jian
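
A small sketch of the old-client down-conversion rule discussed above; the 
record type here is a stand-in for illustration, not a Kafka class, and the 
tombstone attribute is the proposed flag:

// Hypothetical illustration of the rule under discussion, not broker code.
final class SketchRecord {
    final byte[] key;
    final byte[] value;        // may be non-null even for a delete, per the proposal
    final boolean tombstone;   // proposed attribute-bit marker

    SketchRecord(byte[] key, byte[] value, boolean tombstone) {
        this.key = key; this.value = value; this.tombstone = tombstone;
    }

    // When serving a fetch on the old message format, drop the value so legacy
    // consumers still see "null value == delete". The payload (e.g. a schema id)
    // is lost on that path, which is the caveat called out above.
    SketchRecord downConvertForLegacyConsumer() {
        if (tombstone && value != null)
            return new SketchRecord(key, null, true);
        return this;
    }
}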

[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651371#comment-15651371
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under the Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication

commit 3a512fb9bf3e2b1f07d307187072c5204c2dc075
Author: Mark Shelton 
Date:   2016-11-09T16:33:02Z

KAFKA-4322 StateRestoreCallback begin and end indication




> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.
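
A sketch of the callback shape the ticket asks for; only restore(byte[], byte[]) 
exists on the 0.10.x StateRestoreCallback, and the begin/end hooks are the 
proposal (the interface name below is made up for illustration):

public interface BatchRestoreCallbackSketch {
    void beginRestore();                     // proposed: called before any key is restored
    void restore(byte[] key, byte[] value);  // existing per-record callback
    void endRestore();                       // proposed: called once restoration is done
}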



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651369#comment-15651369
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-09 Thread markcshelton
Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-09 Thread markcshelton
GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under the Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication

commit 3a512fb9bf3e2b1f07d307187072c5204c2dc075
Author: Mark Shelton 
Date:   2016-11-09T16:33:02Z

KAFKA-4322 StateRestoreCallback begin and end indication




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4394) Processor API does not receive timestamp and/or ConsumerRecord

2016-11-09 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651233#comment-15651233
 ] 

Damian Guy commented on KAFKA-4394:
---

The record timestamp is available via {{ProcessorContext.timestamp()}}. The 
{{ProcessorContext}} is supplied during {{init}} and is saved as the field 
{{context}} in {{AbstractProcessor}}
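
A minimal sketch of that pattern against the 0.10.x Processor interface, keeping 
the ProcessorContext from init() the same way AbstractProcessor does (names are 
placeholders):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TimestampLoggingProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;          // saved for later use, as in AbstractProcessor
    }

    @Override
    public void process(String key, String value) {
        long ts = context.timestamp();   // timestamp of the record being processed
        System.out.println("key=" + key + " timestamp=" + ts);
    }

    @Override
    public void punctuate(long timestamp) { }  // required by the 0.10.x interface

    @Override
    public void close() { }
}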

> Processor API does not receive timestamp and/or ConsumerRecord 
> ---
>
> Key: KAFKA-4394
> URL: https://issues.apache.org/jira/browse/KAFKA-4394
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Critical
>
> I'm trying to implement a custom low-level processor by implementing 
> org.apache.kafka.streams.processor.Processor or AbstractProcessor.
> The "process(K key, V value)" process method  only receives the key and value 
> and does not provide access to timestamp or other information from 
> ConsumerRecord. It is critical for me to get access to the message timestamp.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Use Android App as a “Producing client” for Kafka?

2016-11-09 Thread radai
also, for large-enough kafka clusters (#topics, #partitions) just
downloading the metadata (required by the client to know where to publish
stuff to) may be a big hit on your bandwidth consumption.
I would go with something like a rest-proxy (there are a few open source
ones) or roll your own server-side.

On Mon, Nov 7, 2016 at 9:41 PM, Ewen Cheslack-Postava 
wrote:

> Artur,
>
> It is possible to do this, but the second approach you mentioned is much
> more common. Normally people don't want to expose their Kafka cluster
> publicly, so having an intermediary can be a good way to, e.g., add a layer
> where you can easily filter out bad traffic. You may be able to use some of
> Kafka's newer security features to get enough security for your use case,
> however. Additionally, you'll likely need to tweak some of the default
> settings as they are good defaults for use within a data center, but not
> over the link your Android app will probably have.
>
> -Ewen
>
> On Fri, Nov 4, 2016 at 11:40 PM,  wrote:
>
> > Hi folks,
> >
> > is it possible / does it make sense to use an Android app as a "Producing
> > client" for Apache Kafka?
> >
> > Let's say my Android App need to capture and analyse reaction time data.
> > Goal is to collect all data and show the average reaction time in
> real-time
> > in the App.
> >
> > The alternative is having an app server of some kind as an intermediary
> > that accepts messages from the android app and posts them to Kafka,
> rather
> > than having the app be a Kafka Producer on its own.
> >
> >
> >
> > See: http://stackoverflow.com/questions/40043532/how-to-use-
> > android-app-as-a-producing-client-for-kafka
> >
> >
> >
> > Best Regards,
> >
> > Artur
> >
>
>
>
> --
> Thanks,
> Ewen
>
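
As a rough illustration of the "tweak the defaults" point above, a producer 
configuration leaning towards a slow, flaky mobile link; every value here is an 
assumption to be tuned for the actual network, not a recommendation from the 
thread:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MobileProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.example.com:9093");  // assumption
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SSL");     // avoid a publicly reachable plaintext listener
        props.put("retries", "5");                 // assumption: tolerate transient drops
        props.put("retry.backoff.ms", "1000");     // assumption: back off longer than LAN defaults
        props.put("request.timeout.ms", "60000");  // assumption: allow for high-latency links
        props.put("compression.type", "gzip");     // assumption: trade CPU for bandwidth
        props.put("linger.ms", "500");             // assumption: batch to cut round trips

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("reaction-times", "device-42", "153"));
        }
    }
}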


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread radai
@magnus - and very dangerous (youre essentially downloading and executing
arbitrary code off the internet on your servers ... bad idea without a
sandbox, even with)

as for it being a purely administrative task - i disagree.

i wish it would, really, because then my earlier point on the complexity of
the remapping process would be invalid, but at linkedin, for example, we
(the team im in) run kafka as a service. we dont really know what our users
(developing applications that use kafka) are up to at any given moment. it
is very possible (given the existance of headers and a corresponding plugin
ecosystem) for some application to "equip" their producers and consumers
with the required plugin without us knowing. i dont mean to imply thats
bad, i just want to make the point that its not as simple keeping it in
sync across a large-enough organization.


On Wed, Nov 9, 2016 at 6:17 AM, Magnus Edenhill  wrote:

> I think there is a piece missing in the Strings discussion, where
> pro-Stringers
> reason that by providing unique string identifiers for each header
> everything will just
> magically work for all parts of the stream pipeline.
>
> But the strings dont mean anything by themselves, and while we could
> probably envision
> some auto plugin loader that downloads, compiles, links and runs plugins
> on-demand
> as soon as they're seen by a consumer, I dont really see a use-case for
> something
> so dynamic (and fragile) in practice.
>
> In the real world an application will be configured with a set of plugins
> to either add (producer)
> or read (consumer) headers.
> This is an administrative task based on what features a client
> needs/provides and results in
> some sort of configuration to enable and configure the desired plugins.
>
> Since this needs to be kept somewhat in sync across an organisation (there
> is no point in having producers
> add headers no consumers will read, and vice versa), the added complexity
> of assigning an id namespace
> for each plugin as it is being configured should be tolerable.
>
>
> /Magnus
>
> 2016-11-09 13:06 GMT+01:00 Michael Pearce :
>
> > Just following/catching up on what seems to be an active night :)
> >
> > @Radai sorry if it may seem obvious but what does MD stand for?
> >
> > My take on String vs Int:
> >
> > I will state first I am pro Int (16 or 32).
> >
> > I do though playing devils advocate see a big plus with the argument of
> > String keys, this is around integrating into an existing eco-system.
> >
> > As many other systems use String based headers (Flume, JMS)  it makes it
> > much easier for these to be incorporated/integrated into.
> >
> > How with Int based headers could we provide a way/guidance to make this
> > integration simple / easy with transition flows over to kafka?
> >
> > * tough luck buddy you're on your own
> > * simply hash the string into int code and hope for no collisions (how to
> > convert back though?)
> > * http2 style as mentioned by nacho.
> >
> > cheers,
> > Mike
> >
> >
> > 
> > From: radai 
> > Sent: Wednesday, November 9, 2016 8:12 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> >
> > thinking about it some more, the best way to transmit the header
> remapping
> > data to consumers would be to put it in the MD response payload, so maybe
> > it should be discussed now.
> >
> >
> > On Wed, Nov 9, 2016 at 12:09 AM, radai 
> wrote:
> >
> > > im not opposed to the idea of namespace mapping. all im saying is that
> > its
> > > not part of the "mvp" and, since it requires no wire format change, can
> > > always be added later.
> > > also, its not as simple as just configuring MM to do the transform:
> lets
> > > say i've implemented large message support as {666,1} and on some
> mirror
> > > target cluster its been remapped to {999,1}. the consumer plugin code
> > would
> > > also need to be told to look for the large message "part X of Y" header
> > > under {999,1}. doable, but tricky.
> > >
> > > On Tue, Nov 8, 2016 at 10:29 PM, Gwen Shapira 
> wrote:
> > >
> > >> While you can do whatever you want with a namespace and your code,
> > >> what I'd expect is for each app to namespaces configurable...
> > >>
> > >> So if I accidentally used 666 for my HR department, and still want to
> > >> run RadaiApp, I can config "namespace=42" for RadaiApp and everything
> > >> will look normal.
> > >>
> > >> This means you only need to sync usage inside your own organization.
> > >> Still hard, but somewhat easier than syncing with the entire world.
> > >>
> > >> On Tue, Nov 8, 2016 at 10:07 PM, radai 
> > >> wrote:
> > >> > and we can start with {namespace, id} and no re-mapping support and
> > >> always
> > >> > add it later on if/when collisions actually happen (i dont think
> > they'd
> > >> be
> > >> > a problem).
> > >> >
> > >> > every interested party (so orgs or individuals) could then register
> a
> > >> > prefix (0 = reserved, 1 = confluent ... 666 = m

[jira] [Created] (KAFKA-4394) Processor API does not receive timestamp and/or ConsumerRecord

2016-11-09 Thread Mark Shelton (JIRA)
Mark Shelton created KAFKA-4394:
---

 Summary: Processor API does not receive timestamp and/or 
ConsumerRecord 
 Key: KAFKA-4394
 URL: https://issues.apache.org/jira/browse/KAFKA-4394
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Mark Shelton
Assignee: Guozhang Wang
Priority: Critical


I'm trying to implement a custom low-level processor by implementing 
org.apache.kafka.streams.processor.Processor or AbstractProcessor.
The "process(K key, V value)" process method  only receives the key and value 
and does not provide access to timestamp or other information from 
ConsumerRecord. It is critical for me to get access to the message timestamp.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651070#comment-15651070
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under the Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication




> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651069#comment-15651069
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-09 Thread markcshelton
Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-09 Thread markcshelton
GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under the Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread Magnus Edenhill
I think there is a piece missing in the Strings discussion, where
pro-Stringers
reason that by providing unique string identifiers for each header
everything will just
magically work for all parts of the stream pipeline.

But the strings dont mean anything by themselves, and while we could
probably envision
some auto plugin loader that downloads, compiles, links and runs plugins
on-demand
as soon as they're seen by a consumer, I dont really see a use-case for
something
so dynamic (and fragile) in practice.

In the real world an application will be configured with a set of plugins
to either add (producer)
or read (consumer) headers.
This is an administrative task based on what features a client
needs/provides and results in
some sort of configuration to enable and configure the desired plugins.

Since this needs to be kept somewhat in sync across an organisation (there
is no point in having producers
add headers no consumers will read, and vice versa), the added complexity
of assigning an id namespace
for each plugin as it is being configured should be tolerable.


/Magnus

2016-11-09 13:06 GMT+01:00 Michael Pearce :

> Just following/catching up on what seems to be an active night :)
>
> @Radai sorry if it may seem obvious but what does MD stand for?
>
> My take on String vs Int:
>
> I will state first I am pro Int (16 or 32).
>
> I do though playing devils advocate see a big plus with the argument of
> String keys, this is around integrating into an existing eco-system.
>
> As many other systems use String based headers (Flume, JMS)  it makes it
> much easier for these to be incorporated/integrated into.
>
> How with Int based headers could we provide a way/guidance to make this
> integration simple / easy with transition flows over to kafka?
>
> * tough luck buddy you're on your own
> * simply hash the string into int code and hope for no collisions (how to
> convert back though?)
> * http2 style as mentioned by nacho.
>
> cheers,
> Mike
>
>
> 
> From: radai 
> Sent: Wednesday, November 9, 2016 8:12 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>
> thinking about it some more, the best way to transmit the header remapping
> data to consumers would be to put it in the MD response payload, so maybe
> it should be discussed now.
>
>
> On Wed, Nov 9, 2016 at 12:09 AM, radai  wrote:
>
> > im not opposed to the idea of namespace mapping. all im saying is that
> its
> > not part of the "mvp" and, since it requires no wire format change, can
> > always be added later.
> > also, its not as simple as just configuring MM to do the transform: lets
> > say i've implemented large message support as {666,1} and on some mirror
> > target cluster its been remapped to {999,1}. the consumer plugin code
> would
> > also need to be told to look for the large message "part X of Y" header
> > under {999,1}. doable, but tricky.
> >
> > On Tue, Nov 8, 2016 at 10:29 PM, Gwen Shapira  wrote:
> >
> >> While you can do whatever you want with a namespace and your code,
> >> what I'd expect is for each app to namespaces configurable...
> >>
> >> So if I accidentally used 666 for my HR department, and still want to
> >> run RadaiApp, I can config "namespace=42" for RadaiApp and everything
> >> will look normal.
> >>
> >> This means you only need to sync usage inside your own organization.
> >> Still hard, but somewhat easier than syncing with the entire world.
> >>
> >> On Tue, Nov 8, 2016 at 10:07 PM, radai 
> >> wrote:
> >> > and we can start with {namespace, id} and no re-mapping support and
> >> always
> >> > add it later on if/when collisions actually happen (i dont think
> they'd
> >> be
> >> > a problem).
> >> >
> >> > every interested party (so orgs or individuals) could then register a
> >> > prefix (0 = reserved, 1 = confluent ... 666 = me :-) ) and do whatever
> >> with
> >> > the 2nd ID - so once linkedin registers, say 3, then linkedin devs are
> >> free
> >> > to use {3, *} with a reasonable expectation to to collide with
> anything
> >> > else. further partitioning of that * becomes linkedin's problem, but
> the
> >> > "upstream registration" of a namespace only has to happen once.
> >> >
> >> > On Tue, Nov 8, 2016 at 9:03 PM, James Cheng 
> >> wrote:
> >> >
> >> >>
> >> >>
> >> >>
> >> >> > On Nov 8, 2016, at 5:54 PM, Gwen Shapira 
> wrote:
> >> >> >
> >> >> > Thank you so much for this clear and fair summary of the arguments.
> >> >> >
> >> >> > I'm in favor of ints. Not a deal-breaker, but in favor.
> >> >> >
> >> >> > Even more in favor of Magnus's decentralized suggestion with
> Roger's
> >> >> > tweak: add a namespace for headers. This will allow each app to
> just
> >> >> > use whatever IDs it wants internally, and then let the admin
> >> deploying
> >> >> > the app figure out an available namespace ID for the app to live
> in.
> >> >> > So io.confluent.schema-registry can be namespace 0x01 on my
> >> deployment
> >> >> > and 0x57 on your

[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-11-09 Thread Catalina-Alina Dobrica (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650814#comment-15650814
 ] 

Catalina-Alina Dobrica commented on KAFKA-1894:
---

This issue also prevents the consumer's thread from being interrupted. This is 
particularly relevant when the consumer is integrated in an external system - 
like a camel ecosystem. Trying to force the shutdown of the ExecutorService 
that manages the thread or to terminate the thread itself has no effect, and the 
thread remains stuck in the infinite loop. This eventually leads to OOME if enough such 
threads are started.
I found this issue when providing an incorrect SSL protocol to the consumer in 
version 0.10.1.0, but it can occur in any circumstance where the channel is not 
established - such as not having kafka enabled. The thread loops infinitely to 
check if this connection was established, which, in some cases, will never 
happen.

> Avoid long or infinite blocking in the consumer
> ---
>
> Key: KAFKA-1894
> URL: https://issues.apache.org/jira/browse/KAFKA-1894
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> The new consumer has a lot of loops that look something like
> {code}
>   while(!isThingComplete())
> client.poll();
> {code}
> This occurs both in KafkaConsumer but also in NetworkClient.completeAll. 
> These retry loops are actually mostly the behavior we want but there are 
> several cases where they may cause problems:
>  - In the case of a hard failure we may hang for a long time or indefinitely 
> before realizing the connection is lost.
>  - In the case where the cluster is malfunctioning or down we may retry 
> forever.
> It would probably be better to give a timeout to these. The proposed approach 
> would be to add something like retry.time.ms=6 and only continue retrying 
> for that period of time.
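
A sketch of the time-bounded variant the ticket proposes; the retry.time.ms 
value below and the surrounding calls are placeholders taken from the 
pseudo-code above, not an actual config or API:

// Placeholder sketch of the proposed bounded retry; isThingComplete() and
// client.poll() are the stand-ins used in the description above.
long retryTimeMs = 60_000L;   // e.g. retry.time.ms; the value is an assumption
long deadline = System.currentTimeMillis() + retryTimeMs;
while (!isThingComplete()) {
    if (System.currentTimeMillis() >= deadline)
        throw new org.apache.kafka.common.errors.TimeoutException(
                "Gave up after " + retryTimeMs + " ms");
    client.poll();
}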



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-4189) Consumer poll hangs forever if kafka is disabled

2016-11-09 Thread Catalina-Alina Dobrica (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Catalina-Alina Dobrica updated KAFKA-4189:
--
Comment: was deleted

(was: This issue also prevents the consumer's thread from being interrupted. 
This is particularly relevant when the consumer is integrated in an external 
system - like a camel ecosystem. Trying to force the shutdown of the 
ExecutorService that manages the thread or to terminate the thread itself has 
no effect and the thread is in an infinite loop in 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#awaitMetadataUpdate().
 This eventually leads to OOME if enough such threads are started.
I found this issue when providing an incorrect SSL protocol to the consumer in 
version 0.10.1.0, but it can occur in any circumstance where the channel is not 
established - such as not having kafka enabled. The thread loops infinitely to 
check if this connection was established, which, in some cases, will never 
happen.)

> Consumer poll hangs forever if kafka is disabled
> 
>
> Key: KAFKA-4189
> URL: https://issues.apache.org/jira/browse/KAFKA-4189
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: Tomas Benc
>Priority: Critical
>
> We develop a web application where the client sends a REST request and our 
> application downloads messages from Kafka and sends those messages back to 
> the client. In our web application we use the "New Consumer API" (not the High 
> Level nor the Simple Consumer API).
> The problem occurs when Kafka is disabled while the web application keeps 
> running. The application receives a request and tries to poll messages from 
> Kafka. Processing is blocked on that line until Kafka is enabled again.
> ConsumerRecords records = consumer.poll(1000);
> The timeout parameter of the poll method has no influence in such a case. I 
> would expect the poll method to throw some exception describing the 
> connection issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4189) Consumer poll hangs forever if kafka is disabled

2016-11-09 Thread Catalina-Alina Dobrica (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650781#comment-15650781
 ] 

Catalina-Alina Dobrica commented on KAFKA-4189:
---

This issue also prevents the consumer's thread from being interrupted. This is 
particularly relevant when the consumer is integrated in an external system - 
like a camel ecosystem. Trying to force the shutdown of the ExecutorService 
that manages the thread or to terminate the thread itself has no effect and the 
thread is in an infinite loop in 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#awaitMetadataUpdate().
 This eventually leads to OOME if enough such threads are started.
I found this issue when providing an incorrect SSL protocol to the consumer in 
version 0.10.1.0, but it can occur in any circumstance where the channel is not 
established - such as not having kafka enabled. The thread loops infinitely to 
check if this connection was established, which, in some cases, will never 
happen.

> Consumer poll hangs forever if kafka is disabled
> 
>
> Key: KAFKA-4189
> URL: https://issues.apache.org/jira/browse/KAFKA-4189
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: Tomas Benc
>Priority: Critical
>
> We develop a web application where the client sends a REST request and our 
> application downloads messages from Kafka and sends those messages back to 
> the client. In our web application we use the "New Consumer API" (not the High 
> Level nor the Simple Consumer API).
> The problem occurs when Kafka is disabled while the web application keeps 
> running. The application receives a request and tries to poll messages from 
> Kafka. Processing is blocked on that line until Kafka is enabled again.
> ConsumerRecords records = consumer.poll(1000);
> The timeout parameter of the poll method has no influence in such a case. I 
> would expect the poll method to throw some exception describing the 
> connection issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread Michael Pearce
Just following/catching up on what seems to be an active night :)

@Radai sorry if it may seem obvious but what does MD stand for?

My take on String vs Int:

I will state first I am pro Int (16 or 32).

I do though playing devils advocate see a big plus with the argument of String 
keys, this is around integrating into an existing eco-system.

As many other systems use String based headers (Flume, JMS)  it makes it much 
easier for these to be incorporated/integrated into.

How with Int based headers could we provide a way/guidance to make this 
integration simple / easy with transition flows over to kafka?

* tough luck buddy you're on your own
* simply hash the string into int code and hope for no collisions (how to 
convert back though?)
* http2 style as mentioned by nacho.

cheers,
Mike
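
As a concrete illustration of the second option above (hash the string key into 
an int), a minimal sketch; it also shows why the mapping is one-way, which is 
the "how to convert back" problem:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Sketch of the "hash the string key into an int" option from the list above.
// Deterministic but not reversible, and collisions are possible across an ecosystem.
public class HeaderKeyHashSketch {
    static int headerKeyToInt(String headerKey) {
        CRC32 crc = new CRC32();
        crc.update(headerKey.getBytes(StandardCharsets.UTF_8));
        return (int) crc.getValue();   // fold the 32-bit CRC into a signed int
    }

    public static void main(String[] args) {
        System.out.println(headerKeyToInt("io.confluent.schema-registry.id"));
        // There is no way to recover the original string from the int alone.
    }
}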



From: radai 
Sent: Wednesday, November 9, 2016 8:12 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

thinking about it some more, the best way to transmit the header remapping
data to consumers would be to put it in the MD response payload, so maybe
it should be discussed now.


On Wed, Nov 9, 2016 at 12:09 AM, radai  wrote:

> im not opposed to the idea of namespace mapping. all im saying is that its
> not part of the "mvp" and, since it requires no wire format change, can
> always be added later.
> also, its not as simple as just configuring MM to do the transform: lets
> say i've implemented large message support as {666,1} and on some mirror
> target cluster its been remapped to {999,1}. the consumer plugin code would
> also need to be told to look for the large message "part X of Y" header
> under {999,1}. doable, but tricky.
>
> On Tue, Nov 8, 2016 at 10:29 PM, Gwen Shapira  wrote:
>
>> While you can do whatever you want with a namespace and your code,
>> what I'd expect is for each app to namespaces configurable...
>>
>> So if I accidentally used 666 for my HR department, and still want to
>> run RadaiApp, I can config "namespace=42" for RadaiApp and everything
>> will look normal.
>>
>> This means you only need to sync usage inside your own organization.
>> Still hard, but somewhat easier than syncing with the entire world.
>>
>> On Tue, Nov 8, 2016 at 10:07 PM, radai 
>> wrote:
>> > and we can start with {namespace, id} and no re-mapping support and
>> always
>> > add it later on if/when collisions actually happen (i dont think they'd
>> be
>> > a problem).
>> >
>> > every interested party (so orgs or individuals) could then register a
>> > prefix (0 = reserved, 1 = confluent ... 666 = me :-) ) and do whatever
>> with
>> > the 2nd ID - so once linkedin registers, say 3, then linkedin devs are
>> free
>> > to use {3, *} with a reasonable expectation to to collide with anything
>> > else. further partitioning of that * becomes linkedin's problem, but the
>> > "upstream registration" of a namespace only has to happen once.
>> >
>> > On Tue, Nov 8, 2016 at 9:03 PM, James Cheng 
>> wrote:
>> >
>> >>
>> >>
>> >>
>> >> > On Nov 8, 2016, at 5:54 PM, Gwen Shapira  wrote:
>> >> >
>> >> > Thank you so much for this clear and fair summary of the arguments.
>> >> >
>> >> > I'm in favor of ints. Not a deal-breaker, but in favor.
>> >> >
>> >> > Even more in favor of Magnus's decentralized suggestion with Roger's
>> >> > tweak: add a namespace for headers. This will allow each app to just
>> >> > use whatever IDs it wants internally, and then let the admin
>> deploying
>> >> > the app figure out an available namespace ID for the app to live in.
>> >> > So io.confluent.schema-registry can be namespace 0x01 on my
>> deployment
>> >> > and 0x57 on yours, and the poor guys developing the app don't need to
>> >> > worry about that.
>> >> >
>> >>
>> >> Gwen, if I understand your example right, an application deployer might
>> >> decide to use 0x01 in one deployment, and that means that once the
>> message
>> >> is written into the broker, it will be saved on the broker with that
>> >> specific namespace (0x01).
>> >>
>> >> If you were to mirror that message into another cluster, the 0x01 would
>> >> accompany the message, right? What if the deployers of the same app in
>> the
>> >> other cluster uses 0x57? They won't understand each other?
>> >>
>> >> I'm not sure that's an avoidable problem. I think it simply means that
>> in
>> >> order to share data, you have to also have a shared (agreed upon)
>> >> understanding of what the namespaces mean. Which I think makes sense,
>> >> because the alternative (sharing *nothing* at all) would mean that there
>> >> would be no way to understand each other.
>> >>
>> >> -James
>> >>
>> >> > Gwen
>> >> >
>> >> > On Tue, Nov 8, 2016 at 4:23 PM, radai 
>> >> wrote:
>> >> >> +1 for sean's document. it covers pretty much all the trade-offs and
>> >> >> provides concrete figures to argue about :-)
>> >> >> (nit-picking - used the same xkcd twice, also trove has been
>> superseded
>> >> for
>> >> >> purposes of high performance 

[jira] [Resolved] (KAFKA-4367) MirrorMaker shuts down gracefully without actually being stopped

2016-11-09 Thread Alex (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex resolved KAFKA-4367.
-
Resolution: Not A Problem

Not a bug. We had to run kafka-mirror-maker.sh with nohup to completely detach 
the process from the terminal.





> MirrorMaker shuts down gracefully without actually being stopped
> 
>
> Key: KAFKA-4367
> URL: https://issues.apache.org/jira/browse/KAFKA-4367
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
> Environment: RHEL 7
>Reporter: Alex
>
> MirrorMaker stops working without being stopped, anywhere from 30 minutes to 20 
> hours after starting. No clue why this problem occurs.
> Start:
> bin/kafka-mirror-maker.sh --new.consumer --consumer.config 
> config/ssl_mirroring_consumer.properties --producer.config 
> config/ssl_mirroring_producer.properties --whitelist 
> "TOPIC1|TOPIC2|TOPIC3|TOPIC4" --num.streams 20 &> /dev/null &
> 
>   kafka-mirror-maker.log
> 
> [2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
> CEP.FS.IN-175 with base offset offset 15015 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
> CEP.FS.IN-151 with base offset offset 15066 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,003] TRACE Nodes with data ready to send: [Node(8, 
> 10.126.0.2, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,003] TRACE Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.producer.internals.Sender$1@483c4c7a, 
> request=RequestSend(header={api_key=0,api_version=1,correlation_id=219685,client_id=producer-1},
>  
> body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=133,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=9085 cap=16384]}]}]}), createdTimeMs=1478017412003, sendTimeMs=0)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,008] TRACE Returning fetched records for assigned 
> partition CEP.FS.IN-172 and update consumed position to 3869316 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-11-01 19:23:32,008] TRACE [mirrormaker-thread-7] Sending message with 
> value size 485 and offset 3869315 (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2016-11-01 19:23:32,008] TRACE Sending record 
> ProducerRecord(topic=CEP.FS.IN, partition=null, key=null, value=[B@12a54f5a 
> with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@5ea65b8f to 
> topic CEP.FS.IN partition 160 
> (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-11-01 19:23:32,008] TRACE Allocating a new 16384 byte message buffer 
> for topic CEP.FS.IN partition 160 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2016-11-01 19:23:32,008] TRACE Waking up the sender since topic CEP.FS.IN 
> partition 160 is either full or getting a new batch 
> (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-11-01 19:23:32,010] TRACE Received produce response from node 7 with 
> correlation id 219684 (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
> CEP.FS.IN-106 with base offset offset 15086 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
> CEP.FS.IN-124 with base offset offset 15095 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,010] TRACE Nodes with data ready to send: [Node(7, 
> 10.126.0.1, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] INFO Start clean shutdown. 
> (kafka.tools.MirrorMaker$)
> [2016-11-01 19:23:32,010] TRACE Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.producer.internals.Sender$1@44b788c7, 
> request=RequestSend(header={api_key=0,api_version=1,correlation_id=219686,client_id=producer-1},
>  
> body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=160,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=511 cap=16384]}]}]}), createdTimeMs=1478017412010, sendTimeMs=0)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] INFO Shutting down consumer threads. 
> (kafka.tools.MirrorMaker$)
> [2016-11-01 19:23:32,011] INFO [mirrormaker-thread-0] mirrormaker-thread-0 
> shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2016-11-01 19:23:32,011] INFO [mirrormaker-thread

[jira] [Commented] (KAFKA-4367) MirrorMaker shuts down gracefully without actually being stopped

2016-11-09 Thread Alex (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650572#comment-15650572
 ] 

Alex commented on KAFKA-4367:
-

No, we didn't! That was the issue! Thanks for mentioning it. We had to use nohup. 

> MirrorMaker shuts down gracefully without actually being stopped
> 
>
> Key: KAFKA-4367
> URL: https://issues.apache.org/jira/browse/KAFKA-4367
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
> Environment: RHEL 7
>Reporter: Alex
>
> MirrorMaker stops working without being stopped, anywhere from 30 minutes to 20 
> hours after starting. No clue why this problem occurs.
> Start:
> bin/kafka-mirror-maker.sh --new.consumer --consumer.config 
> config/ssl_mirroring_consumer.properties --producer.config 
> config/ssl_mirroring_producer.properties --whitelist 
> "TOPIC1|TOPIC2|TOPIC3|TOPIC4" --num.streams 20 &> /dev/null &
> 
>   kafka-mirror-maker.log
> 
> [2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
> CEP.FS.IN-175 with base offset offset 15015 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,003] TRACE Produced messages to topic-partition 
> CEP.FS.IN-151 with base offset offset 15066 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,003] TRACE Nodes with data ready to send: [Node(8, 
> 10.126.0.2, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,003] TRACE Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.producer.internals.Sender$1@483c4c7a, 
> request=RequestSend(header={api_key=0,api_version=1,correlation_id=219685,client_id=producer-1},
>  
> body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=133,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=9085 cap=16384]}]}]}), createdTimeMs=1478017412003, sendTimeMs=0)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,008] TRACE Returning fetched records for assigned 
> partition CEP.FS.IN-172 and update consumed position to 3869316 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2016-11-01 19:23:32,008] TRACE [mirrormaker-thread-7] Sending message with 
> value size 485 and offset 3869315 (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2016-11-01 19:23:32,008] TRACE Sending record 
> ProducerRecord(topic=CEP.FS.IN, partition=null, key=null, value=[B@12a54f5a 
> with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@5ea65b8f to 
> topic CEP.FS.IN partition 160 
> (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-11-01 19:23:32,008] TRACE Allocating a new 16384 byte message buffer 
> for topic CEP.FS.IN partition 160 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2016-11-01 19:23:32,008] TRACE Waking up the sender since topic CEP.FS.IN 
> partition 160 is either full or getting a new batch 
> (org.apache.kafka.clients.producer.KafkaProducer)
> [2016-11-01 19:23:32,010] TRACE Received produce response from node 7 with 
> correlation id 219684 (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
> CEP.FS.IN-106 with base offset offset 15086 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,010] TRACE Produced messages to topic-partition 
> CEP.FS.IN-124 with base offset offset 15095 and error: null. 
> (org.apache.kafka.clients.producer.internals.RecordBatch)
> [2016-11-01 19:23:32,010] TRACE Nodes with data ready to send: [Node(7, 
> 10.126.0.1, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] INFO Start clean shutdown. 
> (kafka.tools.MirrorMaker$)
> [2016-11-01 19:23:32,010] TRACE Created 1 produce requests: 
> [ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.producer.internals.Sender$1@44b788c7, 
> request=RequestSend(header={api_key=0,api_version=1,correlation_id=219686,client_id=producer-1},
>  
> body={acks=-1,timeout=3,topic_data=[{topic=CEP.FS.IN,data=[{partition=160,record_set=java.nio.HeapByteBuffer[pos=0
>  lim=511 cap=16384]}]}]}), createdTimeMs=1478017412010, sendTimeMs=0)] 
> (org.apache.kafka.clients.producer.internals.Sender)
> [2016-11-01 19:23:32,010] INFO Shutting down consumer threads. 
> (kafka.tools.MirrorMaker$)
> [2016-11-01 19:23:32,011] INFO [mirrormaker-thread-0] mirrormaker-thread-0 
> shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2016-11-01 19:23:32,011] INFO [mirrormaker-thread-1] mir

[jira] [Comment Edited] (KAFKA-4386) Producer Metrics Explanation

2016-11-09 Thread Pratik kumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650180#comment-15650180
 ] 

Pratik kumar edited comment on KAFKA-4386 at 11/9/16 9:20 AM:
--

Just for clarification:
Async producers use sync producers internally to send batched 
messages (TopicPartition x ByteBufferMessageSet), so each invocation of send in the 
sync producer should contain a collection of messages for a particular broker 
rather than single messages.
These metrics are emitted for each call to this method, which encapsulates the 
above functionality. So shouldn't these metrics be tracked per batch (per broker) 
rather than at the request level?
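
As a rough, stand-alone illustration of the batching described above (the types 
here are simplified stand-ins, not the 0.8 producer's own classes):

// Sketch only: messages queued by an async send path are grouped per
// TopicPartition before a single request per broker is issued, so a metric
// recorded once per send naturally describes a whole batch.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchingIllustration {
    static class Message {
        final String topic; final int partition; final byte[] value;
        Message(String topic, int partition, byte[] value) {
            this.topic = topic; this.partition = partition; this.value = value;
        }
        String topicPartition() { return topic + "-" + partition; }
    }

    public static void main(String[] args) {
        List<Message> queued = Arrays.asList(
                new Message("logs", 0, "a".getBytes()),
                new Message("logs", 0, "b".getBytes()),
                new Message("logs", 1, "c".getBytes()));

        // Group by TopicPartition, as the async path does before handing the
        // whole batch to the underlying sync send.
        Map<String, List<Message>> batches = new HashMap<>();
        for (Message m : queued) {
            batches.computeIfAbsent(m.topicPartition(), k -> new ArrayList<>()).add(m);
        }
        batches.forEach((tp, batch) ->
                System.out.println(tp + " -> batch of " + batch.size() + " messages"));
    }
}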


was (Author: pratik.kumar):
Just for clarification:
Async producers use sync producers internally to send batched 
messages (TopicPartition x ByteBufferMessageSet), so each invocation of send in the 
sync producer should contain a collection of messages rather than a single 
message.
These metrics are emitted for each call to this method, which encapsulates the 
above functionality. So shouldn't these metrics be tracked for each batch 
rather than at the request level?

> Producer Metrics Explanation
> 
>
> Key: KAFKA-4386
> URL: https://issues.apache.org/jira/browse/KAFKA-4386
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Pratik kumar
>  Labels: producer
>
> Context :
> Kafka Producer 0.8.x
> Problem:
> Kafka Producer emits metrics regarding request size stats, request latency 
> and request rate stats.
> But the inherent meaning of these metrics is not clear. What do they 
> measure?
> Is it for each producer send request (which contains batches of messages per 
> broker)? Or is it for a batch of messages defined according to the user's batching 
> policy? What happens when some application code has multiple async producers 
> to increase performance (how are rates and percentiles measured?)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4386) Producer Metrics Explanation

2016-11-09 Thread Pratik kumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650180#comment-15650180
 ] 

Pratik kumar commented on KAFKA-4386:
-

Just for clarification:
Async producers use sync producers internally to send batched 
messages (TopicPartition x ByteBufferMessageSet), so each invocation of send in the 
sync producer should contain a collection of messages rather than a single 
message.
These metrics are emitted for each call to this method, which encapsulates the 
above functionality. So shouldn't these metrics be tracked for each batch 
rather than at the request level?

> Producer Metrics Explanation
> 
>
> Key: KAFKA-4386
> URL: https://issues.apache.org/jira/browse/KAFKA-4386
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Pratik kumar
>  Labels: producer
>
> Context :
> Kafka Producer 0.8.x
> Problem:
> Kafka Producer emits metrics regarding request size stats, request latency 
> and request rate stats.
> But the inherent meaning of these metrics is not clear. What do they 
> measure?
> Is it for each producer send request (which contains batches of messages per 
> broker)? Or is it for a batch of messages defined according to the user's batching 
> policy? What happens when some application code has multiple async producers 
> to increase performance (how are rates and percentiles measured?)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread radai
thinking about it some more, the best way to transmit the header remapping
data to consumers would be to put it in the MD response payload, so maybe
it should be discussed now.


On Wed, Nov 9, 2016 at 12:09 AM, radai  wrote:

> im not opposed to the idea of namespace mapping. all im saying is that its
> not part of the "mvp" and, since it requires no wire format change, can
> always be added later.
> also, its not as simple as just configuring MM to do the transform: lets
> say i've implemented large message support as {666,1} and on some mirror
> target cluster its been remapped to {999,1}. the consumer plugin code would
> also need to be told to look for the large message "part X of Y" header
> under {999,1}. doable, but tricky.
>
> On Tue, Nov 8, 2016 at 10:29 PM, Gwen Shapira  wrote:
>
>> While you can do whatever you want with a namespace and your code,
>> what I'd expect is for each app to make its namespaces configurable...
>>
>> So if I accidentally used 666 for my HR department, and still want to
>> run RadaiApp, I can config "namespace=42" for RadaiApp and everything
>> will look normal.
>>
>> This means you only need to sync usage inside your own organization.
>> Still hard, but somewhat easier than syncing with the entire world.
>>
>> On Tue, Nov 8, 2016 at 10:07 PM, radai 
>> wrote:
>> > and we can start with {namespace, id} and no re-mapping support and
>> always
>> > add it later on if/when collisions actually happen (i dont think they'd
>> be
>> > a problem).
>> >
>> > every interested party (so orgs or individuals) could then register a
>> > prefix (0 = reserved, 1 = confluent ... 666 = me :-) ) and do whatever
>> with
>> > the 2nd ID - so once linkedin registers, say 3, then linkedin devs are
>> free
>> > to use {3, *} with a reasonable expectation not to collide with anything
>> > else. further partitioning of that * becomes linkedin's problem, but the
>> > "upstream registration" of a namespace only has to happen once.
>> >
>> > On Tue, Nov 8, 2016 at 9:03 PM, James Cheng 
>> wrote:
>> >
>> >>
>> >>
>> >>
>> >> > On Nov 8, 2016, at 5:54 PM, Gwen Shapira  wrote:
>> >> >
>> >> > Thank you so much for this clear and fair summary of the arguments.
>> >> >
>> >> > I'm in favor of ints. Not a deal-breaker, but in favor.
>> >> >
>> >> > Even more in favor of Magnus's decentralized suggestion with Roger's
>> >> > tweak: add a namespace for headers. This will allow each app to just
>> >> > use whatever IDs it wants internally, and then let the admin
>> deploying
>> >> > the app figure out an available namespace ID for the app to live in.
>> >> > So io.confluent.schema-registry can be namespace 0x01 on my
>> deployment
>> >> > and 0x57 on yours, and the poor guys developing the app don't need to
>> >> > worry about that.
>> >> >
>> >>
>> >> Gwen, if I understand your example right, an application deployer might
>> >> decide to use 0x01 in one deployment, and that means that once the
>> message
>> >> is written into the broker, it will be saved on the broker with that
>> >> specific namespace (0x01).
>> >>
>> >> If you were to mirror that message into another cluster, the 0x01 would
>> >> accompany the message, right? What if the deployers of the same app in
>> the
>> >> other cluster uses 0x57? They won't understand each other?
>> >>
>> >> I'm not sure that's an avoidable problem. I think it simply means that
>> in
>> >> order to share data, you have to also have a shared (agreed upon)
>> >> understanding of what the namespaces mean. Which I think makes sense,
>> >> because the alternative (sharing *nothing* at all) would mean that there
>> >> would be no way to understand each other.
>> >>
>> >> -James
>> >>
>> >> > Gwen
>> >> >
>> >> > On Tue, Nov 8, 2016 at 4:23 PM, radai 
>> >> wrote:
>> >> >> +1 for sean's document. it covers pretty much all the trade-offs and
>> >> >> provides concrete figures to argue about :-)
>> >> >> (nit-picking - used the same xkcd twice, also trove has been
>> superseded
>> >> for
>> >> >> purposes of high performance collections: look at
>> >> >> https://github.com/leventov/Koloboke)
>> >> >>
>> >> >> so to sum up the string vs int debate:
>> >> >>
>> >> >> performance - you can do 140k ops/sec _per thread_ with string
>> headers.
>> >> you
>> >> >> could do x2-3 better with ints. there's no arguing the relative diff
>> >> >> between the two, there's only the question of whether or not _the
>> rest
>> >> of
>> >> >> kafka_ operates fast enough to care. if we want to make choices
>> solely
>> >> >> based on performance we need ints. if we are willing to
>> >> settle/compromise
>> >> >> for a nicer (to some) API than strings are good enough for the
>> current
>> >> >> state of affairs.
>> >> >>
>> >> >> message size - with batching and compression it comes down to a ~5%
>> >> >> difference (internal testing, not in the doc. maybe would help
>> adding if
>> >> >> this becomes a point of contention?). this means it wont really
>> affect
>> >> >> kafka in "throughput mode" (large, 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-09 Thread radai
im not opposed to the idea of namespace mapping. all im saying is that its
not part of the "mvp" and, since it requires no wire format change, can
always be added later.
also, its not as simple as just configuring MM to do the transform: lets
say i've implemented large message support as {666,1} and on some mirror
target cluster its been remapped to {999,1}. the consumer plugin code would
also need to be told to look for the large message "part X of Y" header
under {999,1}. doable, but tricky.
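
A tiny stand-alone sketch of that concern, reusing the {namespace,id} numbers 
from the example above (the class and method names are made up for illustration):

// Sketch only: a {namespace, id} header key plus the per-cluster namespace
// remap a mirroring step - and every consumer-side plugin - would have to
// agree on.
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class NamespacedHeaderKey {
    final int namespace;
    final int id;

    NamespacedHeaderKey(int namespace, int id) {
        this.namespace = namespace;
        this.id = id;
    }

    // Remap only the namespace part, as a mirroring step might do per target cluster.
    NamespacedHeaderKey remap(Map<Integer, Integer> namespaceMap) {
        return new NamespacedHeaderKey(namespaceMap.getOrDefault(namespace, namespace), id);
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof NamespacedHeaderKey)) return false;
        NamespacedHeaderKey k = (NamespacedHeaderKey) o;
        return namespace == k.namespace && id == k.id;
    }
    @Override public int hashCode() { return Objects.hash(namespace, id); }
    @Override public String toString() { return "{" + namespace + "," + id + "}"; }

    public static void main(String[] args) {
        NamespacedHeaderKey largeMessagePart = new NamespacedHeaderKey(666, 1);
        Map<Integer, Integer> targetClusterRemap = Collections.singletonMap(666, 999);
        // A consumer plugin on the target cluster must look up {999,1}, not
        // {666,1} - exactly the coordination problem described above.
        System.out.println(largeMessagePart + " -> " + largeMessagePart.remap(targetClusterRemap));
    }
}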

On Tue, Nov 8, 2016 at 10:29 PM, Gwen Shapira  wrote:

> While you can do whatever you want with a namespace and your code,
> what I'd expect is for each app to make its namespaces configurable...
>
> So if I accidentally used 666 for my HR department, and still want to
> run RadaiApp, I can config "namespace=42" for RadaiApp and everything
> will look normal.
>
> This means you only need to sync usage inside your own organization.
> Still hard, but somewhat easier than syncing with the entire world.
>
> On Tue, Nov 8, 2016 at 10:07 PM, radai  wrote:
> > and we can start with {namespace, id} and no re-mapping support and
> always
> > add it later on if/when collisions actually happen (i dont think they'd
> be
> > a problem).
> >
> > every interested party (so orgs or individuals) could then register a
> > prefix (0 = reserved, 1 = confluent ... 666 = me :-) ) and do whatever
> with
> > the 2nd ID - so once linkedin registers, say 3, then linkedin devs are
> free
> > to use {3, *} with a reasonable expectation not to collide with anything
> > else. further partitioning of that * becomes linkedin's problem, but the
> > "upstream registration" of a namespace only has to happen once.
> >
> > On Tue, Nov 8, 2016 at 9:03 PM, James Cheng 
> wrote:
> >
> >>
> >>
> >>
> >> > On Nov 8, 2016, at 5:54 PM, Gwen Shapira  wrote:
> >> >
> >> > Thank you so much for this clear and fair summary of the arguments.
> >> >
> >> > I'm in favor of ints. Not a deal-breaker, but in favor.
> >> >
> >> > Even more in favor of Magnus's decentralized suggestion with Roger's
> >> > tweak: add a namespace for headers. This will allow each app to just
> >> > use whatever IDs it wants internally, and then let the admin deploying
> >> > the app figure out an available namespace ID for the app to live in.
> >> > So io.confluent.schema-registry can be namespace 0x01 on my deployment
> >> > and 0x57 on yours, and the poor guys developing the app don't need to
> >> > worry about that.
> >> >
> >>
> >> Gwen, if I understand your example right, an application deployer might
> >> decide to use 0x01 in one deployment, and that means that once the
> message
> >> is written into the broker, it will be saved on the broker with that
> >> specific namespace (0x01).
> >>
> >> If you were to mirror that message into another cluster, the 0x01 would
> >> accompany the message, right? What if the deployers of the same app in
> the
> >> other cluster uses 0x57? They won't understand each other?
> >>
> >> I'm not sure that's an avoidable problem. I think it simply means that
> in
> >> order to share data, you have to also have a shared (agreed upon)
> >> understanding of what the namespaces mean. Which I think makes sense,
> >> because the alternative (sharing *nothing* at all) would mean that there
> >> would be no way to understand each other.
> >>
> >> -James
> >>
> >> > Gwen
> >> >
> >> > On Tue, Nov 8, 2016 at 4:23 PM, radai 
> >> wrote:
> >> >> +1 for sean's document. it covers pretty much all the trade-offs and
> >> >> provides concrete figures to argue about :-)
> >> >> (nit-picking - used the same xkcd twice, also trove has been
> superseded
> >> for
> >> >> purposes of high performance collections: look at
> >> >> https://github.com/leventov/Koloboke)
> >> >>
> >> >> so to sum up the string vs int debate:
> >> >>
> >> >> performance - you can do 140k ops/sec _per thread_ with string
> headers.
> >> you
> >> >> could do x2-3 better with ints. there's no arguing the relative diff
> >> >> between the two, there's only the question of whether or not _the
> rest
> >> of
> >> >> kafka_ operates fast enough to care. if we want to make choices
> solely
> >> >> based on performance we need ints. if we are willing to
> >> settle/compromise
> >> >> for a nicer (to some) API than strings are good enough for the
> current
> >> >> state of affairs.
> >> >>
> >> >> message size - with batching and compression it comes down to a ~5%
> >> >> difference (internal testing, not in the doc. maybe would help
> adding if
> >> >> this becomes a point of contention?). this means it wont really
> affect
> >> >> kafka in "throughput mode" (large, compressed batches). in "low
> latency"
> >> >> mode (meaning less/no batching and compression) the difference can be
> >> >> extreme (it'll easily be an order of magnitude with small payloads
> like
> >> >> stock ticks and header keys of the form
> >> >> "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such
> >> topics at
> >> >> linkedin wher