The more I look into it, the more it seems like a Kafka bug or some cluster 
failure from which your Kafka cluster did not recover.

In your case auto committing should be enabled, and in that case the 
KafkaConsumer should commit offsets periodically while it's polling 
messages. Unless, for example, `coordinatorUnknown()` keeps returning true in 
`org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
(Kafka 0.10.2.1 code base):

private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}

Have you checked the Kafka broker logs? It looks like the real problem is 
hidden behind this:

>  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - 
> Marking the coordinator 
> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 
> rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000

And maybe your Kafka cluster/consumer cannot recover from this situation.
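
If you want to experiment on the consumer side, below is a minimal sketch of 
the standard Kafka consumer settings that influence how quickly the coordinator 
is marked dead and how retries are paced. The values shown are just the Kafka 
0.10.x defaults, purely for illustration, and they go into the same Properties 
object that you pass to FlinkKafkaConsumer010:

import java.util.Properties;

Properties props = new Properties();
// maximum gap between heartbeats before the group coordinator considers the consumer dead
props.setProperty("session.timeout.ms", "10000");
// how often the consumer heartbeats the group coordinator
props.setProperty("heartbeat.interval.ms", "3000");
// how long a single request (e.g. an offset commit) may take before it is failed
props.setProperty("request.timeout.ms", "305000");
// the retryBackoffMs used in the snippet above, before the next auto-commit attempt
props.setProperty("retry.backoff.ms", "100");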

Another, simpler thing to try is to just upgrade the Kafka cluster.

Piotrek

> On 11 Jun 2018, at 11:44, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Hi Piotr, thanks for your insights.
> 
> > What’s your KafkaConsumer configuration?
> 
> We only set these in the properties that are passed to the FlinkKafkaConsumer010 
> constructor:
> 
> auto.offset.reset=latest
> bootstrap.servers=my-kafka-host:9092
> group.id=my_group
> flink.partition-discovery.interval-millis=30000
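> 
> For context, this is roughly how it's wired up, as a sketch; the topic name and 
> the deserialization schema below are placeholders rather than our real ones:
> 
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
> 
> Properties properties = new Properties();
> properties.setProperty("auto.offset.reset", "latest");
> properties.setProperty("bootstrap.servers", "my-kafka-host:9092");
> properties.setProperty("group.id", "my_group");
> properties.setProperty("flink.partition-discovery.interval-millis", "30000");
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // plain string deserialization here only for the sketch
> FlinkKafkaConsumer010<String> consumer =
>         new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), properties);
> env.addSource(consumer);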
> 
> > is checkpointing enabled?
> 
> No.
> 
> > enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
> > auto.commit.interval.ms
> 
> We have whatever is the default behaviour of the Flink Kafka consumer. It seems 
> to commit quite often, something like every 5 seconds.
> 
> > did you set setCommitOffsetsOnCheckpoints() ?
> 
> No. But I checked with debugger that apparently 
> enableCommitOnCheckpoints=true is the default.
> 
> I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
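> 
> For reference, setting it explicitly would just be a call on the consumer from 
> the sketch above (as far as I understand it only takes effect when checkpointing 
> is enabled):
> 
> // defaults to true; with checkpointing enabled, offsets are committed to Kafka on completed checkpoints
> consumer.setCommitOffsetsOnCheckpoints(true);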
> 
> So I guess you're right that this bug doesn't seem to be in Flink itself? I 
> wonder if it's a known issue in the Kafka client lib.
> 
> I also took a thread dump on one of the task managers in this broken state, but 
> I couldn't spot anything obvious when comparing the threads to a dump from a 
> job where offsets are being committed. Anyway, I've saved the thread dump in 
> case there's something to look for specifically.
> 
> Sharing the full logs of job & task managers would be a bit of a hassle, 
> because I don't have an automatic way to obfuscate the logs so that I'm sure 
> that there isn't anything sensitive left. Anyway, there isn't anything else 
> to share really. I wrote: "As you can see, it didn't log anything until 
> ~2018-06-07 22:08. Also that's where the log ends".
> 
> Thanks once more.
> 
> On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> What’s your KafkaConsumer configuration? Especially values for:
> - is checkpointing enabled?
> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
> auto.commit.interval.ms
> - did you set setCommitOffsetsOnCheckpoints() ?
> 
> Please also refer to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
> , especially this part:
> 
> > Note that the Flink Kafka Consumer does not rely on the committed offsets 
> > for fault tolerance guarantees. The committed offsets are only a means to 
> > expose the consumer’s progress for monitoring purposes.
> 
> Can you post full logs from all TaskManagers/JobManager, and can you 
> say/estimate when the committing broke/stopped? Did you check the Kafka logs 
> for any errors?
> 
> To me it seems more like a Kafka issue/bug:
> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
> Especially since in your case the failing offset commits coincide with a Kafka 
> coordinator failure.
> 
> Piotrek
> 
> 
>> On 8 Jun 2018, at 10:05, Juho Autio <juho.au...@rovio.com> wrote:
>> 
>> Hi,
>> 
>> We have a Flink stream job that uses the Flink Kafka consumer. Normally it 
>> commits consumer offsets to Kafka.
>> 
>> However, this stream ended up in a state where it's otherwise working just 
>> fine, but it isn't committing offsets to Kafka any more. The job keeps 
>> writing correct aggregation results to the sink, though. At the time of 
>> writing this, the job has been running 14 hours without committing offsets.
>> 
>> Below is an extract from taskmanager.log. As you can see, it didn't log 
>> anything until ~2018-06-07 22:08. Also that's where the log ends, these are 
>> the last lines so far.
>> 
>> Could you help check if this is a known bug, possibly already fixed, or 
>> something new?
>> 
>> I'm using a self-built Flink 1.5-SNAPSHOT package, Flink commit 
>> 8395508b0401353ed07375e22882e7581d46ac0e, which is not super old.
>> 
>> Cheers,
>> Juho
>> 
>> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser    
>>                - Kafka version : 0.10.2.1
>> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser    
>>                - Kafka commitId : e89bffd6b2eff799
>> 2018-06-06 10:01:33,560 INFO  
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - 
>> Discovered coordinator 
>> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 
>> 2147483550 rack: null) for group 
>> aggregate-all_server_measurements_combined-20180606-1000.
>> 2018-06-06 10:01:33,563 INFO  
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - 
>> Discovered coordinator 
>> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 
>> 2147483550 rack: null) for group 
>> aggregate-all_server_measurements_combined-20180606-1000.
>> 2018-06-07 22:08:28,773 INFO  
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
>> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 
>> 2147483550 rack: null) dead for group 
>> aggregate-all_server_measurements_combined-20180606-1000
>> 2018-06-07 22:08:28,776 WARN  
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
>> Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, 
>> metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, 
>> topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, 
>> topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, 
>> topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, 
>> topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for 
>> group aggregate-all_server_measurements_combined-20180606-1000: Offset 
>> commit failed with a retriable exception. You should retry committing 
>> offsets.
>> 2018-06-07 22:08:29,840 INFO  
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
>> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 
>> 2147483550 rack: null) dead for group 
>> aggregate-all_server_measurements_combined-20180606-1000
>> 2018-06-07 22:08:29,841 WARN  
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
>> Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, 
>> metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, 
>> topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for 
>> group aggregate-all_server_measurements_combined-20180606-1000: Offset 
>> commit failed with a retriable exception. You should retry committing 
>> offsets.
>> 
> 
> 
