Does SinkTaskContext.timeout recall the connector's put method with the original collection of sink records?

2017-09-10 Thread Behrang Saeedzadeh
Hi,

If a sink connector encounters a temporary error (e.g. exceeding a throughput
limit) and it calls the timeout(long) method, will its put method be invoked
again with the same collection of records that triggered the timeout(long)
call, or does the connector need to keep track of those records itself?
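
For concreteness, the pattern I have in mind is roughly the following (a
minimal sketch, not working connector code; the writeBatch helper and the
"temporary error" it throws are hypothetical placeholders):

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.connect.errors.RetriableException;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public class BackoffSinkTask extends SinkTask {

        @Override
        public String version() { return "0.1"; }

        @Override
        public void start(Map<String, String> props) { }

        @Override
        public void put(Collection<SinkRecord> records) {
            try {
                writeBatch(records);                  // hypothetical helper that writes to the external system
            } catch (RuntimeException temporaryError) {
                // e.g. a throughput/quota error we expect to recover from
                context.timeout(5_000L);              // ask Connect to back off before the next put()
                throw new RetriableException(temporaryError);
            }
        }

        @Override
        public void stop() { }

        private void writeBatch(Collection<SinkRecord> records) {
            // hypothetical: deliver the records to the external system
        }
    }

My assumption is that after the timeout the framework calls put() again with
the same batch, so the task does not need to buffer the records itself; that
is exactly what I would like to confirm.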

Best regards,
Behrang Saeedzadeh


Kafka streams application failed after a long time due to rocks db errors

2017-09-10 Thread Sachin Mittal
Hi,
We have been running a clustered Kafka Streams application, and after roughly
3 months of uninterrupted running, a few threads on a couple of instances
failed.
We checked the logs and found these two common stack traces, pointing to the
underlying cause in the RocksDB flush and put operations.

Cause 1 - flush
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store key-table-201709080400
at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:134) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:114) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.flush(MeteredSegmentedBytesStore.java:111) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:91) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
...
Caused by: org.rocksdb.RocksDBException:
at org.rocksdb.RocksDB.flush(Native Method) ~[rocksdbjni-5.0.1.jar:na]
at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[rocksdbjni-5.0.1.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
...

Cause 2 - put
ERROR 2017-09-08 09:40:47,305 [StreamThread-1]:
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key  and value [...] from store key-table-201709080410
at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:257) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:232) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
...
Caused by: org.rocksdb.RocksDBException:
at org.rocksdb.RocksDB.put(Native Method) ~[rocksdbjni-5.0.1.jar:na]
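
For context, the key-table-* stores in these traces are segmented window
stores backing a windowed aggregation. The relevant part of our topology is
roughly of this shape (a simplified sketch using the 0.10.2 DSL, not our
actual code; the topic name, application id, window size and serdes are
illustrative):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class WindowedCountSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "key-table-app");   // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // illustrative
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> events = builder.stream("events");         // illustrative topic

            events
                .filter((key, value) -> value != null)                         // the KStreamFilter in the trace
                .groupByKey()
                // windowed aggregate; its RocksDB segments are the key-table-* stores
                .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(10)), "key-table");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }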

Kafka Connect sink connector in distributed mode: how are records distributed to workers?

2017-09-10 Thread Behrang Saeedzadeh
Hi,

How does Kafka Connect distribute records between workers for a sink
connector when the connector is configured to consume from only one topic?

* Does it ensure that all records in a given partition are sent to the same
worker instance?
* When a new worker is added to the cluster, what steps does Connect take? In
particular, if Connect unassigns some partitions from the existing workers
and assigns them to the new worker, does it wait for all in-flight records
belonging to those partitions to be processed by the existing workers first?
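
For reference, this is the kind of connector config I have in mind, submitted
to a worker's /connectors REST endpoint (a sketch; the connector name, class,
and topic are placeholders, and tasks.max=4 is only there to illustrate
several tasks sharing one topic's partitions):

    {
      "name": "my-sink-connector",
      "config": {
        "connector.class": "com.example.MySinkConnector",
        "tasks.max": "4",
        "topics": "my-topic"
      }
    }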

Best regards,
Behrang Saeedzadeh


Re: Kafka 11 | Stream Application crashed the brokers

2017-09-10 Thread Sameer Kumar
Hi Guozhang,

Nope, I was not using exactly-once mode. I don't have the client logs with
me right now; I will try to replicate it again and share the other details
with you.

My concern was that it crashed my brokers as well.

-Sameer.

On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang  wrote:

> Hello Sameer,
>
> I looked through your code, and here is what I figured: in the 0.11 release
> we added the exactly-once feature
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging),
> which uses the transaction log (an internal topic named "__transaction_state")
> that has a default replication factor of 3 (this overrides your global config
> value of 2). Then at around 12:30, the leader of the transaction log
> partitions kicked both replicas 190 and 192 out of the ISR:
>
> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
> rebalance group KafkaCache_TEST15 with old generation 14
> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
>
> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on broker
> 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
> broker 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
> broker 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
> broker 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
> broker 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
>
> At the same time, both replicas 190 and 192 seem to have timed out on
> their fetch requests (note the big timestamp gap in the logs):
>
> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4' in 1
> ms. (kafka.log.Log)
> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in fetch
> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
> logStartOffset=0, maxBytes=1048576)
>
> ...
>
> [2017-09-05 12:28:37,514] INFO Deleting index
> /data1/kafka/AdServe-5/000405000294.timeindex.deleted
> (kafka.log.TimeIndex)
> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in fetch
> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
> logStartOffset=0, maxBytes=1048576)
>
>
>
> This caused the NotEnoughReplicasException, since appends to the
> transaction log require acks=all and min.isr equal to the number of replicas.
>
> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error
> processing append operation on partition __transaction_state-18
> (kafka.server.ReplicaManager)*
>
> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> insync replicas for partition __transaction_state-18 is [1], below required
> minimum [3]*
>
> Upon seeing this error, the transaction coordinator should retry appending,
> but if the retry never succeeds it will be blocked. I did not see the
> Streams API client-side logs, so I cannot tell for sure why this caused
> the Streams app to fail as well. A quick question: did you enable
> exactly-once processing (`processing.guarantee=exactly_once`) on your streams app?
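>
> As a side note, the replication and min-ISR of this internal topic are
> controlled by dedicated broker configs, so if you really intend a factor of
> 2 everywhere you could set them explicitly in server.properties (a sketch;
> the values here are illustrative, not a recommendation for your cluster):
>
> transaction.state.log.replication.factor=2
> transaction.state.log.min.isr=2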
>
>
> Guozhang
>
>
>
>
> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar 
> wrote:
>
> > Hi All,
> >
> >
> > Any thoughts on the below mail.
> >
> > -Sameer.
> >
> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar 
> > wrote:
> >
> > > Hi All,
> > >
> > > I want to report a scenario wherein running 2 different instances of my
> > > stream application caused my brokers to crash, and eventually my stream
> > > application as well. This scenario only happens when my brokers run on
> > > Kafka 11; everything works fine if my brokers are on Kafka 0.10.2 and the
> > > stream application is on Kafka 11.
> > >
> > > I am attaching herewith the logs in a zipped format.
> > >
> > > The cluster configuration
> > > 3 nodes (190, 192, 193), Kafka 11
> > > Topic Replication Factor - 2
> > >
> > > App configuration
> > > Kafka 11 stream