I tried all these configurations and now, as with version 0.10.1.1, I see a very slow startup. I decreased the cluster to a single server, which ran without any problem for a few hours. Now, each time I restart this process it gets into a rebalancing state for several hours. That means that every time we need to deploy a new version of our app (which can be several times a day) we have hours of downtime.
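One mitigation worth sketching here, though nobody in the thread has suggested it yet: close each instance cleanly on deploy, so its local state stores are flushed and checkpointed before the process exits. An unclean kill leaves no checkpoint, and on the next start the instance can sit in "rebalancing" while it restores state from the changelog topics. A minimal sketch under that assumption; the application id, broker address, and elided topology are placeholders, not taken from this thread:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class SaApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sa");             // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

            KStreamBuilder builder = new KStreamBuilder();
            // ... topology definition elided ...

            final KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();

            // Close cleanly on SIGTERM (e.g. when kubernetes stops the pod) so
            // local state is flushed and checkpointed; an unclean kill can force
            // a long state restore from the changelog topics on the next start.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    streams.close();
                }
            }));
        }
    }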
On Sat, May 6, 2017 at 5:13 PM, Eno Thereska <eno.there...@gmail.com> wrote:

Yeah, we've seen cases where the session timeout might also need increasing. Could you try upping it to something like 60000ms and let us know how it goes:

    streamsProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

Thanks
Eno

On May 6, 2017, at 8:35 AM, Shimi Kiviti <shim...@gmail.com> wrote:

Thanks Eno,
I already set the receive buffer size to 1MB. I will also try the producer send buffer.

What about the session timeout and heartbeat timeout? Do you think they should be increased?

Thanks,
Shimi

On Sat, 6 May 2017 at 0:21, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Shimi,

I've noticed with our benchmarks that on AWS environments with high network latency the network socket buffers often need adjusting. Any chance you could add the following to your streams configuration to change the default socket buffer size to a higher value (at least 1MB) and let us know?

    private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024; // at least 1MB
    streamsProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    streamsProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);

Thanks
Eno
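Putting the two suggestions above together, a minimal sketch of the combined streams configuration; the application id and broker address are placeholders, not taken from this thread:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class SaStreamsProps {
        // at least 1MB, per the suggestion above
        private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;

        public static Properties build() {
            Properties streamsProps = new Properties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "sa");             // placeholder
            streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

            // Bigger socket buffers for high-latency (e.g. AWS) networks:
            streamsProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
            streamsProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);

            // More forgiving session timeout; the JoinGroup request in the log
            // below shows the default sessionTimeout=10000 still in effect.
            streamsProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
            return streamsProps;
        }
    }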
On May 4, 2017, at 3:45 PM, Shimi Kiviti <shim...@gmail.com> wrote:

Thanks Eno,

We still see problems on our side. When we run kafka-streams 0.10.1.1 the problem eventually goes away, but with 0.10.2.1 it does not. We see a lot of the rebalancing messages I wrote about before.

On at least one of the kafka-streams nodes we see disconnection messages like the following; they repeat all the time:

2017-05-04 14:25:56,063 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator: Discovered coordinator ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null) for group sa.
2017-05-04 14:25:56,063 [StreamThread-1] DEBUG o.a.k.c.NetworkClient: Initiating connection to node 2147483646 at ip-10-0-91-10.ec2.internal:9092.
2017-05-04 14:25:56,091 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator: (Re-)joining group sa
2017-05-04 14:25:56,093 [StreamThread-1] DEBUG o.a.k.s.p.i.StreamPartitionAssignor: stream-thread [StreamThread-1] found [sa-events] topics possibly matching regex
2017-05-04 14:25:56,096 [StreamThread-1] DEBUG o.a.k.s.p.TopologyBuilder: stream-thread [StreamThread-1] updating builder with SubscriptionUpdates{updatedTopicSubscriptions=[sa-events]} topic(s) with possible matching regex subscription(s)
2017-05-04 14:25:56,096 [StreamThread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator: Sending JoinGroup ((type: JoinGroupRequest, groupId=sa, sessionTimeout=10000, rebalanceTimeout=2147483647, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@2f894d9b)) to coordinator ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null)
2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.n.Selector: Created socket with SO_RCVBUF = 1048576, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483646
2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.NetworkClient: Completed connection to node 2147483646. Fetching API versions.
2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.NetworkClient: Initiating API versions fetch from node 2147483646.
2017-05-04 14:25:56,104 [StreamThread-1] DEBUG o.a.k.c.NetworkClient: Recorded API versions for node 2147483646: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 2 [usable: 2], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 1 [usable: 1], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 [usable: 0], DeleteTopics(20): 0 [usable: 0])
2017-05-04 14:29:44,800 [kafka-producer-network-thread | sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG o.a.k.c.NetworkClient: Node -2 disconnected.
2017-05-04 14:29:44,801 [kafka-producer-network-thread | sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG o.a.k.c.NetworkClient: Sending metadata request (type=MetadataRequest, topics=) to node 1
2017-05-04 14:29:44,801 [kafka-producer-network-thread | sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG o.a.k.c.NetworkClient: Node -1 disconnected.
2017-05-04 14:29:44,802 [kafka-producer-network-thread | sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG o.a.k.c.Metadata: Updated cluster metadata version 4 to Cluster(id = JsVqjH3tS4CIcqpd2jkogA, nodes = [ip-10-0-91-10.ec2.internal:9092 (id: 1 rack: null), ip-10-0-95-250.ec2.internal:9092 (id: 2 rack: null)], partitions = [])
2017-05-04 14:30:56,062 [StreamThread-1] DEBUG o.a.k.c.NetworkClient: Sending metadata request (type=MetadataRequest, topics=<ALL>) to node 2
2017-05-04 14:30:56,073 [StreamThread-1] DEBUG o.a.k.c.Metadata: Updated cluster metadata version 7 to Cluster(id = JsVqjH3tS4CIcqpd2jkogA, nodes = [ip-10-0-95-250.ec2.internal:9092 (id: 2 rack: null), ip-10-0-91-10.ec2.internal:9092 (id: 1 rack: null)], partitions = [Partition(topic = sa-events, partition = 0, leader = 1, replicas = [1,2], isr = [2,1]), Partition(topic = sa-events, partition = 1, leader = 2, replicas = [1,2], isr = [2,1]), Partition(topic = sa-events, partition = 2, leader = 1, replicas = [1,2], isr = [2,1])])
2017-05-04 14:31:06,085 [StreamThread-1] DEBUG o.a.k.c.NetworkClient: Disconnecting from node 2147483646 due to request timeout.
2017-05-04 14:31:06,086 [StreamThread-1] DEBUG o.a.k.c.c.i.ConsumerNetworkClient: Cancelled JOIN_GROUP request {api_key=11,api_version=1,correlation_id=16,client_id=sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-consumer} with correlation id 16 due to node 2147483646 being disconnected
2017-05-04 14:31:06,086 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator: Marking the coordinator ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null) dead for group sa
2017-05-04 14:31:06,195 [StreamThread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator: Sending GroupCoordinator request for group sa to broker ip-10-0-91-10.ec2.internal:9092 (id: 1 rack: null)
2017-05-04 14:31:06,196 [StreamThread-1] DEBUG o.a.k.c.NetworkClient: Sending metadata request (type=MetadataRequest, topics=<ALL>) to node 2
2017-05-04 14:31:06,200 [StreamThread-1] DEBUG o.a.k.c.c.i.AbstractCoordinator: Received GroupCoordinator response ClientResponse(receivedTimeMs=1493908266200, latencyMs=5, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=19,client_id=sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-consumer}, responseBody={error_code=0,coordinator={node_id=1,host=ip-10-0-91-10.ec2.internal,port=9092}}) for group sa

On Mon, May 1, 2017 at 4:19 PM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Shimi,

0.10.2.1 contains a number of fixes that should make the out-of-the-box experience better, including resiliency under broker failures and better exception handling. If you ever get back to it, and if the problem happens again, please do send us the logs and we'll happily have a look.

Thanks
Eno

On 1 May 2017, at 12:05, Shimi Kiviti <shim...@gmail.com> wrote:

Hi Eno,
I am afraid I played too much with the configuration to make this a productive investigation :(

This is a QA environment in AWS which includes 2 Kafka instances and 3 ZooKeeper instances. There are only 3 partitions for this topic. The Kafka brokers and kafka-streams are version 0.10.1.1, and our kafka-streams app runs on Docker using Kubernetes. I played around with 1 to 3 kafka-streams processes, but I got the same results. It is too easy to scale with Kubernetes :) Since there are only 3 partitions, I didn't start more than 3 instances.

I was too quick to upgrade only the kafka-streams app to 0.10.2.1 in the hope that it would solve the problem; it didn't. The logs I sent before are from this version.
I did notice an "unknown" offset for the main topic with kafka-streams version 0.10.2.1:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group sa

GROUP  TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      OWNER
sa     sa-events  0          842199          842199          0        sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/10.0.10.9
sa     sa-events  1          1078428         1078428         0        sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/10.0.10.9
sa     sa-events  2          unknown         26093910        unknown  sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/10.0.10.9

After that I downgraded the kafka-streams app back to version 0.10.1.1. After a LONG startup time (more than an hour) during which the status of the group was "rebalancing", all 3 processes started processing messages again.

This whole thing started after we hit a bug in our code (an NPE) that crashed the stream processing thread. So now, after 4 days, everything is back to normal. This worries me, since it can happen again.

On Mon, May 1, 2017 at 11:45 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Shimi,

Could you provide more info on your setup? How many Kafka Streams processes do you have, and from how many partitions are they consuming? If you have more processes than partitions, some of the processes will be idle and won't do anything.

Eno

On Apr 30, 2017, at 5:58 PM, Shimi Kiviti <shim...@gmail.com> wrote:

Hi Everyone,

I have a problem and I hope one of you can help me figure it out. One of our kafka-streams processes stopped processing messages.

When I turn on debug logging I see lots of these messages:

2017-04-30 15:42:20,228 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher: Sending fetch for partitions [devlast-changelog-2] to broker ip-x-x-x-x.ec2.internal:9092 (id: 1 rack: null)
2017-04-30 15:42:20,696 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher: Ignoring fetched records for devlast-changelog-2 at offset 2962649 since the current position is 2963379

After a LONG time, the only messages in the log are these:

2017-04-30 16:46:33,324 [kafka-coordinator-heartbeat-thread | sa] DEBUG o.a.k.c.c.i.AbstractCoordinator: Sending Heartbeat request for group sa to coordinator ip-x-x-x-x.ec2.internal:9092 (id: 2147483646 rack: null)
2017-04-30 16:46:33,425 [kafka-coordinator-heartbeat-thread | sa] DEBUG o.a.k.c.c.i.AbstractCoordinator: Received successful Heartbeat response for group sa

Any idea?

Thanks,
Shimi
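Since the trigger for all of this was an application NPE that crashed the stream processing thread, one small guard worth adding is an uncaught exception handler on the KafkaStreams instance, so a thread death gets logged and alerted on instead of first showing up as a stalled group. A minimal sketch, assuming "streams" is the KafkaStreams instance built as in the earlier snippets; the System.err call is a placeholder for real logging:

    import org.apache.kafka.streams.KafkaStreams;

    // Register before streams.start() so any stream-thread death is surfaced.
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // placeholder: hook into your real logging/alerting here
            System.err.println("Stream thread " + t.getName() + " died: " + e);
        }
    });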