Kafka Upgrade from 8.2 to 10.2

2017-07-20 Thread Kiran Singh
Hi Team,

I have a Kafka setup as follows:
Kafka server version: 10.2
inter.broker.protocol.version: 8.2
log.message.format.version: 8.2

Kafka client version: 8.2

Now I need to change the following properties:

inter.broker.protocol.version: 10.2
log.message.format.version: 10.2

Kafka Client version: 10.2



My questions are:

1. Can I use the same 8.2 Kafka producer and consumer creation logic with
the Kafka client 10.2 version jar?

Producer creation with the following properties:
  key.serializer
  value.serializer
  bootstrap.servers
  client.id
  acks
  retries
  timeout.ms
  metadata.fetch.timeout.ms
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
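For illustration only, a minimal sketch of what that producer creation could
look like against the 10.2 client jar, reusing the property names listed
above; the broker addresses, client id and topic name are placeholders, and
the old timeout settings are only mentioned in a comment because later
clients superseded them:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
            props.put("client.id", "my-producer");                       // placeholder client id
            props.put("acks", "all");
            props.put("retries", 3);
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            // The older producer's timeout.ms and metadata.fetch.timeout.ms settings were
            // deprecated in favor of request.timeout.ms and max.block.ms in later clients,
            // so they may only trigger warnings or be ignored.

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "value")); // placeholder topic
                producer.flush();
            }
        }
    }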

Consumer creation with the following properties:
  zookeeper.connect
  zookeeper.connection.timeout.ms
  auto.offset.reset
  auto.commit.enable
  zookeeper.session.timeout.ms
  group.id
  client.id
  offsets.storage: kafka
  dual.commit.enabled: false
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
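The zookeeper.* properties above belong to the old Scala consumer, which is
not part of the kafka-clients jar. If moving to the 10.2 client also means
switching to the Java KafkaConsumer, a rough, non-authoritative equivalent
could look like the sketch below (broker addresses, group, client id and
topic are placeholders; offsets.storage and dual.commit.enabled have no
equivalent because the new consumer always commits offsets to Kafka):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // replaces zookeeper.connect
            props.put("group.id", "my-group");                           // placeholder group
            props.put("client.id", "my-consumer");                       // placeholder client id
            props.put("auto.offset.reset", "earliest");                  // new-consumer value names
            props.put("enable.auto.commit", "true");                     // replaces auto.commit.enable
            props.put("session.timeout.ms", "10000");                    // group session timeout on the broker
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s -> %s%n", record.key(), record.value());
                }
            }
        }
    }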


2. Do you see any challenge with this approach?

Thanks in advance

Kiran Singh


Re: Help please: Topics deleted, but Kafka continues to try and sync them with Zookeeper

2017-07-20 Thread Chris Neal
Thanks again for the replies.  VERY much appreciated.  I'll check both
/admin/delete_topics and /config/topics.

Chris

On Thu, Jul 20, 2017 at 9:22 PM, Carl Haferd 
wrote:

> If delete normally works, there would hopefully be some log entries when it
> fails.  Are there any unusual zookeeper entries in the /admin/delete_topics
> path or in the other /admin folders?
>
> Does the topic name still exist in zookeeper under /config/topics?  If so,
> that should probably be deleted as well.
>
> Carl
>
> On Thu, Jul 20, 2017 at 6:42 PM, Chris Neal  wrote:
>
> > Delete is definitely there.  The delete worked fine, based on the fact
> that
> > there is nothing in Zookeeper, and that the controller reported that the
> > delete was successful, it's just something seems to have gotten out of
> > sync.
> >
> > delete.topic.enabled is true.  I've successfully deleted topics in the
> > past, so I know it *should* work. :)
> >
> > I also had already checked in Zookeeper, and there is no directory for
> the
> > topics under /brokers/topics  Very strange indeed.
> >
> > If I just remove the log directories from the filesystem, is that enough
> to
> > get the broker to stop asking about the topics?  I would guess there
> would
> > need to be more than just that, but I could be wrong.
> >
> > Thanks guys for the suggestions though!
> >
> > On Thu, Jul 20, 2017 at 8:19 PM, Stephen Powis 
> > wrote:
> >
> > > I could be totally wrong, but I seem to recall that delete wasn't fully
> > > implemented in 0.8.x?
> > >
> > > On Fri, Jul 21, 2017 at 10:10 AM, Carl Haferd
> >  > > >
> > > wrote:
> > >
> > > > Chris,
> > > >
> > > > You could first check to make sure that delete.topic.enable is true
> and
> > > try
> > > > deleting again if not.  If that doesn't work with 0.8.1.1 you might
> > need
> > > to
> > > > manually remove the topic's log files from the configured log.dirs
> > folder
> > > > on each broker in addition to removing the topic's zookeeper path.
> > > >
> > > > Carl
> > > >
> > > > On Thu, Jul 20, 2017 at 10:06 AM, Chris Neal 
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have a weird situation here.  I have deleted a few topics on my
> > > 0.8.1.1
> > > > > cluster (old, I know...).  The deletes succeeded according to the
> > > > > controller.log:
> > > > >
> > > > > [2017-07-20 16:40:31,175] INFO [TopicChangeListener on Controller
> 1]:
> > > New
> > > > > topics: [Set()], deleted topics:
> > > > > [Set(perf_doorway-supplier-adapter-uat_raw)], new partition
> replica
> > > > > assignment [Map()]
> > > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > > > [2017-07-20 16:40:33,507] INFO [TopicChangeListener on Controller
> 1]:
> > > New
> > > > > topics: [Set()], deleted topics:
> > > > > [Set(perf_doorway-supplier-scheduler-uat_raw)], new partition
> > replica
> > > > > assignment [Map()]
> > > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > > > [2017-07-20 16:40:36,504] INFO [TopicChangeListener on Controller
> 1]:
> > > New
> > > > > topics: [Set()], deleted topics: [Set(perf_gocontent-uat_raw)], new
> > > > > partition replica assignment [Map()]
> > > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > > > [2017-07-20 16:40:38,290] INFO [TopicChangeListener on Controller
> 1]:
> > > New
> > > > > topics: [Set()], deleted topics: [Set(perf_goplatform-uat_raw)],
> new
> > > > > partition replica assignment [Map()]
> > > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > > >
> > > > > I query Zookeeper and the path is not there under /brokers/topics
> as
> > > > well.
> > > > >
> > > > > But, one of the nodes in my cluster continues to try and use them:
> > > > >
> > > > > [2017-07-20 17:04:36,723] ERROR Conditional update of path
> > > > > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> > > > partitions/3/state
> > > > > with data
> > > > > {"controller_epoch":34,"leader":1,"version":1,"leader_
> > > > > epoch":2,"isr":[1,0]}
> > > > > and expected version 69 failed due to
> > > > > org.apache.zookeeper.KeeperException$NoNodeException:
> > KeeperErrorCode
> > > =
> > > > > NoNode for
> > > > > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> > > > partitions/3/state
> > > > > (kafka.utils.ZkUtils$)
> > > > > [2017-07-20 17:04:36,723] INFO Partition
> > > > > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> > > > zkVersion
> > > > > [69] not equal to that in zookeeper, skip updating ISR
> > > > > (kafka.cluster.Partition)
> > > > > [2017-07-20 17:04:36,723] INFO Partition
> > > > > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> > > > zkVersion
> > > > > [69] not equal to that in zookeeper, skip updating ISR
> > > > > (kafka.cluster.Partition)
> > > > > [2017-07-20 17:04:36,764] INFO Partition
> [perf_goplatform-uat_raw,2]
> > on
> > > > > broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2]
> > from
> > > > 1,0
> > > > > to 1 (kafka.cluster.Partition)
>

Re: Help please: Topics deleted, but Kafka continues to try and sync them with Zookeeper

2017-07-20 Thread Carl Haferd
If delete normally works, there would hopefully be some log entries when it
fails.  Are there any unusual zookeeper entries in the /admin/delete_topics
path or in the other /admin folders?

Does the topic name still exist in zookeeper under /config/topics?  If so,
that should probably be deleted as well.

Carl
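
In case it helps, a small sketch of how those two paths could be inspected
programmatically with the plain ZooKeeper Java client; the connect string is
a placeholder (use the brokers' zookeeper.connect value), and zookeeper-shell.sh
would show the same thing interactively:

    import java.util.List;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class TopicZkCheck {
        public static void main(String[] args) throws Exception {
            // Placeholder connect string; use the same zookeeper.connect value as the brokers.
            ZooKeeper zk = new ZooKeeper("zk1:2181", 30000, event -> { });
            try {
                // Topics still pending deletion show up as children of /admin/delete_topics.
                List<String> pendingDeletes = zk.getChildren("/admin/delete_topics", false);
                System.out.println("Pending deletes: " + pendingDeletes);

                // A leftover per-topic config node would sit under /config/topics/<topic>.
                Stat configNode = zk.exists("/config/topics/perf_gocontent-uat_raw", false);
                System.out.println("Config node exists: " + (configNode != null));
            } finally {
                zk.close();
            }
        }
    }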

On Thu, Jul 20, 2017 at 6:42 PM, Chris Neal  wrote:

> Delete is definitely there.  The delete worked fine, based on the fact that
> there is nothing in Zookeeper, and that the controller reported that the
> delete was successful, it's just something seems to have gotten out of
> sync.
>
> delete.topic.enabled is true.  I've successfully deleted topics in the
> past, so I know it *should* work. :)
>
> I also had already checked in Zookeeper, and there is no directory for the
> topics under /brokers/topics  Very strange indeed.
>
> If I just remove the log directories from the filesystem, is that enough to
> get the broker to stop asking about the topics?  I would guess there would
> need to be more than just that, but I could be wrong.
>
> Thanks guys for the suggestions though!
>
> On Thu, Jul 20, 2017 at 8:19 PM, Stephen Powis 
> wrote:
>
> > I could be totally wrong, but I seem to recall that delete wasn't fully
> > implemented in 0.8.x?
> >
> > On Fri, Jul 21, 2017 at 10:10 AM, Carl Haferd
>  > >
> > wrote:
> >
> > > Chris,
> > >
> > > You could first check to make sure that delete.topic.enable is true and
> > try
> > > deleting again if not.  If that doesn't work with 0.8.1.1 you might
> need
> > to
> > > manually remove the topic's log files from the configured log.dirs
> folder
> > > on each broker in addition to removing the topic's zookeeper path.
> > >
> > > Carl
> > >
> > > On Thu, Jul 20, 2017 at 10:06 AM, Chris Neal  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have a weird situation here.  I have deleted a few topics on my
> > 0.8.1.1
> > > > cluster (old, I know...).  The deletes succeeded according to the
> > > > controller.log:
> > > >
> > > > [2017-07-20 16:40:31,175] INFO [TopicChangeListener on Controller 1]:
> > New
> > > > topics: [Set()], deleted topics:
> > > > [Set(perf_doorway-supplier-adapter-uat_raw)], new partition replica
> > > > assignment [Map()]
> > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > > [2017-07-20 16:40:33,507] INFO [TopicChangeListener on Controller 1]:
> > New
> > > > topics: [Set()], deleted topics:
> > > > [Set(perf_doorway-supplier-scheduler-uat_raw)], new partition
> replica
> > > > assignment [Map()]
> > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > > [2017-07-20 16:40:36,504] INFO [TopicChangeListener on Controller 1]:
> > New
> > > > topics: [Set()], deleted topics: [Set(perf_gocontent-uat_raw)], new
> > > > partition replica assignment [Map()]
> > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > > [2017-07-20 16:40:38,290] INFO [TopicChangeListener on Controller 1]:
> > New
> > > > topics: [Set()], deleted topics: [Set(perf_goplatform-uat_raw)], new
> > > > partition replica assignment [Map()]
> > > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > >
> > > > I query Zookeeper and the path is not there under /brokers/topics as
> > > well.
> > > >
> > > > But, one of the nodes in my cluster continues to try and use them:
> > > >
> > > > [2017-07-20 17:04:36,723] ERROR Conditional update of path
> > > > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> > > partitions/3/state
> > > > with data
> > > > {"controller_epoch":34,"leader":1,"version":1,"leader_
> > > > epoch":2,"isr":[1,0]}
> > > > and expected version 69 failed due to
> > > > org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode
> > =
> > > > NoNode for
> > > > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> > > partitions/3/state
> > > > (kafka.utils.ZkUtils$)
> > > > [2017-07-20 17:04:36,723] INFO Partition
> > > > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> > > zkVersion
> > > > [69] not equal to that in zookeeper, skip updating ISR
> > > > (kafka.cluster.Partition)
> > > > [2017-07-20 17:04:36,723] INFO Partition
> > > > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> > > zkVersion
> > > > [69] not equal to that in zookeeper, skip updating ISR
> > > > (kafka.cluster.Partition)
> > > > [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2]
> on
> > > > broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2]
> from
> > > 1,0
> > > > to 1 (kafka.cluster.Partition)
> > > > [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2]
> on
> > > > broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2]
> from
> > > 1,0
> > > > to 1 (kafka.cluster.Partition)
> > > > [2017-07-20 17:04:36,765] ERROR Conditional update of path
> > > > /brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
> > > > {"controller_epoch":34,"leader":1,"version":1,"leader_
> > > epoch":2,"isr"

Re: Help please: Topics deleted, but Kafka continues to try and sync them with Zookeeper

2017-07-20 Thread Chris Neal
Delete is definitely there.  The delete worked fine, based on the fact that
there is nothing in Zookeeper, and that the controller reported that the
delete was successful; it's just that something seems to have gotten out of
sync.

delete.topic.enabled is true.  I've successfully deleted topics in the
past, so I know it *should* work. :)

I also had already checked in Zookeeper, and there is no directory for the
topics under /brokers/topics.  Very strange indeed.

If I just remove the log directories from the filesystem, is that enough to
get the broker to stop asking about the topics?  I would guess there would
need to be more than just that, but I could be wrong.

Thanks guys for the suggestions though!

On Thu, Jul 20, 2017 at 8:19 PM, Stephen Powis 
wrote:

> I could be totally wrong, but I seem to recall that delete wasn't fully
> implemented in 0.8.x?
>
> On Fri, Jul 21, 2017 at 10:10 AM, Carl Haferd  >
> wrote:
>
> > Chris,
> >
> > You could first check to make sure that delete.topic.enable is true and
> try
> > deleting again if not.  If that doesn't work with 0.8.1.1 you might need
> to
> > manually remove the topic's log files from the configured log.dirs folder
> > on each broker in addition to removing the topic's zookeeper path.
> >
> > Carl
> >
> > On Thu, Jul 20, 2017 at 10:06 AM, Chris Neal  wrote:
> >
> > > Hi all,
> > >
> > > I have a weird situation here.  I have deleted a few topics on my
> 0.8.1.1
> > > cluster (old, I know...).  The deletes succeeded according to the
> > > controller.log:
> > >
> > > [2017-07-20 16:40:31,175] INFO [TopicChangeListener on Controller 1]:
> New
> > > topics: [Set()], deleted topics:
> > > [Set(perf_doorway-supplier-adapter-uat_raw)], new partition replica
> > > assignment [Map()]
> > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > [2017-07-20 16:40:33,507] INFO [TopicChangeListener on Controller 1]:
> New
> > > topics: [Set()], deleted topics:
> > > [Set(perf_doorway-supplier-scheduler-uat_raw)], new partition replica
> > > assignment [Map()]
> > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > [2017-07-20 16:40:36,504] INFO [TopicChangeListener on Controller 1]:
> New
> > > topics: [Set()], deleted topics: [Set(perf_gocontent-uat_raw)], new
> > > partition replica assignment [Map()]
> > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > > [2017-07-20 16:40:38,290] INFO [TopicChangeListener on Controller 1]:
> New
> > > topics: [Set()], deleted topics: [Set(perf_goplatform-uat_raw)], new
> > > partition replica assignment [Map()]
> > > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > >
> > > I query Zookeeper and the path is not there under /brokers/topics as
> > well.
> > >
> > > But, one of the nodes in my cluster continues to try and use them:
> > >
> > > [2017-07-20 17:04:36,723] ERROR Conditional update of path
> > > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> > partitions/3/state
> > > with data
> > > {"controller_epoch":34,"leader":1,"version":1,"leader_
> > > epoch":2,"isr":[1,0]}
> > > and expected version 69 failed due to
> > > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode
> =
> > > NoNode for
> > > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> > partitions/3/state
> > > (kafka.utils.ZkUtils$)
> > > [2017-07-20 17:04:36,723] INFO Partition
> > > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> > zkVersion
> > > [69] not equal to that in zookeeper, skip updating ISR
> > > (kafka.cluster.Partition)
> > > [2017-07-20 17:04:36,723] INFO Partition
> > > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> > zkVersion
> > > [69] not equal to that in zookeeper, skip updating ISR
> > > (kafka.cluster.Partition)
> > > [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
> > > broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from
> > 1,0
> > > to 1 (kafka.cluster.Partition)
> > > [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
> > > broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from
> > 1,0
> > > to 1 (kafka.cluster.Partition)
> > > [2017-07-20 17:04:36,765] ERROR Conditional update of path
> > > /brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
> > > {"controller_epoch":34,"leader":1,"version":1,"leader_
> > epoch":2,"isr":[1]}
> > > and expected version 70 failed due to
> > > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode
> =
> > > NoNode for /brokers/topics/perf_goplatform-uat_raw/partitions/2/state
> > > (kafka.utils.ZkUtils$)
> > > [2017-07-20 17:04:36,765] ERROR Conditional update of path
> > > /brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
> > > {"controller_epoch":34,"leader":1,"version":1,"leader_
> > epoch":2,"isr":[1]}
> > > and expected version 70 failed due to
> > > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode
> =
> > > NoNod

Re: Help please: Topics deleted, but Kafka continues to try and sync them with Zookeeper

2017-07-20 Thread Stephen Powis
I could be totally wrong, but I seem to recall that delete wasn't fully
implemented in 0.8.x?

On Fri, Jul 21, 2017 at 10:10 AM, Carl Haferd 
wrote:

> Chris,
>
> You could first check to make sure that delete.topic.enable is true and try
> deleting again if not.  If that doesn't work with 0.8.1.1 you might need to
> manually remove the topic's log files from the configured log.dirs folder
> on each broker in addition to removing the topic's zookeeper path.
>
> Carl
>
> On Thu, Jul 20, 2017 at 10:06 AM, Chris Neal  wrote:
>
> > Hi all,
> >
> > I have a weird situation here.  I have deleted a few topics on my 0.8.1.1
> > cluster (old, I know...).  The deletes succeeded according to the
> > controller.log:
> >
> > [2017-07-20 16:40:31,175] INFO [TopicChangeListener on Controller 1]: New
> > topics: [Set()], deleted topics:
> > [Set(perf_doorway-supplier-adapter-uat_raw)], new partition replica
> > assignment [Map()]
> > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > [2017-07-20 16:40:33,507] INFO [TopicChangeListener on Controller 1]: New
> > topics: [Set()], deleted topics:
> > [Set(perf_doorway-supplier-scheduler-uat_raw)], new partition replica
> > assignment [Map()]
> > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > [2017-07-20 16:40:36,504] INFO [TopicChangeListener on Controller 1]: New
> > topics: [Set()], deleted topics: [Set(perf_gocontent-uat_raw)], new
> > partition replica assignment [Map()]
> > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> > [2017-07-20 16:40:38,290] INFO [TopicChangeListener on Controller 1]: New
> > topics: [Set()], deleted topics: [Set(perf_goplatform-uat_raw)], new
> > partition replica assignment [Map()]
> > (kafka.controller.PartitionStateMachine$TopicChangeListener)
> >
> > I query Zookeeper and the path is not there under /brokers/topics as
> well.
> >
> > But, one of the nodes in my cluster continues to try and use them:
> >
> > [2017-07-20 17:04:36,723] ERROR Conditional update of path
> > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> partitions/3/state
> > with data
> > {"controller_epoch":34,"leader":1,"version":1,"leader_
> > epoch":2,"isr":[1,0]}
> > and expected version 69 failed due to
> > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> > NoNode for
> > /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/
> partitions/3/state
> > (kafka.utils.ZkUtils$)
> > [2017-07-20 17:04:36,723] INFO Partition
> > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> zkVersion
> > [69] not equal to that in zookeeper, skip updating ISR
> > (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,723] INFO Partition
> > [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached
> zkVersion
> > [69] not equal to that in zookeeper, skip updating ISR
> > (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
> > broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from
> 1,0
> > to 1 (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
> > broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from
> 1,0
> > to 1 (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,765] ERROR Conditional update of path
> > /brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
> > {"controller_epoch":34,"leader":1,"version":1,"leader_
> epoch":2,"isr":[1]}
> > and expected version 70 failed due to
> > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> > NoNode for /brokers/topics/perf_goplatform-uat_raw/partitions/2/state
> > (kafka.utils.ZkUtils$)
> > [2017-07-20 17:04:36,765] ERROR Conditional update of path
> > /brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
> > {"controller_epoch":34,"leader":1,"version":1,"leader_
> epoch":2,"isr":[1]}
> > and expected version 70 failed due to
> > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> > NoNode for /brokers/topics/perf_goplatform-uat_raw/partitions/2/state
> > (kafka.utils.ZkUtils$)
> > [2017-07-20 17:04:36,765] INFO Partition [perf_goplatform-uat_raw,2] on
> > broker 1: Cached zkVersion [70] not equal to that in zookeeper, skip
> > updating ISR (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,765] INFO Partition [perf_goplatform-uat_raw,2] on
> > broker 1: Cached zkVersion [70] not equal to that in zookeeper, skip
> > updating ISR (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,981] INFO Partition [perf_gocontent-uat_raw,1] on
> > broker 1: Shrinking ISR for partition [perf_gocontent-uat_raw,1] from 1,0
> > to 1 (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,981] INFO Partition [perf_gocontent-uat_raw,1] on
> > broker 1: Shrinking ISR for partition [perf_gocontent-uat_raw,1] from 1,0
> > to 1 (kafka.cluster.Partition)
> > [2017-07-20 17:04:36,988] ERROR Conditional update of path
> > /brokers/topics/perf_gocontent-uat_raw/partitions/1/sta

Re: Help please: Topics deleted, but Kafka continues to try and sync them with Zookeeper

2017-07-20 Thread Carl Haferd
Chris,

You could first check to make sure that delete.topic.enable is true and try
deleting again if not.  If that doesn't work with 0.8.1.1 you might need to
manually remove the topic's log files from the configured log.dirs folder
on each broker in addition to removing the topic's zookeeper path.

Carl

On Thu, Jul 20, 2017 at 10:06 AM, Chris Neal  wrote:

> Hi all,
>
> I have a weird situation here.  I have deleted a few topics on my 0.8.1.1
> cluster (old, I know...).  The deletes succeeded according to the
> controller.log:
>
> [2017-07-20 16:40:31,175] INFO [TopicChangeListener on Controller 1]: New
> topics: [Set()], deleted topics:
> [Set(perf_doorway-supplier-adapter-uat_raw)], new partition replica
> assignment [Map()]
> (kafka.controller.PartitionStateMachine$TopicChangeListener)
> [2017-07-20 16:40:33,507] INFO [TopicChangeListener on Controller 1]: New
> topics: [Set()], deleted topics:
> [Set(perf_doorway-supplier-scheduler-uat_raw)], new partition replica
> assignment [Map()]
> (kafka.controller.PartitionStateMachine$TopicChangeListener)
> [2017-07-20 16:40:36,504] INFO [TopicChangeListener on Controller 1]: New
> topics: [Set()], deleted topics: [Set(perf_gocontent-uat_raw)], new
> partition replica assignment [Map()]
> (kafka.controller.PartitionStateMachine$TopicChangeListener)
> [2017-07-20 16:40:38,290] INFO [TopicChangeListener on Controller 1]: New
> topics: [Set()], deleted topics: [Set(perf_goplatform-uat_raw)], new
> partition replica assignment [Map()]
> (kafka.controller.PartitionStateMachine$TopicChangeListener)
>
> I query Zookeeper and the path is not there under /brokers/topics as well.
>
> But, one of the nodes in my cluster continues to try and use them:
>
> [2017-07-20 17:04:36,723] ERROR Conditional update of path
> /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/partitions/3/state
> with data
> {"controller_epoch":34,"leader":1,"version":1,"leader_
> epoch":2,"isr":[1,0]}
> and expected version 69 failed due to
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for
> /brokers/topics/perf_doorway-supplier-scheduler-uat_raw/partitions/3/state
> (kafka.utils.ZkUtils$)
> [2017-07-20 17:04:36,723] INFO Partition
> [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached zkVersion
> [69] not equal to that in zookeeper, skip updating ISR
> (kafka.cluster.Partition)
> [2017-07-20 17:04:36,723] INFO Partition
> [perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached zkVersion
> [69] not equal to that in zookeeper, skip updating ISR
> (kafka.cluster.Partition)
> [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
> broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from 1,0
> to 1 (kafka.cluster.Partition)
> [2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
> broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from 1,0
> to 1 (kafka.cluster.Partition)
> [2017-07-20 17:04:36,765] ERROR Conditional update of path
> /brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
> {"controller_epoch":34,"leader":1,"version":1,"leader_epoch":2,"isr":[1]}
> and expected version 70 failed due to
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for /brokers/topics/perf_goplatform-uat_raw/partitions/2/state
> (kafka.utils.ZkUtils$)
> [2017-07-20 17:04:36,765] ERROR Conditional update of path
> /brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
> {"controller_epoch":34,"leader":1,"version":1,"leader_epoch":2,"isr":[1]}
> and expected version 70 failed due to
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for /brokers/topics/perf_goplatform-uat_raw/partitions/2/state
> (kafka.utils.ZkUtils$)
> [2017-07-20 17:04:36,765] INFO Partition [perf_goplatform-uat_raw,2] on
> broker 1: Cached zkVersion [70] not equal to that in zookeeper, skip
> updating ISR (kafka.cluster.Partition)
> [2017-07-20 17:04:36,765] INFO Partition [perf_goplatform-uat_raw,2] on
> broker 1: Cached zkVersion [70] not equal to that in zookeeper, skip
> updating ISR (kafka.cluster.Partition)
> [2017-07-20 17:04:36,981] INFO Partition [perf_gocontent-uat_raw,1] on
> broker 1: Shrinking ISR for partition [perf_gocontent-uat_raw,1] from 1,0
> to 1 (kafka.cluster.Partition)
> [2017-07-20 17:04:36,981] INFO Partition [perf_gocontent-uat_raw,1] on
> broker 1: Shrinking ISR for partition [perf_gocontent-uat_raw,1] from 1,0
> to 1 (kafka.cluster.Partition)
> [2017-07-20 17:04:36,988] ERROR Conditional update of path
> /brokers/topics/perf_gocontent-uat_raw/partitions/1/state with data
> {"controller_epoch":34,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
> and expected version 90 failed due to
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for /brokers/topics/perf_gocontent-uat_raw/partitions/1/state
> (kafka.utils.ZkUtils$)
> [2017-07-20 17:04:36,988] ERROR Conditional update 

Re: [DISCUSS] KIP-177 Consumer perf tool should count rebalance time

2017-07-20 Thread Guozhang Wang
I agree with Jason that we are just adding a new field, so the impacted
parser tools may be limited. This additional information would be very
useful.

Guozhang

On Wed, Jul 19, 2017 at 11:57 AM, Jason Gustafson 
wrote:

> Ismael, I debated that also, but the main point was to make users aware of
> the rebalance latency (with KIP-134 in mind). I'm guessing no one would
> notice if it required another option. Note that the KIP does preserve the
> existing fields (and in the same order), so if it is parsed as generic csv
> data, it should be fine. But yeah, it could break some dumb parsers. In
> general, I think we should at least allow ourselves compatible changes
> given the output format that we have chosen for a tool.
>
> -Jason
>
> On Wed, Jul 19, 2017 at 7:54 AM, Ismael Juma  wrote:
>
> > I think this is a good change, although it's unfortunate that it's likely
> to
> > break code that is parsing the output of the performance tool. Would it
> > make sense to only enable this if an option is provided?
> >
> > Ismael
> >
> > On Mon, Jul 17, 2017 at 3:41 PM, Jason Gustafson 
> > wrote:
> >
> > > +Users
> > >
> > > Thanks for the KIP. I think tracking the rebalance time separately will
> > > help resolve some confusion about the performance results given the
> > > rebalance delay in KIP-134. And it seems generally useful to know how
> > much
> > > overhead is coming from the rebalance in any case.
> > >
> > > -Jason
> > >
> > > On Thu, Jul 13, 2017 at 4:15 PM, Hu Xi  wrote:
> > >
> > > > Hi all, I opened up a new KIP
> > > > <confluence/display/ARIES/KIP-177%3A+Consumer+perf+tool+should+count+rebalance+time>
> > > > (KIP-177) concerning the consumer perf tool counting and showing
> > > > rebalance time in the output. Feel free to leave your comments here.
> > > > Thanks in advance.
> > > >
> > >
> >
>



-- 
-- Guozhang


Re: Kafka Streams: why aren't offsets being committed?

2017-07-20 Thread Dmitry Minkovsky
Hi Bill,

> When you say "even if the application has not had data for a long time" do
you have a rough idea of how long?

Minutes, hours

> What is the value of  your
"auto.offset.reset"  configuration?

I don't specify it explicitly, but the ConsumerConfig logs indicate
"auto.offset.reset = earliest" for all consumers the application creates.

Thank you,
Dmitry


On Thu, Jul 20, 2017 at 8:07 PM, Bill Bejeck  wrote:

> Hi Dmitry,
>
> When you say "even if the application has not had data for a long time" do
> you have a rough idea of how long?  What is the value of  your
> "auto.offset.reset"  configuration?
>
> Thanks,
> Bill
>
> On Thu, Jul 20, 2017 at 6:03 PM, Dmitry Minkovsky 
> wrote:
>
> > My Streams application is configured to commit offsets every 250ms:
> >
> > Properties streamsConfig = new Properties();
> > streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);
> >
> >
> > However, every time I restart my application, records that have already
> > been processed are re-processed, even if the application has not had data
> > for a long time.
> >
> > My guess is that offsets are committed only when all tasks in the
> topology
> > have received input. Is this what's happening?
> >
> >
> >
> > Thank you,
> > Dmitry
> >
>


Re: Kafka Streams: why aren't offsets being committed?

2017-07-20 Thread Bill Bejeck
Hi Dmitry,

When you say "even if the application has not had data for a long time" do
you have a rough idea of how long?  What is the value of  your
"auto.offset.reset"  configuration?

Thanks,
Bill

On Thu, Jul 20, 2017 at 6:03 PM, Dmitry Minkovsky 
wrote:

> My Streams application is configured to commit offsets every 250ms:
>
> Properties streamsConfig = new Properties();
> streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);
>
>
> However, every time I restart my application, records that have already
> been processed are re-processed, even if the application has not had data
> for a long time.
>
> My guess is that offsets are committed only when all tasks in the topology
> have received input. Is this what's happening?
>
>
>
> Thank you,
> Dmitry
>


Re: Consumer throughput drop

2017-07-20 Thread Apurva Mehta
Hi Ovidu,

The see-saw behavior is inevitable with linux when you have concurrent
reads and writes. However, tuning the following two settings may help
achieve more stable performance (from Jay's link):


> dirty_ratio: Defines a percentage value. Writeout of dirty data begins
> (via pdflush) when dirty data comprises this percentage of total system
> memory. The default value is 20.
> Red Hat recommends a slightly lower value of 15 for database workloads.
>
> dirty_background_ratio: Defines a percentage value. Writeout of dirty
> data begins in the background (via pdflush) when dirty data comprises
> this percentage of total memory. The default value is 10. For database
> workloads, Red Hat recommends a lower value of 3.


Thanks,
Apurva


On Thu, Jul 20, 2017 at 12:25 PM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Yes, I’m using Debian Jessie 2.6 installed on this hardware [1].
>
> It is also my understanding that Kafka is based on system’s cache (Linux
> in this case) which is based on Clock-Pro for page replacement policy,
> doing complex things for general workloads. I will check the tuning
> parameters, but I was hoping for some advices to avoid disk at all when
> reading, considering the system's cache is used completely by Kafka and is
> huge ~128GB - that is to tune Clock-Pro to be smarter when used for
> streaming access patterns.
>
> Thanks,
> Ovidiu
>
> [1] https://www.grid5000.fr/mediawiki/index.php/Rennes:Hardware#Dell_Poweredge_R630_.28paravance.29
>
> > On 20 Jul 2017, at 21:06, Jay Kreps  wrote:
> >
> > I suspect this is on Linux right?
> >
> > The way Linux works is it uses a percent of memory to buffer new writes,
> at a certain point it thinks it has too much buffered data and it gives
> high priority to writing that out. The good news about this is that the
> writes are very linear, well laid out, and high-throughput. The problem is
> that it leads to a bit of see-saw behavior.
> >
> > Now obviously the drop in performance isn't wrong. When your disk is
> writing data out it is doing work and obviously the read throughput will be
> higher when you are just reading and not writing than when you are doing
> both reading and writing simultaneously. So obviously you can't get the
> no-writing performance when you are also writing (unless you add I/O
> capacity).
> >
> > But still these big see-saws in performance are not ideal. You'd rather
> have more constant performance all the time rather than have linux bounce
> back and forth from writing nothing and then frantically writing full bore.
> Fortunately linux provides a set of pagecache tuning parameters that let
> you control this a bit.
> >
> > I think these docs cover some of the parameters:
> > https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/s-memory-tunables.html
> >
> > -Jay
> >
> > On Thu, Jul 20, 2017 at 10:24 AM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr >
> wrote:
> > Hi guys,
> >
> > I’m relatively new to Kafka’s world. I have an issue I describe below,
> maybe you can help me understand this behaviour.
> >
> > I’m running a benchmark using the following setup: one producer sends
> data to a topic and concurrently one consumer pulls and writes it to
> another topic.
> > Measuring the consumer throughput, I observe values around 500K
> records/s only until the system’s cache gets filled - from this moment the
> consumer throughput drops to ~200K (2.5 times lower).
> Looking at disk usage, I observe disk read I/O which corresponds to the
> moment the consumer throughput drops.
> > After some time, I kill the producer and immediately I observe the
> consumer throughput goes up to initial values ~ 500K records/s.
> >
> > What can I do to avoid this throughput drop?
> >
> > Attached an image showing disk I/O and CPU usage. I have about 128GB RAM
> on that server which gets filled at time ~2300.
> >
> > Thanks,
> > Ovidiu
> >
> > 
> >
>
>


Re: [DISCUSS] KIP-175: Additional '--describe' views for ConsumerGroupCommand

2017-07-20 Thread Vahid S Hashemian
Hi Jason,

Regarding your comment about the current limitation on the information 
returned for a consumer group, do you think it's worth expanding the API 
to return some additional info (e.g. generation id, group leader, ...)?

Thanks.
--Vahid




From:   Jason Gustafson 
To: Kafka Users 
Cc: d...@kafka.apache.org
Date:   07/19/2017 01:46 PM
Subject:Re: [DISCUSS] KIP-175: Additional '--describe' views for 
ConsumerGroupCommand



Hey Vahid,

Thanks for the updates. Looks pretty good. A couple comments:

1. For the --state option, should we use the same column-oriented format 
as
we use for the other options? I realize there would only be one row, but
the inconsistency is a little vexing. Also, since this tool is working 
only
with consumer groups, perhaps we can leave out "protocol type" and use
"assignment strategy" in place of "protocol"? It would be nice to also
include the group generation, but it seems we didn't add that to the
DescribeGroup response. Perhaps we could also include a count of the 
number
of members?
2. It's a little annoying that --subscription and --members share so much
in common. Maybe we could drop --subscription and use a --verbose flag to
control whether or not to include the subscription and perhaps the
assignment as well? Not sure if that's more annoying or less, but maybe a
generic --verbose will be useful in other contexts.

As for your question on whether we need the --offsets option at all, I
don't have a strong opinion, but it seems to make the command semantics a
little more consistent.

-Jason

On Tue, Jul 18, 2017 at 12:56 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Jason,
>
> I updated the KIP based on your earlier suggestions:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 175%3A+Additional+%27--describe%27+views+for+ConsumerGroupCommand
> The only thing I am wondering at this point is whether it's worth to 
have
> a `--describe --offsets` option that behaves exactly like `--describe`.
>
> Thanks.
> --Vahid
>
>
>
> From:   "Vahid S Hashemian" 
> To: d...@kafka.apache.org
> Cc: Kafka Users 
> Date:   07/17/2017 03:24 PM
> Subject:Re: [DISCUSS] KIP-175: Additional '--describe' views for
> ConsumerGroupCommand
>
>
>
> Hi Jason,
>
> Thanks for your quick feedback. Your suggestions seem reasonable.
> I'll start updating the KIP accordingly and will send out another note
> when it's ready.
>
> Regards.
> --Vahid
>
>
>
>
> From:   Jason Gustafson 
> To: d...@kafka.apache.org
> Cc: Kafka Users 
> Date:   07/17/2017 02:11 PM
> Subject:Re: [DISCUSS] KIP-175: Additional '--describe' views for
> ConsumerGroupCommand
>
>
>
> Hey Vahid,
>
> Hmm... If possible, it would be nice to avoid cluttering the default
> option
> too much, especially if it is information which is going to be the same
> for
> all members (such as the generation). My preference would be to use the
> --state option that you've suggested for that info so that we can
> represent
> it more concisely.
>
> The reason I prefer the current output is that it is clear every entry
> corresponds to a partition for which we have committed offset. Entries
> like
> this look strange:
>
> TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                     HOST        CLIENT-ID
> -      -          -               -               -    consumer4-e173f09d-c761-4f4e-95c7-6fb73bb8fbff  /127.0.0.1  consumer4
> -      -          -               -               -    consumer5-7b80e428-f8ff-43f3-8360-afd1c8ba43ea  /127.0.0.1  consumer5
>
> It makes me think that the consumers have committed offsets for an 
unknown
> partition. The --members option seems like a clearer way to communicate
> the
> fact that there are some members with no assigned partitions.
>
> A few additional suggestions:
>
> 1. Maybe we can rename --partitions to --offsets or --committed-offsets
> and
> the output could match the default output (in other words, --offsets is
> treated as the default switch). Seems no harm including the assignment
> information if we have it.
> 2. Along the lines of Onur's comment, it would be nice if the --members
> option included the list of assignment strategies that the consumer 
joined
> with (round-robin, range, etc). This list should always be small.
> 3. Thinking a little more, I'm not sure how necessary a --topics option
> is.
> The --partitions (or --offsets) option already shows the current
> assignment. Maybe --topics could be --subscription and just list the
> topics
> that the members subscribed to?
>
> Thanks,
> Jason
>
> On Mon, Jul 17, 2017 at 11:04 AM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Jason, Onur, thank you for reviewing the KIP.
> >
> > Regarding the default `--describe` option, so far there have been a 
few
> > suggestions that conflict a bit. Here are the suggestions:
> > - Keep the current behavior exactly as is (Edo

Kafka Streams: why aren't offsets being committed?

2017-07-20 Thread Dmitry Minkovsky
My Streams application is configured to commit offsets every 250ms:

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);


However, every time I restart my application, records that have already
been processed are re-processed, even if the application has not had data
for a long time.

My guess is that offsets are committed only when all tasks in the topology
have received input. Is this what's happening?
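
One way to check whether commits are actually landing is to read the committed
offsets for the application's group with a plain consumer (for Streams the
group id should match the application.id); a rough diagnostic sketch, with the
broker address, application id, topic and partition below as placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            props.put("group.id", "my-streams-app");            // the Streams application.id
            props.put("enable.auto.commit", "false");           // read-only check, never commit
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("input-topic", 0); // placeholder topic/partition
                OffsetAndMetadata committed = consumer.committed(tp);
                System.out.println(tp + " committed offset: "
                        + (committed == null ? "none" : committed.offset()));
            }
        }
    }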



Thank you,
Dmitry


Re: Spring release using apache clients 11

2017-07-20 Thread David Espinosa
Thanks Rajini!

El dia 20 jul. 2017 18:41, "Rajini Sivaram"  va
escriure:

> David,
>
> The release plans are here: https://github.com/spring-
> projects/spring-kafka/
> milestone/20?closed=1
>
> We have already included TX and headers support to the current M3 which is
> planned just after the next SF 5.0 RC3, which is expected tomorrow.
>
> Regards,
>
> Rajini
>
> On Thu, Jul 20, 2017 at 5:01 PM, David Espinosa  wrote:
>
> > Hi, does somebody know if we will have any Spring Integration/Kafka release
> > soon using Apache clients 11?
> >
>


Re: Consumer throughput drop

2017-07-20 Thread Ovidiu-Cristian MARCU
Yes, I’m using Debian Jessie 2.6 installed on this hardware [1].

It is also my understanding that Kafka is based on system’s cache (Linux in 
this case) which is based on Clock-Pro for page replacement policy, doing 
complex things for general workloads. I will check the tuning parameters, but I 
was hoping for some advices to avoid disk at all when reading, considering the 
system's cache is used completely by Kafka and is huge ~128GB - that is to tune 
Clock-Pro to be smarter when used for streaming access patterns.

Thanks,
Ovidiu

[1] 
https://www.grid5000.fr/mediawiki/index.php/Rennes:Hardware#Dell_Poweredge_R630_.28paravance.29
 


> On 20 Jul 2017, at 21:06, Jay Kreps  wrote:
> 
> I suspect this is on Linux right?
> 
> The way Linux works is it uses a percent of memory to buffer new writes, at a 
> certain point it thinks it has too much buffered data and it gives high 
> priority to writing that out. The good news about this is that the writes are 
> very linear, well laid out, and high-throughput. The problem is that it
> leads to a bit of see-saw behavior.
> 
> Now obviously the drop in performance isn't wrong. When your disk is writing 
> data out it is doing work and obviously the read throughput will be higher 
> when you are just reading and not writing than when you are doing both
> reading and writing simultaneously. So obviously you can't get the no-writing 
> performance when you are also writing (unless you add I/O capacity).
> 
> But still these big see-saws in performance are not ideal. You'd rather have 
> more constant performance all the time rather than have linux bounce back and 
> forth from writing nothing and then frantically writing full bore. 
> Fortunately linux provides a set of pagecache tuning parameters that let you 
> control this a bit. 
> 
> I think these docs cover some of the parameters:
> https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/s-memory-tunables.html
>  
> 
> 
> -Jay
> 
> On Thu, Jul 20, 2017 at 10:24 AM, Ovidiu-Cristian MARCU 
> mailto:ovidiu-cristian.ma...@inria.fr>> 
> wrote:
> Hi guys,
> 
> I’m relatively new to Kafka’s world. I have an issue I describe below, maybe 
> you can help me understand this behaviour.
> 
> I’m running a benchmark using the following setup: one producer sends data to 
> a topic and concurrently one consumer pulls and writes it to another topic.
> Measuring the consumer throughput, I observe values around 500K records/s 
> only until the system’s cache gets filled - from this moment the consumer 
> throughput drops to ~200K (2.5 times lower).
> Looking at disk usage, I observe disk read I/O which corresponds to the
> moment the consumer throughput drops.
> After some time, I kill the producer and immediately I observe the consumer 
> throughput goes up to initial values ~ 500K records/s.
> 
> What can I do to avoid this throughput drop?
> 
> Attached an image showing disk I/O and CPU usage. I have about 128GB RAM on 
> that server which gets filled at time ~2300.
> 
> Thanks,
> Ovidiu
> 
> 
> 



Re: Consumer throughput drop

2017-07-20 Thread Jay Kreps
I suspect this is on Linux right?

The way Linux works is it uses a percent of memory to buffer new writes, at
a certain point it thinks it has too much buffered data and it gives high
priority to writing that out. The good news about this is that the writes
are very linear, well laid out, and high-throughput. The problem is that
it leads to a bit of see-saw behavior.

Now obviously the drop in performance isn't wrong. When your disk is
writing data out it is doing work and obviously the read throughput will be
higher when you are just reading and not writing than when you are doing
both reading and writing simultaneously. So obviously you can't get the
no-writing performance when you are also writing (unless you add I/O
capacity).

But still these big see-saws in performance are not ideal. You'd rather
have more constant performance all the time rather than have linux bounce
back and forth from writing nothing and then frantically writing full bore.
Fortunately linux provides a set of pagecache tuning parameters that let
you control this a bit.

I think these docs cover some of the parameters:
https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/s-memory-tunables.html

-Jay

On Thu, Jul 20, 2017 at 10:24 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Hi guys,
>
> I’m relatively new to Kafka’s world. I have an issue I describe below,
> maybe you can help me understand this behaviour.
>
> I’m running a benchmark using the following setup: one producer sends data
> to a topic and concurrently one consumer pulls and writes it to another
> topic.
> Measuring the consumer throughput, I observe values around 500K records/s
> only until the system’s cache gets filled - from this moment the consumer
> throughout drops to ~200K (2.5 times lower).
> Looking at disk usage, I observe disk read I/O which corresponds to the
> moment the consumer throughout drops.
> After some time, I kill the producer and immediately I observe the
> consumer throughput goes up to initial values ~ 500K records/s.
>
> What can I do to avoid this throughput drop?
>
> Attached an image showing disk I/O and CPU usage. I have about 128GB RAM
> on that server which gets filled at time ~2300.
>
> Thanks,
> Ovidiu
>
>


Named Pipe and Kafka Producer

2017-07-20 Thread Milind Vaidya
Hi

I am using a named pipe, reading from it using Java, and sending events to
a Kafka cluster.

The stdout of a process is `tee`-ed to the pipe.

But I am observing data loss. I have yet to debug this issue. I was wondering
if anybody has already interfaced a named pipe for sending data to Kafka, and
what things need to be taken care of.
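
Not the same setup, but a minimal sketch of the pattern, assuming a FIFO at
/tmp/events.pipe, a local broker and a topic named "events" (all placeholders);
the usual culprits for silent loss are fire-and-forget sends without
acks/callbacks and exiting without flush():

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PipeToKafka {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("acks", "all");                         // wait for full acknowledgement
            props.put("retries", 5);                          // retry transient broker errors
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");

            // Opening a FIFO blocks until a writer attaches; readLine() returns null when
            // the writer closes, so a long-lived writer (or a reopen loop) is needed.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                 BufferedReader pipe = new BufferedReader(new FileReader("/tmp/events.pipe"))) {
                String line;
                while ((line = pipe.readLine()) != null) {
                    producer.send(new ProducerRecord<>("events", line), (metadata, exception) -> {
                        if (exception != null) {
                            // Log failed sends instead of dropping them silently.
                            System.err.println("Send failed: " + exception);
                        }
                    });
                }
                producer.flush(); // drain buffered records before the process exits
            }
        }
    }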

Thanks


Help please: Topics deleted, but Kafka continues to try and sync them with Zookeeper

2017-07-20 Thread Chris Neal
Hi all,

I have a weird situation here.  I have deleted a few topics on my 0.8.1.1
cluster (old, I know...).  The deletes succeeded according to the
controller.log:

[2017-07-20 16:40:31,175] INFO [TopicChangeListener on Controller 1]: New
topics: [Set()], deleted topics:
[Set(perf_doorway-supplier-adapter-uat_raw)], new partition replica
assignment [Map()]
(kafka.controller.PartitionStateMachine$TopicChangeListener)
[2017-07-20 16:40:33,507] INFO [TopicChangeListener on Controller 1]: New
topics: [Set()], deleted topics:
[Set(perf_doorway-supplier-scheduler-uat_raw)], new partition replica
assignment [Map()]
(kafka.controller.PartitionStateMachine$TopicChangeListener)
[2017-07-20 16:40:36,504] INFO [TopicChangeListener on Controller 1]: New
topics: [Set()], deleted topics: [Set(perf_gocontent-uat_raw)], new
partition replica assignment [Map()]
(kafka.controller.PartitionStateMachine$TopicChangeListener)
[2017-07-20 16:40:38,290] INFO [TopicChangeListener on Controller 1]: New
topics: [Set()], deleted topics: [Set(perf_goplatform-uat_raw)], new
partition replica assignment [Map()]
(kafka.controller.PartitionStateMachine$TopicChangeListener)

I query Zookeeper and the path is not there under /brokers/topics as well.

But, one of the nodes in my cluster continues to try and use them:

[2017-07-20 17:04:36,723] ERROR Conditional update of path
/brokers/topics/perf_doorway-supplier-scheduler-uat_raw/partitions/3/state
with data
{"controller_epoch":34,"leader":1,"version":1,"leader_epoch":2,"isr":[1,0]}
and expected version 69 failed due to
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
NoNode for
/brokers/topics/perf_doorway-supplier-scheduler-uat_raw/partitions/3/state
(kafka.utils.ZkUtils$)
[2017-07-20 17:04:36,723] INFO Partition
[perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached zkVersion
[69] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
[2017-07-20 17:04:36,723] INFO Partition
[perf_doorway-supplier-scheduler-uat_raw,3] on broker 1: Cached zkVersion
[69] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
[2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from 1,0
to 1 (kafka.cluster.Partition)
[2017-07-20 17:04:36,764] INFO Partition [perf_goplatform-uat_raw,2] on
broker 1: Shrinking ISR for partition [perf_goplatform-uat_raw,2] from 1,0
to 1 (kafka.cluster.Partition)
[2017-07-20 17:04:36,765] ERROR Conditional update of path
/brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
{"controller_epoch":34,"leader":1,"version":1,"leader_epoch":2,"isr":[1]}
and expected version 70 failed due to
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
NoNode for /brokers/topics/perf_goplatform-uat_raw/partitions/2/state
(kafka.utils.ZkUtils$)
[2017-07-20 17:04:36,765] ERROR Conditional update of path
/brokers/topics/perf_goplatform-uat_raw/partitions/2/state with data
{"controller_epoch":34,"leader":1,"version":1,"leader_epoch":2,"isr":[1]}
and expected version 70 failed due to
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
NoNode for /brokers/topics/perf_goplatform-uat_raw/partitions/2/state
(kafka.utils.ZkUtils$)
[2017-07-20 17:04:36,765] INFO Partition [perf_goplatform-uat_raw,2] on
broker 1: Cached zkVersion [70] not equal to that in zookeeper, skip
updating ISR (kafka.cluster.Partition)
[2017-07-20 17:04:36,765] INFO Partition [perf_goplatform-uat_raw,2] on
broker 1: Cached zkVersion [70] not equal to that in zookeeper, skip
updating ISR (kafka.cluster.Partition)
[2017-07-20 17:04:36,981] INFO Partition [perf_gocontent-uat_raw,1] on
broker 1: Shrinking ISR for partition [perf_gocontent-uat_raw,1] from 1,0
to 1 (kafka.cluster.Partition)
[2017-07-20 17:04:36,981] INFO Partition [perf_gocontent-uat_raw,1] on
broker 1: Shrinking ISR for partition [perf_gocontent-uat_raw,1] from 1,0
to 1 (kafka.cluster.Partition)
[2017-07-20 17:04:36,988] ERROR Conditional update of path
/brokers/topics/perf_gocontent-uat_raw/partitions/1/state with data
{"controller_epoch":34,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
and expected version 90 failed due to
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
NoNode for /brokers/topics/perf_gocontent-uat_raw/partitions/1/state
(kafka.utils.ZkUtils$)
[2017-07-20 17:04:36,988] ERROR Conditional update of path
/brokers/topics/perf_gocontent-uat_raw/partitions/1/state with data
{"controller_epoch":34,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
and expected version 90 failed due to
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
NoNode for /brokers/topics/perf_gocontent-uat_raw/partitions/1/state
(kafka.utils.ZkUtils$)
[2017-07-20 17:04:36,988] INFO Partition [perf_gocontent-uat_raw,1] on
broker 1: Cached zkVersion [90] not equal to that in zookeeper, skip
updating ISR (kafka.cluster.Partition)
[

Re: Spring release using apache clients 11

2017-07-20 Thread Rajini Sivaram
David,

The release plans are here:
https://github.com/spring-projects/spring-kafka/milestone/20?closed=1

We have already included TX and headers support to the current M3 which is
planned just after the next SF 5.0 RC3, which is expected tomorrow.

Regards,

Rajini

On Thu, Jul 20, 2017 at 5:01 PM, David Espinosa  wrote:

> Hi, does somebody know if we will have any Spring Integration/Kafka release
> soon using Apache clients 11?
>


Re: Kafka with Zookeeper behind AWS ELB

2017-07-20 Thread Luigi Tagliamonte
Hello Pradeep,
thank you for sharing your experience, will certainly consider it.

On Thu, Jul 20, 2017 at 9:29 AM, Pradeep Gollakota 
wrote:

> Luigi,
>
> I strongly urge you to consider a 5 node ZK deployment. I've always done
> that in the past for resiliency during maintenance. In a 3 node cluster,
> you can only tolerate one "failure", so if you bring one node down for
> maintenance and another node crashes during said maintenance, your ZK
> cluster is down. All the deployments I've had were 5 nodes of ZK and 5
> nodes of Kafka.
>
> - Pradeep
>
> On Thu, Jul 20, 2017 at 9:12 AM, Luigi Tagliamonte <
> luigi.tagliamont...@gmail.com> wrote:
>
> > Yes Andrey,
> > you can use an ENI without EIP on AWS if you only want a private address.
> >
> > After some consideration, I think that growing the zookeeper cluster more
> > than 3 nodes is really unlikely so I think that I will attach 3 ENI to 3
> > servers in autoscaling and I will configure Kafka in using this 3 IPs.
> > In this way I can get rid of the additional ELB/Haproxy layer, if I will
> > ever need to grow the zk ensemble I will re-engineer the solution.
> >
> > I'm wondering if reusing an old IP on a brand new zk node will create
> > issues in the ensemble.
> > Is anybody here aware of possible drawbacks?
> >
> > On Wed, Jul 19, 2017 at 11:58 PM, Andrey Dyachkov <
> > andrey.dyach...@gmail.com
> > > wrote:
> >
> > > The problem with EIP it is a public ip.
> > > Another option is to have the secondary interface attached to the
> > instance
> > > on start(or a bit later) with the private static ip, but we are
> > > investigating the possibility.
> > > On Wed 19. Jul 2017 at 23:38, Luigi Tagliamonte <
> > > luigi.tagliamont...@gmail.com> wrote:
> > >
> > > > Hello Andrey,
> > > > I see that the ELB is not going to help directly with the bug, but
> > > > introduces a nice layer that makes zookeeper DNS management easier.
> > > > Introducing and ELB I don't have to deal with keep DNS in sync for
> all
> > > the
> > > > servers in the zk ensemble.
> > > > For the moment I can use an HAproxy with EIP and when the bug is
> > solved I
> > > > can move to ELB.
> > > > What do you think about it?
> > > > Regards
> > > > L.
> > > >
> > > > On Wed, Jul 19, 2017 at 2:16 PM, Andrey Dyachkov <
> > > > andrey.dyach...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > I have just posted almost the same question in dev list.
> > > > > Zookeeper client resolves address only once, on start, introducing
> > ELB
> > > > > won't really help here (ELBs can be replaced, which involved ip
> > > change),
> > > > > but I am eager to know if there is a solution for that.
> > > > >
> > > > > On Wed, 19 Jul 2017 at 23:08 Luigi Tagliamonte <
> > > > > luigi.tagliamont...@gmail.com> wrote:
> > > > >
> > > > > > Hello, Users!
> > > > > > I'm designing a Kafka deployment on AWS and it's my first time
> > > working
> > > > > with
> > > > > > Kafka and Zookeeper so I've collected a lot of info so far but
> also
> > > > some
> > > > > > questions that I would love to submit to a much expert audience
> > like
> > > > you.
> > > > > >
> > > > > > I have been experimenting with exhibitor and zookeeper in auto
> > > scaling
> > > > > > group and the exhibitor orchestration seems to work so far.
> > > > > >
> > > > > > I was trying to find a way to configure zookeeper servers in
> Kafka
> > > conf
> > > > > and
> > > > > > do not have to reconfigure them in case a zookeeper node needs to
> > be
> > > > > > replaced/dies, so I thought of course of using DNS but then I read
> > > that
> > > > > > zkclient library used by Kafka has this bug:
> > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-2184.
> > > > > >
> > > > > > So I'm now thinking about using an ELB in front of the zookeeper
> > > > cluster.
> > > > > > Teorically on how zookeeper client should work there should be no
> > > > problem
> > > > > > but I'm wondering if any of you used that and how is the outcome?
> > > > > >
> > > > > --
> > > > >
> > > > > With great enthusiasm,
> > > > > Andrey
> > > > >
> > > >
> > > --
> > >
> > > With great enthusiasm,
> > > Andrey
> > >
> >
>


Re: Kafka with Zookeeper behind AWS ELB

2017-07-20 Thread Pradeep Gollakota
Luigi,

I strongly urge you to consider a 5 node ZK deployment. I've always done
that in the past for resiliency during maintenance. In a 3 node cluster,
you can only tolerate one "failure", so if you bring one node down for
maintenance and another node crashes during said maintenance, your ZK
cluster is down. All the deployments I've had were 5 nodes of ZK and 5
nodes of Kafka.
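
(For reference, growing from 3 to 5 ZK nodes is mostly a zoo.cfg change on
every node; a hedged sketch with hypothetical hostnames:

    server.1=zk1.example.com:2888:3888
    server.2=zk2.example.com:2888:3888
    server.3=zk3.example.com:2888:3888
    server.4=zk4.example.com:2888:3888
    server.5=zk5.example.com:2888:3888

plus a matching myid file on each server.)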

- Pradeep

On Thu, Jul 20, 2017 at 9:12 AM, Luigi Tagliamonte <
luigi.tagliamont...@gmail.com> wrote:

> Yes Andrey,
> you can use an ENI without EIP on AWS if you only want a private address.
>
> After some consideration, I think that growing the zookeeper cluster more
> than 3 nodes is really unlikely so I think that I will attach 3 ENI to 3
> servers in autoscaling and I will configure Kafka in using this 3 IPs.
> In this way I can get rid of the additional ELB/Haproxy layer, if I will
> ever need to grow the zk ensemble I will re-engineering the solution.
>
> I'm wondering if reusing an old IP on a brand new zk node will create
> issues in the ensemble.
> Is anybody here aware of possible drawbacks?
>
> On Wed, Jul 19, 2017 at 11:58 PM, Andrey Dyachkov <
> andrey.dyach...@gmail.com
> > wrote:
>
> > The problem with EIP it is a public ip.
> > Another option is to have the secondary interface attached to the
> instance
> > on start(or a bit later) with the private static ip, but we are
> > investigating the possibility.
> > On Wed 19. Jul 2017 at 23:38, Luigi Tagliamonte <
> > luigi.tagliamont...@gmail.com> wrote:
> >
> > > Hello Andrey,
> > > I see that the ELB is not going to help directly with the bug, but
> > > introduces a nice layer that makes zookeeper DNS management easier.
> > > Introducing and ELB I don't have to deal with keep DNS in sync for all
> > the
> > > servers in the zk ensemble.
> > > For the moment I can use an HAproxy with EIP and when the bug is
> solved I
> > > can move to ELB.
> > > What do you think about it?
> > > Regards
> > > L.
> > >
> > > On Wed, Jul 19, 2017 at 2:16 PM, Andrey Dyachkov <
> > > andrey.dyach...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > > I have just posted almost the same question in dev list.
> > > > Zookeeper client resolves address only once, on start, introducing
> ELB
> > > > won't really help here (ELBs can be replaced, which involved ip
> > change),
> > > > but I am eager to know if there is a solution for that.
> > > >
> > > > On Wed, 19 Jul 2017 at 23:08 Luigi Tagliamonte <
> > > > luigi.tagliamont...@gmail.com> wrote:
> > > >
> > > > > Hello, Users!
> > > > > I'm designing a Kafka deployment on AWS and it's my first time
> > working
> > > > with
> > > > > Kafka and Zookeeper so I've collected a lot of info so far but also
> > > some
> > > > > questions that I would love to submit to a much expert audience
> like
> > > you.
> > > > >
> > > > > I have been experimenting with exhibitor and zookeeper in auto
> > scaling
> > > > > group and the exhibitor orchestration seems to work so far.
> > > > >
> > > > > I was trying to find a way to configure zookeeper servers in Kafka
> > conf
> > > > and
> > > > > do not have to reconfigure them in case a zookeeper node needs to
> be
> > > > > replaced/dies, so i taught of course of using DNS but then I read
> > that
> > > > > zkclient library used by Kafka has this bug:
> > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-2184.
> > > > >
> > > > > So I'm now thinking about using an ELB in front of the zookeeper
> > > cluster.
> > > > > Teorically on how zookeeper client should work there should be no
> > > problem
> > > > > but I'm wondering if any of you used that and how is the outcome?
> > > > >
> > > > --
> > > >
> > > > With great enthusiasm,
> > > > Andrey
> > > >
> > >
> > --
> >
> > With great enthusiasm,
> > Andrey
> >
>


Re: Kafka with Zookeeper behind AWS ELB

2017-07-20 Thread Luigi Tagliamonte
Yes Andrey,
you can use an ENI without an EIP on AWS if you only want a private address.

After some consideration, I think that growing the zookeeper cluster beyond
3 nodes is really unlikely, so I will attach 3 ENIs to 3 servers in the
autoscaling group and configure Kafka to use these 3 IPs.
That way I can get rid of the additional ELB/HAProxy layer; if I ever need
to grow the zk ensemble, I will re-engineer the solution.

I'm wondering if reusing an old IP on a brand new zk node will create
issues in the ensemble.
Is anybody here aware of possible drawbacks?
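
For what it's worth, pinning the three static ENI addresses would just mean
listing them explicitly in every broker's server.properties, e.g. (the
private IPs below are hypothetical):

    zookeeper.connect=10.0.1.10:2181,10.0.2.10:2181,10.0.3.10:2181
    zookeeper.connection.timeout.ms=6000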

On Wed, Jul 19, 2017 at 11:58 PM, Andrey Dyachkov  wrote:

> The problem with EIP it is a public ip.
> Another option is to have the secondary interface attached to the instance
> on start(or a bit later) with the private static ip, but we are
> investigating the possibility.
> On Wed 19. Jul 2017 at 23:38, Luigi Tagliamonte <
> luigi.tagliamont...@gmail.com> wrote:
>
> > Hello Andrey,
> > I see that the ELB is not going to help directly with the bug, but
> > introduces a nice layer that makes zookeeper DNS management easier.
> > Introducing and ELB I don't have to deal with keep DNS in sync for all
> the
> > servers in the zk ensemble.
> > For the moment I can use an HAproxy with EIP and when the bug is solved I
> > can move to ELB.
> > What do you think about it?
> > Regards
> > L.
> >
> > On Wed, Jul 19, 2017 at 2:16 PM, Andrey Dyachkov <
> > andrey.dyach...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > I have just posted almost the same question in dev list.
> > > Zookeeper client resolves address only once, on start, introducing ELB
> > > won't really help here (ELBs can be replaced, which involved ip
> change),
> > > but I am eager to know if there is a solution for that.
> > >
> > > On Wed, 19 Jul 2017 at 23:08 Luigi Tagliamonte <
> > > luigi.tagliamont...@gmail.com> wrote:
> > >
> > > > Hello, Users!
> > > > I'm designing a Kafka deployment on AWS and it's my first time
> working
> > > with
> > > > Kafka and Zookeeper so I've collected a lot of info so far but also
> > some
> > > > questions that I would love to submit to a much expert audience like
> > you.
> > > >
> > > > I have been experimenting with exhibitor and zookeeper in auto
> scaling
> > > > group and the exhibitor orchestration seems to work so far.
> > > >
> > > > I was trying to find a way to configure zookeeper servers in Kafka
> conf
> > > and
> > > > do not have to reconfigure them in case a zookeeper node needs to be
> > > > replaced/dies, so i taught of course of using DNS but then I read
> that
> > > > zkclient library used by Kafka has this bug:
> > > > https://issues.apache.org/jira/browse/ZOOKEEPER-2184.
> > > >
> > > > So I'm now thinking about using an ELB in front of the zookeeper
> > cluster.
> > > > Teorically on how zookeeper client should work there should be no
> > problem
> > > > but I'm wondering if any of you used that and how is the outcome?
> > > >
> > > --
> > >
> > > With great enthusiasm,
> > > Andrey
> > >
> >
> --
>
> With great enthusiasm,
> Andrey
>


Re: On rebalance consumer group start from old offset

2017-07-20 Thread Justin Maltat
Solved by KAFKA-5600.

On Tue, Jul 18, 2017 at 18:51, Sabarish Sasidharan  wrote:

> This is similar to a problem I am also grappling with. We store the
> processed offset for each partition in state store. And after restarts we
> see that sometimes the start offset that Kafka Streams uses is a few
> thousands to a couple million behind per partition. To compound it, this is
> not repeatable.
>
> Regards
> Sab
>
> On Tue, Jul 18, 2017 at 9:00 PM, Justin Maltat 
> wrote:
>
> > Hi
> >
> > On a 3 brokers cluster when one of the broker come back after a restart
> > group rebalancing happens on our 2 consumers which make them restart to
> > consume from an old offset which is not the earliest. Looking at the
> > consumer offsets through kafka tools when running commits look good but
> on
> > rebalance offset changes
> >
> > If we cut our consumer it restarts at the right offset
> >
> > Do you have any idea of what could be happening?
> >
> > Regards
> >
> > Justin Maltat
> >
> > >
> > >
> >
>


Spring release using apache clients 11

2017-07-20 Thread David Espinosa
Hi, does anybody know if there will be any spring integration/kafka release
soon using apache clients 11?


Reminder: Please accept my invitation to join Advanced Spark and TensorFlow Meetup

2017-07-20 Thread Chris Fregly via Meetup

Advanced Spark and TensorFlow Meetup


Join Chris Fregly and 7,676 other Spark and TensorFlow Experts in San 
Francisco. Be the first to hear about upcoming Meetups.

Spark and Deep Learning Experts digging deep into the internals of Spark 
Core, Spark SQL, DataFrames, Spark Streaming, MLlib, Graph X, BlinkDB, 
TensorFlow, Caffe, Theano, OpenDeep, DeepLearning4J, etc.
Deeper than a blog post or typical meetup, we'll explore and discuss 
the best practices and idioms of the code base across many areas including 
Spark's JVM Bytecode Generation, CPU-cache-aware Data Structures and 
Algorithms, Approximations, Probabilistic Data Structures, Shuffle and I/O 
Optimizations, Streaming Micro-batch Scheduling, Performance Tuning, 
Configuration, Monitoring, Auto-Scaling, etc.
 
 


Kafka Connect distributed mode rebalance

2017-07-20 Thread Stephen Durfey
I'm seeing some behavior with the DistributedHerder that I am trying to
understand. I'm working on setting up a cluster of kafka connect nodes and
have a relatively large number of connectors to submit to it (392
connectors right now that will soon become over 1100). As for the
deployment of it I am using chef, and having that PUT connector configs at
deployment time so I can create/update any connectors.
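
(For context, each such deployment step is a call against the Connect REST
API, roughly like the following; the connector name and settings shown here
are hypothetical:

    PUT http://connect-worker:8083/connectors/my-jdbc-source/config
    {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "mode": "bulk",
      "connection.url": "jdbc:postgresql://db-host:5432/mydb",
      "topic.prefix": "db-"
    }
)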

Every time I PUT a new connector config to the worker it appears to be
initiating an assignment rebalance. I believe this is only happening when
submitting a new connector. This is causing all existing and running
connectors to stop and restart. My logs end up being flooded with
exceptions from the source jdbc task with sql connections being closed and
wakeup exceptions in my sink tasks when committing offsets. This causes
issues beyond having to wait for a rebalance as restarting the jdbc
connectors causes them to re-pull all data, since they are using bulk mode.
Everything eventually settles down and all the connectors finish
successfully, but each PUT takes progressively longer waiting for a
rebalance to finish.

If I simply restart the worker nodes and let them only instantiate
connectors that have already been successfully submitted everything starts
up fine. So, this is only an issue when submitting new connectors over the
REST endpoint.

So, I'm trying to understand why submitting a new connector causes the
rebalancing, but also if there is a better way to deploy the connector
configs in distributed mode?

Thanks,

Stephen


Re: Get data from old offsets

2017-07-20 Thread Matthias J. Sax
Did you try setting `auto.offset.reset` to "earliest"?
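
E.g. (a minimal sketch; note that "earliest" only applies when the group has
no committed offset, or the committed offset is out of range):

    props.put("auto.offset.reset", "earliest");

With an existing group.id you can instead rewind explicitly once the
consumer has an assignment:

    consumer.subscribe(Arrays.asList(topicName));
    consumer.poll(0);                                 // join the group, get partitions assigned
    consumer.seekToBeginning(consumer.assignment());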

-Matthias


On 7/18/17 8:44 PM, Yuri da Costa Gouveia wrote:
> Hello,
> I am having trouble to get the data from old offsets. I'm using the version
> 0.10.2.1, and I need any assistance to recover this data.
> This is my consumer class:
> 
>  String topicName = "test";
>  Properties props = new Properties();
> 
>  props.put("bootstrap.servers", "localhost:9092");
>  props.put("group.id", "test");
>  props.put("enable.auto.commit", "true");
>  props.put("auto.commit.interval.ms", "1000");
>  props.put("session.timeout.ms", "3");
>  props.put("key.deserializer", StringDeserializer.class.getName());
>  props.put("value.deserializer", StringDeserializer.class.getName());
> 
>  Thread.currentThread().setContextClassLoader(null);
>  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> 
>   //Kafka Consumer subscribes list of topics here.
>   consumer.subscribe(Arrays.asList(topicName));
>   //print the topic name
>   System.out.println("Subscribed to topic " + topicName);
>   int i = 0;
> 
>   while (true) {
>  ConsumerRecords<String, String> records = consumer.poll(100);
>  for (ConsumerRecord<String, String> record : records)
> 
>  // print the offset,key and value for the consumer records.
>  System.out.printf("offset = %d, key = %s, value = %s\n",
> record.offset(), record.key(), record.value());
>  }
> 





Re: DAG processing in Kafka Streams

2017-07-20 Thread Matthias J. Sax
Sameer,

the optimization you describe applies to batch processing but not to
stream processing.

As you mentioned: "will traverse the data only once".

This property is interesting in batch processing only, as it means that
the data is only read from disk once and both map operations are applied
during this read. I.e., a "dumb" execution would do:

1) read -> map -> write
2) read -> map -> write
3) read -> reduce -> write

but Spark does it like this:

read -> map -> map -> reduce -> write

(this is called pipeline parallelism, or operator chaining/fusion).

In stream processing, you will do the "optimized" version anyway, and
thus, because the optimized version is the native way to execute a
streaming program, there is no need to optimize :)

Note that data is not read/written from disk in intermediate steps in
stream processing. The deployed program is a continuous query and the
operators are deployed and "online" all the time, while data is streamed
through them. It's a completely different runtime model.

Thus, with regard to Spark vs Kafka Streams and your example, Kafka
Streams will execute two consecutive maps quite similarly to Spark. I say
"quite similarly" only because Kafka Streams is a true stream processor
while Spark Streaming does micro-batching (i.e., it emulates streaming by
doing batch processing).
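
For illustration, a minimal sketch of such a chained topology in the DSL
(topic names are hypothetical; `streamingConfig` is assumed to be a regular
Properties/StreamsConfig object):

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream("input-topic");
    source.mapValues(v -> v.toUpperCase())   // first map
          .mapValues(v -> v + "!")           // second map, applied to the same record in the same pass
          .to("output-topic");               // no intermediate materialization between the two maps
    new KafkaStreams(builder, streamingConfig).start();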

Hope this answers your question.


-Matthias


On 7/18/17 7:35 AM, Sameer Kumar wrote:
> Hi Guozhang,
> 
> I was comparing it with DAG processing in Spark.  Spark Streaming is a
> close competitor to Kafka Streams, one difference which might accounts for
> a faster performance was that Spark submits the code to the code and does a
> bit of code optimization that its end.
> 
> Lets consider an example code which has map-> map-> reduce. The map
> functions will not be executed unless reduce executes since its a terminal
> operation but whenever execution happens spark will traverse data only once
> and may call map functions one after the another. This is same as in Java 8
> concept of streams.
> 
> Please refer to the following link, that explains it very well
> https://stackoverflow.com/questions/25836316/how-dag-works-under-the-covers-in-rdd/30685279#30685279
> 
> In Kafka Streams, we do specify the topology here but i dont think we do
> some sort of code optimization. My earlier example will traverse the data
> twice once for each map phase.
> 
> Please excuse with the late response, I am operating out of different
> geography.
> 
> -Sameer.
> 
> 
> 
> On Tue, Jul 18, 2017 at 2:44 AM, Guozhang Wang  wrote:
> 
>> If that is what it meant for DAG processing (we still need to confirm with
>> Sameer), then programming-wise I do not see what's the difference with
>> Kafka Streams since inside Streams users is also just specifying the
>> topology as a DAG:
>>
>> https://kafka.apache.org/0110/documentation/streams/core-
>> concepts#streams_topology
>>
>> What is even better is that for Streams since we use Kafka as intermeidate
>> buffer between connected sub-topologies user do not need to worry about
>> back-pressure at all:
>>
>> http://docs.confluent.io/current/streams/architecture.html#backpressure
>>
>>
>> And flexibility-wise, as you mention "it is a bit more flexible than Kafka
>> Streams", I also cannot agree with you that it is the case, since with
>> Kafka Streams threading model people can easily have multiple tasks
>> representing different connected parts (i.e. sub-topologies) of the DAG
>> which are then hosted by different threads executing on their own pace
>> concurrently:
>>
>> https://kafka.apache.org/0110/documentation/streams/architecture#streams_
>> architecture_threads
>>
>> Again, this is because different threads never need to talk to each other,
>> but they just read / write data from / to Kafka topics which are then the
>> persistent buffer of the intermeidate streams, no synchronization between
>> threads are needed.
>>
>>
>> Guozhang
>>
>> On Mon, Jul 17, 2017 at 10:38 AM, David Garcia 
>> wrote:
>>
>>> On that note, akka streams has Kafka integration.  We use it heavily and
>>> it is quite a bit more flexible than K-Streams (which we also use…but for
>>> simpler applications)  Akka-streams-Kafka is particularly good for
>>> asynchronous processing: http://doc.akka.io/docs/akka-
>>> stream-kafka/current/home.html
>>>
>>> -David
>>>
>>> On 7/17/17, 12:35 PM, "David Garcia"  wrote:
>>>
>>> I think he means something like Akka Streams:
>>> http://doc.akka.io/docs/akka/2.5.2/java/stream/stream-graphs.html
>>>
>>> Directed Acyclic Graphs are trivial to construct in Akka Streams and
>>> use back-pressure to preclude memory issues.
>>>
>>> -David
>>>
>>> On 7/17/17, 12:20 PM, "Guozhang Wang"  wrote:
>>>
>>> Sameer,
>>>
>>> Could you elaborate a bit more what do you mean by "DAG
>>> processing"?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Sun, Jul 16, 2017 at 11:58 PM, Sameer Kumar <
>>> sam.kum.w...@gmail.com>
>>> wrote:
>>>
>>>   

Re: IllegalStateException with custom state store ..

2017-07-20 Thread Matthias J. Sax
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata?
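
In short: record metadata such as context.timestamp() is only defined while
a record is actually being processed, and flush() runs outside of that scope
(e.g. on commit, as in your stack trace). A common workaround is to capture
the timestamp in process() and reuse it later; a hedged Java sketch with
hypothetical names:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class TimestampCachingProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private long lastObservedTimestamp = -1L;   // reused by flush-time change logging

        @Override public void init(ProcessorContext context) { this.context = context; }

        @Override public void process(String key, String value) {
            lastObservedTimestamp = context.timestamp();  // legal: a record is being processed
            // ... update the state store, handing lastObservedTimestamp to the change logger
        }

        @Override public void punctuate(long timestamp) { }
        @Override public void close() { }
    }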

-Matthias

On 7/1/17 8:13 PM, Debasish Ghosh wrote:
> Just to give some more information, the ProcessorContext that gets passed
> to the init method of the custom store has a null RecordContext. Gave the
> following debug statement ..
> 
> println(context.asInstanceOf[ProcessorContextImpl].recordContext)
> 
> and got null.
> 
> regards.
> 
> On Sat, Jul 1, 2017 at 9:41 PM, Debasish Ghosh 
> wrote:
> 
>> Hi -
>>
>> I have implemented a custom state store named BFStore with a change
>> logger as follows:
>>
>> class BFStoreChangeLogger[K, V](val storeName: String,
>> val context: ProcessorContext,
>> val partition: Int,
>> val serialization: StateSerdes[K, V]) {
>>
>>   private val topic = 
>> ProcessorStateManager.storeChangelogTopic(context.applicationId,
>> storeName)
>>   private val collector = context.asInstanceOf[RecordCollector.Supplier].
>> recordCollector
>>
>>   def this(storeName: String, context: ProcessorContext, serialization:
>> StateSerdes[K, V]) {
>> this(storeName, context, context.taskId.partition, serialization)
>>   }
>>
>>   def logChange(key: K, value: V): Unit = {
>> if (collector != null) {
>>   val keySerializer = serialization.keySerializer
>>   val valueSerializer = serialization.valueSerializer
>>   collector.send(this.topic, key, value, this.partition,
>> context.timestamp, keySerializer, valueSerializer)  //**//
>> }
>>   }
>> }
>>
>> In my driver program I build the topology and start the streams as follows:
>>
>> val builder: TopologyBuilder = new TopologyBuilder()
>>
>> builder.addSource("Source", config.fromTopic)
>>.addProcessor("Process", () => new WeblogProcessor(), "Source")
>>.addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
>> stringSerde, true, changelogConfig), "Process")
>>.addSink("Sink", "weblog-count-topic", "Process")
>>
>> val streams = new KafkaStreams(builder, streamingConfig)
>> streams.start()
>>
>> When I run the program, immediately I get the following exception ..
>>
>> Exception in thread "StreamThread-1" 
>> org.apache.kafka.streams.errors.ProcessorStateException:
>> task [0_0] Failed to flush state store log-counts
>> at org.apache.kafka.streams.processor.internals.
>> ProcessorStateManager.flush(ProcessorStateManager.java:337)
>> at org.apache.kafka.streams.processor.internals.
>> StreamTask$1.run(StreamTask.java:72)
>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>> measureLatencyNs(StreamsMetricsImpl.java:188)
>> at org.apache.kafka.streams.processor.internals.
>> StreamTask.commit(StreamTask.java:280)
>> at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
>> StreamThread.java:807)
>> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
>> StreamThread.java:794)
>> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
>> StreamThread.java:769)
>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> StreamThread.java:647)
>> at org.apache.kafka.streams.processor.internals.
>> StreamThread.run(StreamThread.java:361)
>> *Caused by: java.lang.IllegalStateException: This should not happen as
>> timestamp() should only be called while a record is processed*
>> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
>> timestamp(AbstractProcessorContext.java:150)
>> at com.lightbend.fdp.sample.kstream.processor.
>> BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
>> at com.lightbend.fdp.sample.kstream.processor.BFStore.
>> flush(BFStore.scala:86)
>> at org.apache.kafka.streams.processor.internals.
>> ProcessorStateManager.flush(ProcessorStateManager.java:335)
>> ... 8 more
>>
>> Not sure I understand the whole trace but looks like this may be related
>> to https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the
>> class BFStoreChangeLogger in the line I marked above with //**//.
>>
>> Any help / workaround will be appreciated ..
>>
>> regards.
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
> 
> 
> 





Re: Cluster startup order WRT Zookeeper Quorum for Kafka

2017-07-20 Thread Tom Crayford
You'll need a ZK quorum established before brokers boot, for sure.
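
The broker-side setting that matters here is zookeeper.connection.timeout.ms
in server.properties; if a broker cannot reach the ensemble within that
window at startup, it gives up and exits, so bring the quorum up first. The
values below are illustrative:

    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
    zookeeper.connection.timeout.ms=6000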

On Thu, Jul 20, 2017 at 12:53 PM, M. Manna  wrote:

> Hello,
>
> This might be too obvious for some people, but just thinking out loud here.
>
> So we need a recommended 3 node cluster to achieve the 1 point failure
> model. I am trying to deploy a 3 node cluster (3 zks and 3 brokers) in
> Linux (or Even Windows, doesn't matter here).
>
> Under the circumstance (or any such multiple node cluster setup) - should I
> always bring up the #Quorum ZKs before starting any broker? Or is there a
> recommended startup order ?
>
> My thinking behind this (right or wrong) is that since the Quorum
> availability is important, without the ZK quorum, any broker startup will
> struggle and give up after respective retries and backoff timeouts are
> overrun. But if my understanding is incorrect, could someone please explain
> their analysis?
>
> Regards,
>


Cluster startup order WRT Zookeeper Quorum for Kafka

2017-07-20 Thread M. Manna
Hello,

This might be too obvious for some people, but just thinking out loud here.

So we need the recommended 3-node cluster to tolerate a single point of
failure. I am trying to deploy a 3-node cluster (3 ZKs and 3 brokers) on
Linux (or even Windows, it doesn't matter here).

Under these circumstances (or any such multi-node cluster setup), should I
always bring up the quorum of ZKs before starting any broker? Or is there a
recommended startup order?

My thinking behind this (right or wrong) is that since quorum availability
is important, without the ZK quorum any broker startup will struggle and
give up after the respective retries and backoff timeouts are exhausted.
But if my understanding is incorrect, could someone please explain their
analysis?

Regards,


Re: Streams StateStore topic name/reuse

2017-07-20 Thread Damian Guy
Hi,

> I have two questions:
> 1°/ Is the format written on this topic easily readable using the same
> Serde I use for the state store or does Streams change it in any way?
>

If it is a KeyValue Store then you can use your Serdes to read from the
changelog.
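
I.e., a plain consumer can read the changelog directly with the matching
deserializers. Just note that changelog topics are log-compacted and deletes
show up as records with a null value. A hedged sketch (topic name and serdes
are hypothetical, assuming String keys and values here):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "state-store-reader");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("{StreamsAppID}-{StateStoreName}-changelog"));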


> 2°/ since the topic will be used by several applications, I don't think
> naming it {StreamsAppID}-{StateStoreName}-changelog is a good name. Is
> there any way I can choose the topic name that Streams is going to use for
> a given StateStore?
>

Sorry, but this isn't supported. Alternatively, you could use the `to`
operator and send the data to another topic of your choice; of course, this
will duplicate the data.
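
A minimal sketch of that alternative, assuming the store is built from a
KTable aggregation (all names are hypothetical):

    KStreamBuilder builder = new KStreamBuilder();
    KTable<String, Long> counts = builder
        .stream(Serdes.String(), Serdes.String(), "input-topic")
        .groupByKey(Serdes.String(), Serdes.String())
        .count("my-state-store");
    counts.toStream().to(Serdes.String(), Serdes.Long(), "state-export-topic");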

Thanks,
Damian


>
>
> Thanks in advance,
> Vincent Bernardi
>


Streams StateStore topic name/reuse

2017-07-20 Thread Vincent Bernardi
Hello everyone,
I currently run a small Streams app which accumulates data in a state store
and periodically erases it. I would like another application (not running
Kafka Streams), to consume the Kafka topic which backs this State Store and
sometimes take actions depending on the state (of course never writing on
the topic, only reading).

I have two questions:
1°/ Is the format written on this topic easily readable using the same
Serde I use for the state store or does Streams change it in any way?
2°/ since the topic will be used by several applications, I don't think
naming it {StreamsAppID}-{StateStoreName}-changelog is a good name. Is
there any way I can choose the topic name that Streams is going to use for
a given StateStore?


Thanks in advance,
Vincent Bernardi


Re: [DISCUSS] KIP-177 Consumer perf tool should count rebalance time

2017-07-20 Thread Ismael Juma
OK, sounds good. Let's just make sure we note this in the upgrade notes.

Ismael

On Wed, Jul 19, 2017 at 11:57 AM, Jason Gustafson 
wrote:

> Ismael, I debated that also, but the main point was to make users aware of
> the rebalance latency (with KIP-134 in mind). I'm guessing no one would
> notice if it required another option. Note that the KIP does preserve the
> existing fields (and in the same order), so if it is parsed as generic csv
> data, it should be fine. But yeah, it could break some dumb parsers. In
> general, I think we should at least allow ourselves compatible changes
> given the output format that we have chosen for a tool.
>
> -Jason
>
> On Wed, Jul 19, 2017 at 7:54 AM, Ismael Juma  wrote:
>
> > I think this is a good chance although it's unfortunate that it's likely
> to
> > break code that is parsing the output of the performance tool. Would it
> > make sense to only enable this if an option is provided?
> >
> > Ismael
> >
> > On Mon, Jul 17, 2017 at 3:41 PM, Jason Gustafson 
> > wrote:
> >
> > > +Users
> > >
> > > Thanks for the KIP. I think tracking the rebalance time separately will
> > > help resolve some confusion about the performance results given the
> > > rebalance delay in KIP-134. And it seems generally useful to know how
> > much
> > > overhead is coming from the rebalance in any case.
> > >
> > > -Jason
> > >
> > > On Thu, Jul 13, 2017 at 4:15 PM, Hu Xi  wrote:
> > >
> > > > Hi all, I opened up a new KIP > > > confluence/display/ARIES/KIP-177%3A+Consumer+perf+tool+
> > > > should+count+rebalance+time> (KIP-177) concerning consumer perf tool
> > > > counting and showing rebalance time in the output. Be free to leave
> > your
> > > > comments here. Thanks in advance.
> > > >
> > >
> >
>