Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-11 Thread Thameem Ansari
The system-wide open-file limit was set to 10 million and the per-user limit to 1 million.
While the process was active I was closely watching the open files, and the count was around
400K, so it's well within the limits. For RocksDB max open files we tried 100,
50 and -1, but the results are the same.
I am using a RocksDB config setter, not an options file (not sure what that is).

Quick question: our topology has 3100 tasks, with 3 brokers and 8 partitions,
but we are using only one streaming application instance with a single thread.
Could this cause any issue? Today I am going to try with 2 or 3 instances and
see whether we can get rid of the issue.

Thanks
Thameem
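A rough way to relate the max-open-files setting to the ~400K descriptors observed: each RocksDB instance keeps up to its own max_open_files table files open (-1 means no cap), so the worst case scales with the number of RocksDB instances. The sketch below is a back-of-the-envelope calculation, not Kafka or RocksDB code. `instancesPerTask = 1` is an assumption; windowed stores are split into segments that are each a separate RocksDB instance (the `<store>.155520` directories in the strace output appear to be such segments), so the real multiplier may be higher. WAL, MANIFEST and LOG files add a few more descriptors per instance on top of this.

```java
import java.util.OptionalLong;

// Back-of-the-envelope sketch: worst-case SST (table) file handles held by RocksDB
// across all stores of one Kafka Streams instance. Assumes every RocksDB instance
// honours its own max_open_files cap and that -1 means "no cap".
final class FdBudget {

    // Returns empty when the cap is negative (-1: RocksDB keeps every table file open).
    static OptionalLong worstCaseTableFiles(long rocksDbInstances, int maxOpenFilesPerInstance) {
        if (maxOpenFilesPerInstance < 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(rocksDbInstances * maxOpenFilesPerInstance);
    }

    public static void main(String[] args) {
        long tasks = 3100;          // task count reported in this thread
        long instancesPerTask = 1;  // ASSUMPTION: raise this for segmented window stores
        for (int cap : new int[] {100, 50, -1}) {
            OptionalLong estimate = worstCaseTableFiles(tasks * instancesPerTask, cap);
            System.out.println("max_open_files=" + cap + " -> "
                    + (estimate.isPresent() ? estimate.getAsLong() + " table-file handles" : "unbounded"));
        }
    }
}
```

With one instance per task, a cap of 100 already permits up to 310,000 table-file handles, which is the same order of magnitude as the ~400K observed.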



Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-11 Thread Sophie Blee-Goldman
I believe the "resource temporarily unavailable" error is actually related to the
open files; most likely you are hitting the total file descriptor limit.
Sorry if you mentioned this and I missed it, but what was the
max.open.files in your RocksDBConfigSetter when you ran this? Actually,
could you just include your full RocksDBConfigSetter implementation?

Not sure why Rocks would be spawning so many child processes; it does use
background threads for compaction, but this sounds like something totally
different. I also noticed a read of "This is a RocksDB options file" -- are you
trying to use an options file rather than setting the options with the config setter
(or otherwise using RocksDB outside of Streams)?

Have you tried cleaning up the state between runs? Are you using
interactive queries?

Regarding the .checkpoint file not found thing -- this is an annoying but
pretty much harmless bug we only recently figured out. There's an open PR
for it, but for now you can just ignore the warning. See KAFKA-5998
for a long history (but scroll to the bottom for the actual explanation).
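One way to test the file-descriptor theory from inside the application: on Unix-like HotSpot/OpenJDK JVMs, `com.sun.management.UnixOperatingSystemMXBean` reports the process's open descriptor count and its per-process limit (the soft `ulimit -n`), which is the limit a single JVM actually runs into rather than the system-wide one. Descriptors opened by RocksDB's native code belong to the same process, so they should be included. This is a minimal sketch assuming such a JVM; on other platforms it reports nothing, and the 90% warning threshold is an arbitrary example.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.Optional;

// Minimal sketch: report open vs. maximum file descriptors for the current JVM process.
final class FdUsage {

    // Returns {open, max} if the JVM exposes Unix descriptor counts, otherwise empty.
    static Optional<long[]> openAndMax() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return Optional.of(new long[] {
                    unix.getOpenFileDescriptorCount(), unix.getMaxFileDescriptorCount()});
        }
        return Optional.empty(); // e.g. Windows, or a JVM without com.sun.management
    }

    public static void main(String[] args) {
        Optional<long[]> fds = openAndMax();
        if (!fds.isPresent()) {
            System.out.println("file-descriptor counts are not available on this JVM/platform");
            return;
        }
        long open = fds.get()[0];
        long max = fds.get()[1];
        System.out.println("open file descriptors: " + open + " of " + max);
        if (max > 0 && open > 0.9 * max) {
            System.out.println("warning: above 90% of the per-process limit");
        }
    }
}
```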


On Thu, Jul 4, 2019 at 11:02 AM Thameem Ansari  wrote:

> I have a few more details to share from today’s testing.
>
> I attached strace to the process and noticed that thousands of
> short-lived child processes have been created by the streams application.
> Not sure whether RocksDB is playing any role here. I noticed more than 73000
> child processes were created, and this is after I increased the default
> Linux system limit from 32767 to 4 million per PID. I'd appreciate it if someone could
> answer the following questions.
> 1. Why does RocksDB create so many processes? Each process is trying to
> read the contents of a file and getting EAGAIN (Resource temporarily
> unavailable).
> 2. I noticed a very high number of .checkpoint files missing, hence the “No
> such file or directory” messages.
> 3. The child processes were created using the “clone” system call. Not sure
> whether there is any significance to this, but I want to mention it. Moreover, these
> child processes kept increasing while the load was being applied
> continuously. Not sure under what condition it will hit the ceiling.
> 4. I noticed very high VIRT memory usage (around 31G), but RES memory was
> only 1.9G. Doesn't every file open/close consume memory? But it is not going
> up even though the number of processes increases.
>
> Thanks
> Thameem
>
> > On Jul 4, 2019, at 9:28 AM, Thameem Ansari  wrote:
> >
> > Sorry for reposting; the images in the previous message didn’t come
> > through, so I'm pasting as text.
> >
> > I have changed both system and user limits.
> >
> > To completely isolate the problem, I have tried the application in a
> > CentOS 7 environment. I set the ulimit to 1 million and the system limit to
> > 10 million open files. Now the 3 Kafka nodes are each running on a separate
> > server and the streaming application is running in a dedicated VM. The
> > application no longer explicitly throws a “too many open files” error, but
> > it aborts with the message
> >
> > terminate called after throwing an instance of 'std::system_error'
> >   what():  Resource temporarily unavailable
> > ./bin/start.sh: line 42:  1100 Aborted
> >
> > Here are the last few lines of the strace output, which show the abort
> > message.
> >
> > 25721 14:58:35 open("/home/devops/data/kafka-streams/RawLog_Processor/393_7/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN.155520/MANIFEST-07", O_RDONLY|O_CLOEXEC) = 12505
> > 25721 14:58:35 open("/sys/devices/virtual/block/dm-2/queue/logical_block_size", O_RDONLY) = 12506
> > 25721 14:58:35 read(12506, "512\n", 4096) = 4
> > 25721 14:58:35 close(12506) = 0
> > 25721 14:58:35 write(12502, "s.advise_random_on_open: 0\n2019/"..., 4096) = 4096
> > 25721 14:58:35 write(12502, "ions.comparator: leveldb.Bytewis"..., 4096) = 4096
> > 25721 14:58:35 read(12505, "V\371\270\370\34\0\1\1\32leveldb.BytewiseCompara"..., 32768) = 192
> > 25721 14:58:35 read(12505, "", 28672)   = 0
> > 25721 14:58:35 close(12505) = 0
> > 17701 14:58:35 open("/home/devops/data/kafka-streams/RawLog_Processor/393_7/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN.155520/06.sst", O_RDONLY|O_CLOEXEC) = 12505
> > 17702 14:58:35 +++ exited with 0 +++
> > 25721 14:58:35 write(2, "terminate called after throwing "..., 48) = 48
> > 25721 14:58:35 write(2, "std::system_error", 17) = 17
> > 25721 14:58:35 write(2, "'\n", 2)   = 2
> > 25721 14:58:35 write(2, "  what():  ", 11) = 11
> > 25721 14:58:35 write(2, "Resource temporarily unavailable", 32) = 32
> > 25721 14:58:35 write(2, "\n", 1)= 1
> > 177

Re: Facing memory issues with kafka streams application

2019-07-11 Thread Bill Bejeck
Thanks for reporting this, Kalyani; we'll take a look.
By any chance, can you provide log files?

Thanks,
Bill

On Mon, Jul 8, 2019 at 7:43 AM kalyani yarlagadda <
kalyani.yarlagad...@gmail.com> wrote:

> Hi,
>
> I need assistance in the below scenario. Please help me with this.
>
> I am using a hopping time window in Kafka Streams with *suppress*(), and I am
> seeing the following memory errors.
>
> *1. Facing the memory issue when the Kafka application runs
> continuously* for 2 to 3 days after deployment without any restart of the
> machine
>
> Exception in thread "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x000800bc7440.restoreBatch(Unknown Source)
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
>
>
> The machine has the following specifications:
> RAM: 16GB
>
> *2. The /tmp folder is also filling up.*
>
> *Kafka version:* *2.1.0*
>
> *I am adding the POC code below*
>
> import java.time.Duration;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.kstream.*;
> import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
>
> // define the time window as a hopping time window
> TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
>         .advanceBy(Duration.ofMinutes(5))
>         .grace(Duration.ofMinutes(1));
>
> KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
>     builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
>         .groupByKey()
>         .windowedBy(timeWindow)
>         .aggregate(
>             () -> new MetricsTimeSeries(),      /* initializer; MetricsTimeSeries is the aggregator class */
>             (aggKey, newValue, aggValue) -> {   /* adder */
>                 aggValue.addDataPoint(newValue);
>                 return aggValue;
>             },
>             Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
>         .suppress(Suppressed.untilWindowCloses(
>             BufferConfig.maxBytes(1024L * 1024 * 1024 /* 1GB */).shutDownWhenFull()));
>
> windowedMetricsTimeSeriesStream
>     .toStream()
>     .map((key, value) -> /* mapping logic goes here */)
>     .foreach(/* logic to validate and save */);
>
> *Properties set to Kafka Streams:*
>
> StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> StreamsConfig.NUM_STREAM_THREADS_CONFIG - 2
> StreamsConfig.PROCESSING_GUARANTEE_CONFIG - StreamsConfig.EXACTLY_ONCE
> StreamsConfig.COMMIT_INTERVAL_MS_CONFIG - 1000ms
>
>
>
> Thanks in Advance.
>
> Kalyani Y,
> 9177982636
>
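For context on the memory pressure: with `TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5))` the windows overlap, so every input record is added to size / advance = 4 h / 5 min = 48 windows, and with `suppress(Suppressed.untilWindowCloses(...))` the result for each of those windows may be held in the buffer until the window (plus grace) closes. The sketch below is an illustrative re-implementation of hopping-window assignment (epoch-aligned starts), not the Kafka Streams class itself, to make that multiplier concrete.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative re-implementation (not Kafka code) of hopping-window assignment:
// windows are [start, start + sizeMs), with starts aligned to multiples of advanceMs.
final class HoppingWindows {

    // Returns the start timestamps of every window that contains ts.
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still reaches ts (clamped at 0).
        long start = Math.max(0, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 4 * 60 * 60 * 1000L;   // Duration.ofHours(4)
        long advanceMs = 5 * 60 * 1000L;     // Duration.ofMinutes(5)
        long ts = 1_562_700_000_000L;        // an arbitrary timestamp in July 2019
        System.out.println(windowStartsFor(ts, sizeMs, advanceMs).size() + " windows per record");
        // prints "48 windows per record"
    }
}
```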


Re: kafka-streams disconnection errors

2019-07-11 Thread Bill Bejeck
Hi Javier,

Your theory could be correct, but it's hard to say exactly without looking
at some more information.
Can you provide your streams configuration and logs (both streams and
broker)?

Thanks,
Bill

On Thu, Jul 11, 2019 at 2:55 AM Javier Arias Losada 
wrote:

> Hello there,
>
> I managed to fix this, but I would love to understand why it failed...
> hopefully some of you can explain :-)
>
> The production configuration has 4 kafka-streams threads, and there are
> about 4 instances, so it's about 16 kafka-streams threads in total.
>
> The production topic has 48 partitions.
>
> I wanted to debug our application locally, and since the 4 threads were
> making it too noisy, I changed the configuration to 1 kafka-streams thread. I debug
> using a different application name (consumer group), connecting to the
> production broker, so that is 48 partitions.
>
> So with one thread, the application starts and the KafkaStreams status goes
> to RUNNING after properly receiving the task assignment, etc., but after
> 30 seconds of doing nothing, it starts throwing exceptions such as:
>
> Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1:
> org.apache.kafka.common.errors.DisconnectException
> and
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
> fetching topic metadata
>
> If I change the kafka-streams configuration to 4 threads, the problem
> disappears, so I guess it is related to the single thread handling all 48
> partitions, not keeping up with the Kafka connection-management internals
> and being disconnected... but can someone explain why?
>
> Thank you!
> Best,
> Javier Arias Losada
>


Re: Kafka Streams not fetching data from one partition

2019-07-11 Thread Bill Bejeck
Hi Piotr,

Thanks for reporting this issue.  Can you provide full kafka-streams and
broker logs around the timeframe you observed this?

-Bill



Re: Group Coordinator stuck on PrepareRebalance state.

2019-07-11 Thread Thameem Ansari
In addition to the session timeout, try increasing the request timeout as well.
We had a similar issue and resolved it by increasing the timeouts. As per my
understanding, if you have a complex topology, it will take some time for the
Kafka brokers to create the tasks and assign them to consumers. In the meantime,
if any consumer tries to join the group, it will time out due to the lower timeout
values. Increasing the timeouts gives the brokers enough time to assign
the tasks to consumers properly.

request.timeout.ms=6

Thanks
Thameem





Re: Topic migration patterns

2019-07-11 Thread Mich Talebzadeh
I don't know of any way other than manually syncing the ZooKeeper ensembles plus
brokers in the new environment :(

In my example below, I have a three-node ZooKeeper ensemble with 9 Kafka
brokers.

First of all, the ZooKeeper conf file $ZOOKEEPER_HOME/conf/zoo.cfg has
to match:

tickTime=2000
dataDir=/data6/hduser/zookeeper/zookeeper
initLimit=5
syncLimit=2
# the port at which the clients will connect
clientPort=2181
server.1=rhes75:2888:3888
server.2=rhes564:2888:3888
server.3=rhes76:2888:3888

Those server entries need to reflect the new environment.
Also, anything under dataDir needs to be synced for each ZooKeeper node.
For Kafka, I have three brokers on each host. Under the
directory $KAFKA_HOME/config there are three server properties files. You
need to copy these to the new environment and modify them accordingly. By
default there is one log partition per topic.

The log directory for each properties file needs to be copied across and
kept in sync.

For example, I have three of them:

server1.properties:log.dirs=/tmp/kafka-logs-1
server2.properties:log.dirs=/tmp/kafka-logs-2
server3.properties:log.dirs=/tmp/kafka-logs-3

I have not done this before, but I guess if you have some clustering and
keep-in-sync mechanism, then it is possible to switch over. Otherwise, downtime
will be required. Please also note that by running $ZOOKEEPER_HOME/bin/zkCli.sh
you can get the list of topics, etc.:

$ZOOKEEPER_HOME/bin/zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin,
isr_change_notification, consumers, log_dir_event_notification,
latest_producer_id_block, config, hbase]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics
[mdout, final, test, __consumer_offsets, md]

I think it will be involved unless someone else has better ideas.

HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.






Re: Group Coordinator stuck on PrepareRebalance state.

2019-07-11 Thread Aravind Dongara
Hi Boyang,

Thanks for the quick response.
We are on version 2.2.0.

We are using the following properties on KStreams/consumer:
session.timeout.ms=15000
heartbeat.interval.ms=3000

I was wondering whether a member might leak if it satisfies the “shouldKeepAlive”
condition in “onExpireHeartbeat” and the consumer restarts or goes down right
after that, before the next heartbeat is sent.

Thanks
Aravind


> On Jul 10, 2019, at 10:40 PM, Boyang Chen  wrote:
> 
> Hey Aravind,
> 
> If your client/broker just upgraded to 2.3, Jason has filed a blocker for
> 2.3: https://issues.apache.org/jira/browse/KAFKA-8653
> 
> and a fix is on its way: https://github.com/apache/kafka/pull/7072/files
> 
> Let me know if you are actually on a different version.
> 
> Boyang
> 
> On Wed, Jul 10, 2019 at 7:43 PM Aravind Dongara 
> wrote:
> 
>> Our Kafka Streams application is stuck and continuously emits the
>> “(Re-)joining group” log message every 5 minutes without making any
>> progress.
>> 
>> The kafka-consumer-groups command-line tool with the “--members” option shows lots of
>> stale members, in addition to the expected member IDs (shown in log messages from the
>> kafka-streams app and broker logs) that were failing to join.
>> For some reason these old members didn’t get evicted from the members list.
>> It looks like this is preventing the GroupCoordinator from reaching the
>> “CompletingRebalance” state.
>> 
>> Restarting the Kafka Streams app didn’t help either; it just replaced the
>> newer member IDs, but the old stale member IDs are still present in the
>> members list.
>> 
>> Is there any way to resolve this without restarting the broker hosting the
>> GroupCoordinator for this group?
>> 
>> Thanks
>> Aravind



Re: Topic migration patterns

2019-07-11 Thread Elliot West
Thanks for your reply.

Yes, you can assume that nothing is shared between clusters.

There is no specific topic. I'm trying to establish general patterns that
can be applied to achieve this.

Thanks,

Elliot.



Kafka Streams not fetching data from one partition

2019-07-11 Thread Piotr Strąk

Hello,

I'm investigating an issue in which a Kafka Streams application does not 
consume from one of the partitions it was assigned. I'm using the 2.3.0 
version.


All the fetch requests are sent for two partitions only:

> Using older server API v6 to send FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=Topic_v2,partitions=[{partition=25,fetch_offset=71484113,log_start_offset=-1,partition_max_bytes=1048576},{partition=22,fetch_offset=71296374,log_start_offset=-1,partition_max_bytes=1048576}]}]} with correlation id 1049090 to node 1


But there should be three of them; using kafka-consumer-groups.sh I can
see that the third one (partition 23) has no current offset.


> TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
> Topic_v2  22         71296374        71296374        0    Topic-c1f03-StreamThread-1-consumer
> Topic_v2  23         -               72830772        -    Topic-c1f03-StreamThread-1-consumer
> Topic_v2  25         71484113        71484113        0    Topic-c1f03-StreamThread-1-consumer
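The lag figure is the log-end offset minus the group's current (committed) offset. When the tool finds no current offset, as for partition 23, there is nothing to subtract, so lag is undefined and shown as `-`. A minimal sketch of that arithmetic, as an illustration rather than the kafka-consumer-groups.sh source:

```java
import java.util.OptionalLong;

// Illustration (not the tool's source): lag is only defined when a committed offset exists.
final class PartitionLag {

    static OptionalLong lag(OptionalLong currentOffset, long logEndOffset) {
        if (!currentOffset.isPresent()) {
            return OptionalLong.empty(); // rendered as "-" in the tool's output
        }
        return OptionalLong.of(logEndOffset - currentOffset.getAsLong());
    }

    public static void main(String[] args) {
        // Values taken from the partition 22 and 23 rows above.
        System.out.println(lag(OptionalLong.of(71296374L), 71296374L)); // OptionalLong[0]
        System.out.println(lag(OptionalLong.empty(), 72830772L));       // OptionalLong.empty
    }
}
```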


It worked fine until the partition was revoked, but absolutely nothing 
happened afterwards. Notice that the (shortened by me) clientId has 
changed.


> 2019-07-10 02:15:36.347 [INFO] ConsumerCoordinator:Topic-c1f03-StreamThread-1 - [Consumer clientId=Topic-c1f03-StreamThread-1-consumer, groupId=Topic] Setting offset for partition Topic_v2-23 to the committed offset FetchPosition{offset=68735327, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=ip-10-0-1-81.us-west-2.compute.internal:9092 (id: 3 rack: null), epoch=-1}}
> 2019-07-10 02:15:36.383 [INFO] ConsumerCoordinator:Topic-3e717-StreamThread-1 - [Consumer clientId=Topic-3e717-StreamThread-1-consumer, groupId=Topic] Revoking previously assigned partitions [Topic_v2-23]
> 2019-07-10 02:15:36.498 [INFO] ConsumerCoordinator:Topic-3e717-StreamThread-1 - [Consumer clientId=Topic-3e717-StreamThread-1-consumer, groupId=Topic] Setting newly assigned partitions: Topic_v2-23
> 2019-07-10 02:15:36.506 [INFO] ConsumerCoordinator:Topic-3e717-StreamThread-1 - [Consumer clientId=Topic-3e717-StreamThread-1-consumer, groupId=Topic] Setting offset for partition Topic_v2-23 to the committed offset FetchPosition{offset=70030508, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=ip-10-0-1-81.us-west-2.compute.internal:9092 (id: 3 rack: null), epoch=-1}}


What could be the reason this partition is not included in the fetch
request? The application uses a single thread (num.stream.threads set
to 1), so if the thread were blocked it couldn't keep working on the two other
partitions, if I understand correctly.




Re: Topic migration patterns

2019-07-11 Thread Mich Talebzadeh
Hi Elliot,

As you are moving the topic from one cluster to another, I assume it
implies a new zookeeper ensemble plus sets of new brokers?

Can you describe the current topic?

${KAFKA_HOME}/bin/kafka-topics.sh --describe --zookeeper ,
,  --topic 

HTH

Dr Mich Talebzadeh









Topic migration patterns

2019-07-11 Thread Elliot West
Hello,

I'd like to understand what strategies can be applied to migrate events,
producers, and consumers from one topic to another. Typically I'm thinking
of cases where:


   - we wish to migrate a topic from one cluster to another
   - high availability - minimise amount of time topic is unavailable to
   producers/consumers
   - no data loss (at least once)
   - avoid excessive latency (keep events flowing - no 'stopping the world')
   - preserve event ordering with respect to partitions
   - there is a significant retention interval (days) that we wish to move
   to the destination topic also


I can imagine that tools such as MM1+2, Replicator, etc. are useful in
these circumstances. However, I'm more interested in orchestration
patterns: what things need to be moved, in what order, and subject to what
conditions?

Thanks for your time,

Elliot.