Can leader be removed from ISR if leader is unstable

2018-02-01 Thread tao xiao
Hi team,

I use Kafka 0.10.0.0 and recently encountered an issue where the hard disk
mounted in one of the nodes experienced performance degradation, which caused
the node to become unstable. But the controller didn't remove the node from
the ISR of the partitions for which the node is the leader. I wonder if there
is any way to kick the leader out of the ISR when it becomes unstable.


Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-02-01 Thread Dmitry Minkovsky
Thank you Guozhang.

> related to your consistency requirement of the store? Do you mean
flushing the cache to persist into the store or flushing the store

Yes, motivated by consistency, I want to forward the state downstream only
after the LRU cache is persisted into the store on disk and the store's
changelog topic has been replicated.

> So if your goal is to de-duplicate the downstream traffic

I am trying to make sure, to whatever degree is supported by the library
now, that downstream processors don't see a message that is the result of
possibly inconsistent state.

Thank you for describing those mechanisms. I will investigate them.

On Thu, Feb 1, 2018 at 1:53 PM, Guozhang Wang  wrote:

> Hello Dmitry,
>
> Forwarding to downstream processors upon state store flushing is an internal
> implementation detail that is used by the DSL operators only, and hence we
> define the related classes as internal to abstract them away from the users.
>
> For your case, one thing I need to clarify is, for "flushing the store",
> how that is related to your consistency requirement of the store? Do you
> mean flushing the cache to persist into the store or flushing the store
> (e.g. flushing a RocksDB store) itself? Here are some things to note:
>
> 1. A state store can be flushed by calling store.flush() programmatically,
> and if the store is a cached store it will also automatically flush the
> cache on top of it to make sure all dirty keys are persisted to the
> underlying storage.
> 2. A state store will also be flushed whenever the processor topology calls
> commit(), which can either be user-triggered (context.commit() ) or based
> on time period (there is a commit interval config).
>
> So if your goal is to de-duplicate the downstream traffic, you can
> consider using a punctuator to periodically flush the store and send the
> whole set of key-value entries downstream; if your goal is to only send
> downstream when the cache is flushed, you can consider overriding the
> `flush()` function of the state store so that, after the flush, it sends
> the whole set of key-value entries downstream.
>
>
> Guozhang
>
>
>
> On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky 
> wrote:
>
> > Right, but I want to forward messages to downstream processor nodes only
> > when the store flushes. How does that happen automatically
> > when KTableSourceProcessor sets that up to happen with a TupleForwarder?
> >
> > On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax 
> > wrote:
> >
> > > If you build a store and enable caching, you get the KTable behavior out
> > > of the box. No need to write any custom code in the processor itself.
> > >
> > >
> > > StoreBuilder builder =
> > > Stores.keyValueStoreBuilder(...).withCachingEnabled();
> > >
> > > topology.addStateStore(builder, ...)
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
> > > > I am writing a processor and I want its stores to behave like KTables:
> > > > For consistency, I don't want to forward values until the stores have
> > > > been flushed.
> > > >
> > > > I am looking at `ForwardingCacheFlushListener` and see that it is
> > > > using `InternalProcessorContext` to change the current node, perform a
> > > > forward, and then set the node back.
> > > >
> > > > Now, I could do the same (`InternalProcessorContext` is public), but:
> > > > should I? I'm not well versed in the internals, so I am wondering what
> > > > the ramifications of this might be. Is this okay to do? Should I?
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Recommended max number of topics (and data separation)

2018-02-01 Thread Ted Yu
After a brief search, I found KAFKA-6469.
FYI
-------- Original message --------
From: Andrey Falko
Date: 2/1/18 5:28 PM (GMT-08:00)
To: users@kafka.apache.org
Subject: Re: Recommended max number of topics (and data separation)
Indeed David, I confirmed that I can't push my clusters to more than
72k topics with default ZooKeeper settings. Once I get to that
quantity, leader election never happens for new topics. Additionally,
if I kill one of the brokers, the topics don't get their leaders
re-elected, and it is impossible to trigger re-election due to the
ClientCnxn "Packet len4194494 is out of range" exception or the
"Failed to start preferred replica election" error. It seems like
there should be a couple of JIRA items for starters:
1) Don't let the number of topics exceed what jute.maxbuffer will
bear; i.e. send a good error message back to users saying the limit
has been reached and no more topics can be created.
2) Kafka should support more than 72k topics (ideally without
requiring changes to jute.maxbuffer).

Is anyone aware of JIRA tickets that might already cover the above?

On Wed, Jan 31, 2018 at 8:35 AM, David Espinosa  wrote:
> I used:
> -Djute.maxbuffer=50111000
> and the gain was that I could increase the number of topics from 70k to
> 100k :P
>
> 2018-01-30 23:25 GMT+01:00 Andrey Falko :
>
>> On Tue, Jan 30, 2018 at 1:38 PM, David Espinosa  wrote:
>> > Hi Andrey,
>> > My topics are replicated with a replication factor equal to the number
>> > of nodes, 3 in this test.
>> > Didn't know about KIP-227.
>> > The problems I see at 70k topics coming from ZK are related to any
>> > operation where ZK has to retrieve topic metadata. Just listing topics
>> > at 50k or 60k you will experience a big delay in the response. I have
>> > no more details about these problems, but it is easy to reproduce the
>> > latency in the topic list request.
>>
>> AFAIK kafka doesn't do a full list as part of normal operations from
>> ZK. If you have requirements in your consumer/producer code on doing
>> --describe, then that would be a problem. I think that can be worked
>> around. Based on my profiling data so far, while things are working in
>> non-failure mode, none of the ZK functions pop up as "hot methods".
>>
>> > Thanks for pointing me to this parameter, vm.max_map_count; it wasn't
>> > on my radar. Could you tell me what value you use?
>>
>> I set it to the max allowable on Amzn Linux: vm.max_map_count=1215752192
>>
>> > The other way around about topic naming: I think the longer the topic
>> > names are, the sooner jute.maxbuffer overflows.
>>
>> I see; what value(s) have you tried with and how much gain did you see?
>>
>> > David
>> >
>> >
>> > 2018-01-30 4:40 GMT+01:00 Andrey Falko :
>> >
>> >> On Sun, Jan 28, 2018 at 8:45 AM, David Espinosa 
>> wrote:
>> >> > Hi Monty,
>> >> >
>> >> > I'm also planning to use a large number of topics in Kafka, so
>> >> > recently I made a test within a 3-node Kafka cluster where I created
>> >> > 100k topics with one partition. Sent 1M messages in total.
>> >>
>> >> Are your topic partitions replicated?
>> >>
>> >> > These are my conclusions:
>> >> >
>> >> >    - There is no limitation in Kafka itself regarding the number of
>> >> >    topics, but there is in ZooKeeper and in the system where the
>> >> >    Kafka nodes are allocated.
>> >>
>> >> There are also the problems being addressed in KIP-227:
>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
>> >> Partition+Scalability
>> >>
>> >> >    - ZooKeeper will start having problems from 70k topics, which can
>> >> >    be solved by modifying a buffer parameter on the JVM
>> >> >    (-Djute.maxbuffer). Performance is reduced.
>> >>
>> >> What kind of problems do you see at 70k topics? If performance is
>> >> reduced by modifying jute.maxbuffer, won't that affect the performance
>> >> of Kafka in terms of how long it takes to recover from broker failure,
>> >> create/delete topics, and produce and consume?
>> >>
>> >> >    - Open file descriptors on the system are equivalent to [number
>> >> >    of topics] x [number of partitions per topic]. Set to 128k in my
>> >> >    test to avoid problems.
>> >> >    - The system needs a large amount of memory for page caching.
>> >>
>> >> I also had to tune vm.max_map_count much higher.
>> >>
>> >> >
>> >> > So, after creating 100k topics with the required setup (system + JVM)
>> >> > but seeing problems at 70k, I feel safe not creating more than 50k,
>> >> > and will always have ZooKeeper as my first suspect if a problem comes
>> >> > up. I think with proper resources (memory) and system setup (open
>> >> > file descriptors), you don't have any real limitation regarding
>> >> > partitions.
>> >>
>> >> I can confirm the 50k number. After about 40k-45k topics, I start
>> >> seeing slow down in consume 

Re: Recommended max number of topics (and data separation)

2018-02-01 Thread Andrey Falko
Indeed David, I confirmed that I can't push my clusters to more than
72k topics with default ZooKeeper settings. Once I get to that
quantity, leader election never happens for new topics. Additionally,
if I kill one of the brokers, the topics don't get their leaders
re-elected, and it is impossible to trigger re-election due to the
ClientCnxn "Packet len4194494 is out of range" exception or the
"Failed to start preferred replica election" error. It seems like
there should be a couple of JIRA items for starters:
1) Don't let the number of topics exceed what jute.maxbuffer will
bear; i.e. send a good error message back to users saying the limit
has been reached and no more topics can be created.
2) Kafka should support more than 72k topics (ideally without
requiring changes to jute.maxbuffer).

Is anyone aware of JIRA tickets that might already cover the above?

On Wed, Jan 31, 2018 at 8:35 AM, David Espinosa  wrote:
> I used:
> -Djute.maxbuffer=50111000
> and the gain was that I could increase the number of topics from 70k to
> 100k :P
>
> 2018-01-30 23:25 GMT+01:00 Andrey Falko :
>
>> On Tue, Jan 30, 2018 at 1:38 PM, David Espinosa  wrote:
>> > Hi Andrey,
>> > My topics are replicated with a replication factor equal to the number
>> > of nodes, 3 in this test.
>> > Didn't know about KIP-227.
>> > The problems I see at 70k topics coming from ZK are related to any
>> > operation where ZK has to retrieve topic metadata. Just listing topics
>> > at 50k or 60k you will experience a big delay in the response. I have
>> > no more details about these problems, but it is easy to reproduce the
>> > latency in the topic list request.
>>
>> AFAIK kafka doesn't do a full list as part of normal operations from
>> ZK. If you have requirements in your consumer/producer code on doing
>> --describe, then that would be a problem. I think that can be worked
>> around. Based on my profiling data so far, while things are working in
>> non-failure mode, none of the ZK functions pop up as "hot methods".
>>
>> > Thanks for pointing me to this parameter, vm.max_map_count; it wasn't
>> > on my radar. Could you tell me what value you use?
>>
>> I set it to the max allowable on Amzn Linux: vm.max_map_count=1215752192
>>
>> > The other way around about topic naming: I think the longer the topic
>> > names are, the sooner jute.maxbuffer overflows.
>>
>> I see; what value(s) have you tried with and how much gain did you see?
>>
>> > David
>> >
>> >
>> > 2018-01-30 4:40 GMT+01:00 Andrey Falko :
>> >
>> >> On Sun, Jan 28, 2018 at 8:45 AM, David Espinosa 
>> wrote:
>> >> > Hi Monty,
>> >> >
>> >> > I'm also planning to use a large number of topics in Kafka, so
>> >> > recently I made a test within a 3-node Kafka cluster where I created
>> >> > 100k topics with one partition. Sent 1M messages in total.
>> >>
>> >> Are your topic partitions replicated?
>> >>
>> >> > These are my conclusions:
>> >> >
>> >> > - There is no limitation in Kafka itself regarding the number of
>> >> > topics, but there is in ZooKeeper and in the system where the Kafka
>> >> > nodes are allocated.
>> >>
>> >> There are also the problems being addressed in KIP-227:
>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
>> >> Partition+Scalability
>> >>
>> >> > - ZooKeeper will start having problems from 70k topics, which can
>> >> > be solved by modifying a buffer parameter on the JVM
>> >> > (-Djute.maxbuffer). Performance is reduced.
>> >>
>> >> What kind of problems do you see at 70k topics? If performance is
>> >> reduced by modifying jute.maxbuffer, won't that affect the performance
>> >> of Kafka in terms of how long it takes to recover from broker failure,
>> >> create/delete topics, and produce and consume?
>> >>
>> >> > - Open file descriptors on the system are equivalent to [number of
>> >> > topics] x [number of partitions per topic]. Set to 128k in my test
>> >> > to avoid problems.
>> >> > - The system needs a large amount of memory for page caching.
>> >>
>> >> I also had to tune vm.max_map_count much higher.
>> >>
>> >> >
>> >> > So, after creating 100k topics with the required setup (system + JVM)
>> >> > but seeing problems at 70k, I feel safe not creating more than 50k,
>> >> > and will always have ZooKeeper as my first suspect if a problem comes
>> >> > up. I think with proper resources (memory) and system setup (open
>> >> > file descriptors), you don't have any real limitation regarding
>> >> > partitions.
>> >>
>> >> I can confirm the 50k number. After about 40k-45k topics, I start
>> >> seeing slowdowns in consumer offset commit latencies that eclipse 50ms.
>> >> Hopefully KIP-227 will alleviate that problem and leave ZK as the last
>> >> remaining hurdle. I'm testing with 3x replication per partition and 10
>> >> brokers.
>> >>
>> >> > By the way, I used long 

Re: Kafka Consumers not rebalancing.

2018-02-01 Thread satyajit vegesna
Any help would be appreciated!

Hi All,

I was experimenting with the new consumer API and have a question regarding
the rebalance process.

I start a consumer group with a single thread and make the thread sleep while
processing the records retrieved from the first consumer.poll call. I made
sure the Thread.sleep time goes beyond the session timeout and was expecting
to see a rebalance on the consumer group.
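
Roughly, the experiment looks like the sketch below (the broker address,
session timeout, and sleep duration are placeholders rather than my exact
code; the group and topic names are the ones from the output further down):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SleepyConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "consumer-grievances-group15");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "10000");            // 10s session timeout

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("TELMATEQA.grievances.grievances"));

        // First poll returns a batch of records.
        ConsumerRecords<String, String> records = consumer.poll(1000);
        System.out.println("Fetched " + records.count() + " records, now sleeping");

        // Simulate slow processing: sleep well beyond the session timeout.
        Thread.sleep(60_000);

        // After waking up, resume polling and committing as usual.
        while (true) {
            records = consumer.poll(1000);
            consumer.commitSync();
        }
    }
}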

But when I try to get the state of the consumer group using the command
below,

/opt/confluent-4.0.0/bin/kafka-consumer-groups  --group
"consumer-grievances-group15" --bootstrap-server xxx:9092,:9092,
:9092  --describe

I get the result below:

Consumer group 'consumer-grievances-group15' has no active members.


TOPIC                            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST  CLIENT-ID
TELMATEQA.grievances.grievances  0          125855          152037          26182  -            -     -

The same happens in the multi-threaded consumer group scenario, and *going
one step further, I was able to make the thread go from the sleep state back
into the running state and could see that the consumer group started off from
where it left off.*

My only question is: why isn't the rebalancing happening in this scenario?
My expectation was that the threads would rebalance and start from the
committed offset.

Regards,
Satyajit.


On Mon, Jan 29, 2018 at 3:36 PM, satyajit vegesna 
wrote:

> Hi All,
>
> I was experimenting with the new consumer API and have a question regarding
> the rebalance process.
>
> I start a consumer group with a single thread and make the thread sleep
> while processing the records retrieved from the first consumer.poll call. I
> made sure the Thread.sleep time goes beyond the session timeout and was
> expecting to see a rebalance on the consumer group.
>
> But when I try to get the state of the consumer group using the command
> below,
>
> /opt/confluent-4.0.0/bin/kafka-consumer-groups  --group
> "consumer-grievances-group15" --bootstrap-server xxx:9092,:9092,
> :9092  --describe
>
> I get the result below:
>
> Consumer group 'consumer-grievances-group15' has no active members.
>
>
> TOPIC                            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST  CLIENT-ID
> TELMATEQA.grievances.grievances  0          125855          152037          26182  -            -     -
>
> The same happens in the multi-threaded consumer group scenario, and *going
> one step further, I was able to make the thread go from the sleep state back
> into the running state and could see that the consumer group started off
> from where it left off.*
>
> My only question is: why isn't the rebalancing happening in this scenario?
> My expectation was that the threads would rebalance and start from the
> committed offset.
>
> Regards,
> Satyajit.
>
>


Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-02-01 Thread Guozhang Wang
Hello Dmitry,

Forwarding to downstream processors upon state store flushing is an internal
implementation detail that is used by the DSL operators only, and hence we
define the related classes as internal to abstract them away from the users.

For your case, one thing I need to clarify is, for "flushing the store", how
that is related to your consistency requirement of the store? Do you mean
flushing the cache to persist into the store or flushing the store (e.g.
flushing a RocksDB store) itself? Here are some things to note:

1. A state store can be flushed by calling store.flush() programmatically,
and if the store is a cached store it will also automatically flush the
cache on top of it to make sure all dirty keys are persisted to the
underlying storage.
2. A state store will also be flushed whenever the processor topology calls
commit(), which can either be user-triggered (context.commit() ) or based
on time period (there is a commit interval config).
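
For reference, the commit interval is the commit.interval.ms setting in
StreamsConfig; a minimal sketch (the application id, bootstrap servers, and
the 30-second value are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);        // commit (and flush stores) every 30 seconds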

So if your goal is to de-duplicate the downstream traffic, you can consider
using a punctuator to periodically flush the store and send the whole set of
key-value entries downstream; if your goal is to only send downstream when
the cache is flushed, you can consider overriding the `flush()` function of
the state store so that, after the flush, it sends the whole set of key-value
entries downstream.
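
As an illustration of the punctuator approach, here is a rough sketch against
the 1.0.x Processor API (the store name, String types, and 30-second interval
are placeholders, and this is only one possible way to do it):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class FlushAndForwardProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore("flush-forward-store");
        // Every 30 seconds of wall-clock time, flush the store and forward its
        // full contents to downstream processors.
        context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            store.flush();
            try (KeyValueIterator<String, String> iter = store.all()) {
                while (iter.hasNext()) {
                    KeyValue<String, String> entry = iter.next();
                    context.forward(entry.key, entry.value);
                }
            }
        });
    }

    @Override
    public void process(String key, String value) {
        store.put(key, value);  // only update the store here; forwarding happens in the punctuator
    }

    @Override
    public void punctuate(long timestamp) {
        // deprecated punctuate variant, not used
    }

    @Override
    public void close() { }
}

It could be wired into a topology along the lines of Matthias' suggestion,
e.g. (topic and node names are again made up):

Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("flush-forward", FlushAndForwardProcessor::new, "source");
topology.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("flush-forward-store"),
        Serdes.String(), Serdes.String()).withCachingEnabled(),
    "flush-forward");
topology.addSink("sink", "output-topic", "flush-forward");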


Guozhang



On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky 
wrote:

> Right, but I want to forward messages to downstream processor nodes only
> when the store flushes. How does that happen automatically
> when KTableSourceProcessor sets that up to happen with a TupleForwarder?
>
> On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax 
> wrote:
>
> > If you build a store and enable caching, you get the KTable behavior out
> > of the box. No need to write any custom code in the processor itself.
> >
> >
> > StoreBuilder builder =
> > Stores.keyValueStoreBuilder(...).withCachingEnabled();
> >
> > topology.addStateStore(builder, ...)
> >
> >
> >
> > -Matthias
> >
> > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
> > > I am writing a processor and I want its stores to behave like KTables:
> > > For consistency, I don't want to forward values until the stores have
> > > been flushed.
> > >
> > > I am looking at `ForwardingCacheFlushListener` and see that it is using
> > > `InternalProcessorContext` to change the current node, perform a
> > > forward, and then set the node back.
> > >
> > > Now, I could do the same (`InternalProcessorContext` is public), but:
> > > should I? I'm not well versed in the internals, so I am wondering what
> > > the ramifications of this might be. Is this okay to do? Should I?
> > >
> >
> >
>



-- 
-- Guozhang


Load connect jars without restarting connector

2018-02-01 Thread farhan

Hi,

How can I load new connector jars without restarting Connect?

Thanks,

Farhan Haider.
www.technify.pk


KAFKA step by step installation help

2018-02-01 Thread trading DEV
Hi,

We are interested in using Kafka for our application and are trying to start
it on our Windows system. It would be great if you could help us with an
easier step-by-step guide to set up the environment on Windows.

Thanks & Regards,

Naveen

Consultant | Host End

trad...@neml.in  Ext: 1072

as...@neml.in


Failed to send SSL Close messages unexpected status returned by SSLEngine.wrap, expected Close received Ok

2018-02-01 Thread John Chu
 Hello all:

I encountered an issue and have filed a JIRA ticket:
https://issues.apache.org/jira/browse/KAFKA-6510

Anybody have encountered that before?

Thanks.


Re: Fixing Wrecked Kafka Topic Partitions after a cluster reconfigure

2018-02-01 Thread Traiano Welcome
Hi Michal

Many thanks for the detailed advice. I've tested this on a dev cluster and
indeed the data is recoverable by reverting.


On Thu, Feb 1, 2018 at 10:11 PM, Michal Michalski <
michal.michal...@zalando.ie> wrote:

> Hey,
>
> Changing broker IDs made existing data in ZK irrelevant - since ZK uses
> broker ID to identify brokers and "associate" them with the partition
> placement (where the data is for each partition), it expects your data to
> be on the "old" brokers, while the ones that are connecting now are
> completely new to it (new IDs). This is the main problem here, I think.
>
> Also, changing default replication factor will not affect existing topics,
> so this change doesn't matter here and it's not causing any more issues :-)
>
> I think you may still be able to recover your data given:
> a) the ZK downtime you mentioned didn't cause a data loss / corruption
> (doesn't seem like, from what you wrote)
> b) you still have the "old" nodes with the data in "old" directory
>
> If that's the case, you should be able to recover the cluster to the
> "previous" state by simply reverting your changes, specifically the broker
> IDs and data dir settings (if the data is still in the old directory and
> configuration in ZK still correctly refers to old broker IDs I can't see a
> reason why it wouldn't work). Once your cluster is up and data is
> available, you're good to start over again.
>
> If you want to start by changing the data directory, simply:
> 1. stop Kafka process on one of the nodes
> 2. change data dir in config
> 3. move your data to the new directory
> 4. restart Kafka process
> Repeat for other nodes.
>
> To increase RF you have to use the bin/kafka-reassign-partitions.sh tool.
> I'd suggest referring to the Confluent documentation (
> https://docs.confluent.io/1.0.1/kafka/post-deployment.html#
> increasing-replication-factor
> ) for the details, as it's explained very well there. While it's a
> Confluent one, this specific CLI is not Confluent Platform specific, so
> will work for you as well. This may take a while depending on your data
> size.
>
> Also, *if* you started by increasing the replication factor, to change your
> data dir you can simply stop Kafka, change the data dir in config, delete
> the data directory, and start the Kafka process again. Kafka will take care
> of getting the missing data from other brokers and putting them in the new
> data dir. Keep in mind that while it's one step less than what I proposed
> above, this means transferring all the data through the network - if you
> have a lot of data, it might be a bad idea. Also, be absolutely sure that
> your data is correctly replicated - if it's not, deleting your data from
> that broker means (obviously) a data loss.
>
> And one piece of advice to keep in mind when dealing with Kafka in general:
> DO NOT CHANGE BROKER IDS for brokers with data, unless you know exactly what
> you're doing and have a good reason to do it - it will save you from many
> problems :-)
>
> Kind regards,
> Michał
>
> On 1 February 2018 at 13:28, Traiano Welcome  wrote:
>
> > Hi all,
> > I reconfigured my Kafka cluster, changing:
> >
> > - the default replication factor, from 1 to 3, and
> > - the location of the Kafka data dir on disk
> >
> > So after restarting all nodes, the cluster seemed OK, but then I noticed
> > that all the topics were failing to come online. In the logs there are
> > messages like this for each topic:
> >
> > state-change.log: [2018-02-01 12:41:42,176] ERROR Controller 826437096
> > epoch 19 initiated state change for partition [filedrop,0] from
> > OfflinePartition to OnlinePartition failed (state.change.logger)
> >
> >
> > So none of the topics are usable; listing topics with kafkacat -L -b
> > shows leaders not available:
> >
> >
> > ---
> > Metadata for all topics (from broker -1: lol-045:9092/bootstrap):
> >  7 brokers:
> >   broker 826437096 at lol-044:9092
> >   broker 746155422 at lol-047:9092
> >   broker 651737161 at lol-046:9092
> >   broker 728512596 at lol-048:9092
> >   broker 213763378 at lol-045:9092
> >   broker 622553932 at lol-049:9092
> >   broker 746727274 at lol-050:9092
> >  14 topics:
> >   topic "lol.stripped" with 3 partitions:
> > partition 2, leader -1, replicas: , isrs: , Broker: Leader not
> > available
> > partition 1, leader -1, replicas: , isrs: , Broker: Leader not
> > available
> > partition 0, leader -1, replicas: , isrs: , Broker: Leader not
> > available
> > ---
> >
> >  However, newly created topics are correctly replicated and healthy
> >
> > ---
> >   topic "lol-kafka-health" with 3 partitions:
> > partition 2, leader 622553932, replicas:
> 622553932,213763378,651737161,
> > isrs: 622553932,213763378,651737161
> > partition 1, leader 213763378, replicas:
> 622553932,213763378,826437096,
> > isrs: 213763378,826437096,622553932
> > partition 0, leader 826437096, replicas:
> 213763378,746727274,826437096,
> > isrs: 

Re: Fixing Wrecked Kafka Topic Partitions after a cluster reconfigure

2018-02-01 Thread Michal Michalski
Hey,

Changing broker IDs made existing data in ZK irrelevant - since ZK uses
broker ID to identify brokers and "associate" them with the partition
placement (where the data is for each partition), it expects your data to
be on the "old" brokers, while the ones that are connecting now are
completely new to it (new IDs). This is the main problem here, I think.

Also, changing default replication factor will not affect existing topics,
so this change doesn't matter here and it's not causing any more issues :-)

I think you may still be able to recover your data given:
a) the ZK downtime you mentioned didn't cause a data loss / corruption
(doesn't seem like, from what you wrote)
b) you still have the "old" nodes with the data in "old" directory

If that's the case, you should be able to recover the cluster to the
"previous" state by simply reverting your changes, specifically the broker
IDs and data dir settings (if the data is still in the old directory and
configuration in ZK still correctly refers to old broker IDs I can't see a
reason why it wouldn't work). Once your cluster is up and data is
available, you're good to start over again.

If you want to start by changing the data directory, simply:
1. stop Kafka process on one of the nodes
2. change data dir in config
3. move your data to the new directory
4. restart Kafka process
Repeat for other nodes.

To increase RF you have to use the bin/kafka-reassign-partitions.sh tool.
I'd suggest referring to the Confluent documentation (
https://docs.confluent.io/1.0.1/kafka/post-deployment.html#increasing-replication-factor
) for the details, as it's explained very well there. While it's a
Confluent one, this specific CLI is not Confluent Platform specific, so
will work for you as well. This may take a while depending on your data
size.

Also, *if* you started by increasing the replication factor, to change your
data dir you can simply stop Kafka, change the data dir in config, delete
the data directory, and start the Kafka process again. Kafka will take care
of getting the missing data from other brokers and putting them in the new
data dir. Keep in mind that while it's one step less than what I proposed
above, this means transferring all the data through the network - if you
have a lot of data, it might be a bad idea. Also, be absolutely sure that
your data is correctly replicated - if it's not, deleting your data from
that broker means (obviously) a data loss.

And one piece of advice to keep in mind when dealing with Kafka in general:
DO NOT CHANGE BROKER IDS for brokers with data, unless you know exactly what
you're doing and have a good reason to do it - it will save you from many
problems :-)

Kind regards,
Michał

On 1 February 2018 at 13:28, Traiano Welcome  wrote:

> Hi all,
> I reconfigured my Kafka cluster, changing:
>
> - the default replication factor, from 1 to 3, and
> - the location of the Kafka data dir on disk
>
> So after restarting all nodes, the cluster seemed OK, but then I noticed
> that all the topics were failing to come online. In the logs there are
> messages like this for each topic:
>
> state-change.log: [2018-02-01 12:41:42,176] ERROR Controller 826437096
> epoch 19 initiated state change for partition [filedrop,0] from
> OfflinePartition to OnlinePartition failed (state.change.logger)
>
>
> So none of the topics are usable; listing topics with kafkacat -L -b shows
> leaders not available:
>
>
> ---
> Metadata for all topics (from broker -1: lol-045:9092/bootstrap):
>  7 brokers:
>   broker 826437096 at lol-044:9092
>   broker 746155422 at lol-047:9092
>   broker 651737161 at lol-046:9092
>   broker 728512596 at lol-048:9092
>   broker 213763378 at lol-045:9092
>   broker 622553932 at lol-049:9092
>   broker 746727274 at lol-050:9092
>  14 topics:
>   topic "lol.stripped" with 3 partitions:
> partition 2, leader -1, replicas: , isrs: , Broker: Leader not
> available
> partition 1, leader -1, replicas: , isrs: , Broker: Leader not
> available
> partition 0, leader -1, replicas: , isrs: , Broker: Leader not
> available
> ---
>
>  However, newly created topics are correctly replicated and healthy
>
> ---
>   topic "lol-kafka-health" with 3 partitions:
> partition 2, leader 622553932, replicas: 622553932,213763378,651737161,
> isrs: 622553932,213763378,651737161
> partition 1, leader 213763378, replicas: 622553932,213763378,826437096,
> isrs: 213763378,826437096,622553932
> partition 0, leader 826437096, replicas: 213763378,746727274,826437096,
> isrs: 826437096,746727274,213763378
> ---
>
> So I think some kind of metadata corruption happened during the
> reconfiguration
>
>  My question is:
>
> - Is there any way I can get these topic partitions online again ?
>
> Given that:
>  - the broker IDs were changed during the reconfiguration
>  - the ZooKeeper cluster for Kafka went down temporarily during the
> reconfiguration
>
> In addition, are there some procedures  I can use to investigate how
> recoverable 

Fixing Wrecked Kafka Topic Partitions after a cluster reconfigure

2018-02-01 Thread Traiano Welcome
Hi all,
I reconfigured my Kafka cluster, changing:

- the default replication factor, from 1 to 3, and
- the location of the Kafka data dir on disk

So after restarting all nodes, the cluster seemed OK, but then I noticed that
all the topics were failing to come online. In the logs there are messages
like this for each topic:

state-change.log: [2018-02-01 12:41:42,176] ERROR Controller 826437096
epoch 19 initiated state change for partition [filedrop,0] from
OfflinePartition to OnlinePartition failed (state.change.logger)


So none of the topics are usable; listing topics with kafkacat -L -b shows
leaders not available:


---
Metadata for all topics (from broker -1: lol-045:9092/bootstrap):
 7 brokers:
  broker 826437096 at lol-044:9092
  broker 746155422 at lol-047:9092
  broker 651737161 at lol-046:9092
  broker 728512596 at lol-048:9092
  broker 213763378 at lol-045:9092
  broker 622553932 at lol-049:9092
  broker 746727274 at lol-050:9092
 14 topics:
  topic "lol.stripped" with 3 partitions:
partition 2, leader -1, replicas: , isrs: , Broker: Leader not available
partition 1, leader -1, replicas: , isrs: , Broker: Leader not available
partition 0, leader -1, replicas: , isrs: , Broker: Leader not available
---

 However, newly created topics are correctly replicated and healthy

---
  topic "lol-kafka-health" with 3 partitions:
partition 2, leader 622553932, replicas: 622553932,213763378,651737161,
isrs: 622553932,213763378,651737161
partition 1, leader 213763378, replicas: 622553932,213763378,826437096,
isrs: 213763378,826437096,622553932
partition 0, leader 826437096, replicas: 213763378,746727274,826437096,
isrs: 826437096,746727274,213763378
---

So I think some kind of metadata corruption happened during the reconfiguration.

 My question is:

- Is there any way I can get these topic partitions online again ?

Given that:
 - the broker IDs were changed during the reconfiguration
 - the ZooKeeper cluster for Kafka went down temporarily during the
   reconfiguration

In addition, are there some procedures  I can use to investigate how
recoverable these topics are?

Many thanks in advance!
Traiano


Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-02-01 Thread Dmitry Minkovsky
Right, but I want to forward messages to downstream processor nodes only
when the store flushes. How does that happen automatically
when KTableSourceProcessor sets that up to happen with a TupleForwarder?

On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax 
wrote:

> If you build a store and enable caching, you get the KTable behavior out
> of the box. No need to write any custom code in the processor itself.
>
>
> StoreBuilder builder =
> Stores.keyValueStoreBuilder(...).withCachingEnabled();
>
> topology.addStateStore(builder, ...)
>
>
>
> -Matthias
>
> On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
> > I am writing a processor and I want its stores to behave like KTables:
> > For consistency, I don't want to forward values until the stores have been
> > flushed.
> >
> > I am looking at `ForwardingCacheFlushListener` and see that it is using
> > `InternalProcessorContext` to change the current node, perform a forward,
> > and then set the node back.
> >
> > Now, I could do the same (`InternalProcessorContext` is public), but:
> > should I? I'm not well versed in the internals, so I am wondering what
> > the ramifications of this might be. Is this okay to do? Should I?
> >
>
>


Re: RE: NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics

2018-02-01 Thread ? ??
Yes, it is. It is metrics-core-2.2.0.jar. I have checked all the brokers.


wangchunc...@outlook.com

From: TSANG, Brilly
Date: 2018-02-01 14:54
To: users; user
Subject: RE: NoClassDefFoundError: Could not initialize class 
com.yammer.metrics.Metrics
Just make sure metrics-core-x.x.x.jar is on your classpath. That jar should
be in the /libs directory of your Kafka installation.

I am using kafka_2.11-1.0.0 so I don't have the exact version number of 
metrics-core for you.

Regards,
Brilly

-Original Message-
From: ? ?? [mailto:wangchunc...@outlook.com]
Sent: Thursday, February 01, 2018 2:11 PM
To: users; user
Subject: NoClassDefFoundError: Could not initialize class 
com.yammer.metrics.Metrics

Hi,

I am using KafkaSpout to ingest data into Storm. The versions are:

Storm-1.1.0
Storm-kafka 1.1.0
kafka_2.10-0.8.2.2

The program worked well at the beginning, but at some point the KafkaSpout
threw an exception and the program seemed stuck there afterwards. The program
can proceed after being restarted, but the exception recurs occasionally.

Below is the log:

java.lang.NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics
    at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
    at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
    at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
    at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:46)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
    at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
    at org.apache.storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60)
    at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:96)
    at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:108)
    at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:130)
    at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644)
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)

Could anybody tell me why?


wangchunc...@outlook.com



