Hi Mahesh, this is a known limitation of Apache Kafka: https://www.mail-archive.com/users@kafka.apache.org/msg22595.html You could implement a tool that is manually retrieving the latest offset for the group from the __offsets topic.
On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <r.mahesh.kumar....@gmail.com> wrote: > Hi Team, > > Kindly let me know if I am doing something wrong. > > Kafka Version - kafka_2.11-0.10.1.1 > Flink Version - flink-1.2.0 > Using the latest Kafka Connector - FlinkKafkaConsumer010 - > flink-connector-kafka-0.10_2.11 > > Issue Faced: Not able to get the consumer offsets from Kafka when using > Flink with Flink-Kafka Connector > > $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh > --bootstrap-server localhost:9092 --list > console-consumer-19886 > console-consumer-89637 > $ > > It does not show the consumer group "test" > > For a group that does not exist, the message is as follows: > > $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh > --bootstrap-server localhost:9092 --group test1 --describe > Consumer group `test1` does not exist. > $ > > For the "test" group the error message is as follows > > $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh > --bootstrap-server localhost:9092 --group test --describe > Error while executing consumer group command Group test with protocol type > '' is not a valid consumer group > java.lang.IllegalArgumentException: Group test with protocol type '' is > not a valid consumer group > at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152) > at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.d > escribeGroup(ConsumerGroupCommand.scala:308) > at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class. > describe(ConsumerGroupCommand.scala:89) > at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.d > escribe(ConsumerGroupCommand.scala:296) > at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68) > at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala) > $ > > The error is from the AdminClient.scala (https://github.com/apache/kaf > ka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala) > > if (metadata.state != "Dead" && metadata.state != "Empty" && > metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE) > throw new IllegalArgumentException(s"Consumer Group $groupId with > protocol type '${metadata.protocolType}' is not a valid consumer group") > > Code: > > import java.util.Properties; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEn > vironment; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.util.serialization.SimpleStringSchema; > > public class KafkaFlinkOutput { > private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181"; > private static final String LOCAL_KAFKA_BROKER = "localhost:9092"; > private static final String CONSUMER_GROUP = "test"; > > public KafkaFlinkOutput() { > } > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = StreamExecutionEnvironment.get > ExecutionEnvironment(); > Properties kafkaProps = new Properties(); > kafkaProps.setProperty("zookeeper.connect", "localhost:2181"); > kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); > kafkaProps.setProperty("group.id", "test"); > kafkaProps.setProperty("auto.offset.reset", "latest"); > env.enableCheckpointing(1000L); > FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("testIn1", > new SimpleStringSchema(), kafkaProps); > DataStreamSource consumerData = env.addSource(consumer); > consumerData.print(); > System.out.println("Streaming Kafka in Flink"); > env.execute("Starting now!"); > } > } > > Debug Logs that show that Kafka Connector does commit to Kafka: > > 2017-02-07 09:52:38,851 INFO org.apache.kafka.clients.consumer.ConsumerConfig > - ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 300000 > partition.assignment.strategy = [org.apache.kafka.clients.cons > umer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [localhost:9092] > ssl.keystore.type = JKS > enable.auto.commit = true > sasl.mechanism = GSSAPI > interceptor.classes = null > exclude.internal.topics = true > ssl.truststore.password = null > client.id = > ssl.endpoint.identification.algorithm = null > max.poll.records = 2147483647 <02147%20483%20647> > check.crcs = true > request.timeout.ms = 40000 > heartbeat.interval.ms = 3000 > auto.commit.interval.ms = 5000 > receive.buffer.bytes = 65536 > ssl.truststore.type = JKS > ssl.truststore.location = null > ssl.keystore.password = null > fetch.min.bytes = 1 > send.buffer.bytes = 131072 > value.deserializer = class org.apache.kafka.common.serial > ization.ByteArrayDeserializer > group.id = test > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > ssl.trustmanager.algorithm = PKIX > ssl.key.password = null > fetch.max.wait.ms = 500 > sasl.kerberos.min.time.before.relogin = 60000 > connections.max.idle.ms = 540000 > session.timeout.ms = 30000 > metrics.num.samples = 2 > key.deserializer = class org.apache.kafka.common.serial > ization.ByteArrayDeserializer > ssl.protocol = TLS > ssl.provider = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.keystore.location = null > ssl.cipher.suites = null > security.protocol = PLAINTEXT > ssl.keymanager.algorithm = SunX509 > metrics.sample.window.ms = 30000 > auto.offset.reset = latest > > > 2017-02-07 09:53:38,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Sending heartbeat to JobManager > 2017-02-07 09:53:38,731 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Triggering checkpoint 12 @ 1486486418731 > 2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver TriggerCheckpoint 12@1486486418731 for > cfe59bc4aadc96e5a2235581460e9f3d. > 2017-02-07 09:53:38,731 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Trigger for Source: > Custom Source -> Sink: Unnamed (1/4) (cfe59bc4aadc96e5a2235581460e9f3d). > on task Source: Custom Source -> Sink: Unnamed (1/4) > 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver TriggerCheckpoint 12@1486486418731 for > 5611d20817d9a49680117c9ab000116c. > 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Trigger for Source: > Custom Source -> Sink: Unnamed (2/4) (5611d20817d9a49680117c9ab000116c). > on task Source: Custom Source -> Sink: Unnamed (2/4) > 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver TriggerCheckpoint 12@1486486418731 for > 58e5bc1040fc99f8d3f0a32c2bd524b6. > 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Trigger for Source: > Custom Source -> Sink: Unnamed (3/4) (58e5bc1040fc99f8d3f0a32c2bd524b6). > on task Source: Custom Source -> Sink: Unnamed (3/4) > 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs.FileSystem > - Created new CloseableRegistry > org.apache.flink.core.fs.SafetyNetCloseableRegistry@26590268 for Async > calls on Source: Custom Source -> Sink: Unnamed (1/4) > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Starting checkpoint 12 on task Source: Custom Source -> Sink: > Unnamed (1/4) > 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver TriggerCheckpoint 12@1486486418731 for > 95ad7256919d3296b37d17693cd0ba71. > 2017-02-07 09:53:38,732 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Trigger for Source: > Custom Source -> Sink: Unnamed (4/4) (95ad7256919d3296b37d17693cd0ba71). > on task Source: Custom Source -> Sink: Unnamed (4/4) > 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs.FileSystem > - Created new CloseableRegistry > org.apache.flink.core.fs.SafetyNetCloseableRegistry@2b3c1766 for Async > calls on Source: Custom Source -> Sink: Unnamed (2/4) > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Starting checkpoint 12 on task Source: Custom Source -> Sink: > Unnamed (2/4) > 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs.FileSystem > - Created new CloseableRegistry > org.apache.flink.core.fs.SafetyNetCloseableRegistry@2c4eb75a for Async > calls on Source: Custom Source -> Sink: Unnamed (4/4) > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Starting checkpoint 12 on task Source: Custom Source -> Sink: > Unnamed (4/4) > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Finished synchronous checkpoints for checkpoint 12 on task > Source: Custom Source -> Sink: Unnamed (2/4) > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (2/4) - finished > synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot > duration 0 ms > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Finished synchronous checkpoints for checkpoint 12 on task > Source: Custom Source -> Sink: Unnamed (1/4) > 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Async > calls on Source: Custom Source -> Sink: Unnamed (2/4) > 2017-02-07 09:53:38,732 INFO org.apache.flink.core.fs.FileSystem > - Created new CloseableRegistry > org.apache.flink.core.fs.SafetyNetCloseableRegistry@b0b9bf0 for Async > calls on Source: Custom Source -> Sink: Unnamed (3/4) > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Starting checkpoint 12 on task Source: Custom Source -> Sink: > Unnamed (3/4) > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (1/4) - finished > asynchronous part of checkpoint 12. Asynchronous duration: 0 ms > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Finished synchronous checkpoints for checkpoint 12 on task > Source: Custom Source -> Sink: Unnamed (4/4) > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Finished synchronous checkpoints for checkpoint 12 on task > Source: Custom Source -> Sink: Unnamed (3/4) > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (3/4) - finished > synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot > duration 0 ms > 2017-02-07 09:53:38,733 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Async > calls on Source: Custom Source -> Sink: Unnamed (3/4) > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (2/4) - finished > asynchronous part of checkpoint 12. Asynchronous duration: 0 ms > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (3/4) - finished > asynchronous part of checkpoint 12. Asynchronous duration: 0 ms > 2017-02-07 09:53:38,732 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (1/4) - finished > synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot > duration 0 ms > 2017-02-07 09:53:38,733 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Async > calls on Source: Custom Source -> Sink: Unnamed (1/4) > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (4/4) - finished > asynchronous part of checkpoint 12. Asynchronous duration: 0 ms > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Received acknowledge message for checkpoint 12 from task > 5611d20817d9a49680117c9ab000116c of job ef5998a3d1cbba24cb1790564f3037bf. > 2017-02-07 09:53:38,733 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Source: Custom Source -> Sink: Unnamed (4/4) - finished > synchronous part of checkpoint 12.Alignment duration: 0 ms, snapshot > duration 0 ms > 2017-02-07 09:53:38,734 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Async > calls on Source: Custom Source -> Sink: Unnamed (4/4) > 2017-02-07 09:53:38,734 DEBUG > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Received acknowledge message for checkpoint 12 from task > 58e5bc1040fc99f8d3f0a32c2bd524b6 of job ef5998a3d1cbba24cb1790564f3037bf. > 2017-02-07 09:53:38,734 DEBUG > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Received acknowledge message for checkpoint 12 from task > 95ad7256919d3296b37d17693cd0ba71 of job ef5998a3d1cbba24cb1790564f3037bf. > 2017-02-07 09:53:38,734 DEBUG > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Received acknowledge message for checkpoint 12 from task > cfe59bc4aadc96e5a2235581460e9f3d of job ef5998a3d1cbba24cb1790564f3037bf. > 2017-02-07 09:53:38,734 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Completed checkpoint 12 (4490 bytes in 3 ms). > 2017-02-07 09:53:38,734 DEBUG > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Checkpoint state: TaskState(jobVertexID: > cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 4, sub task states: 4, > total size (bytes): 4490) > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver ConfirmCheckpoint 12@1486486418731 for > cfe59bc4aadc96e5a2235581460e9f3d. > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Confirmation for Source: > Custom Source -> Sink: Unnamed (1/4) on task Source: Custom Source -> Sink: > Unnamed (1/4) > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver ConfirmCheckpoint 12@1486486418731 for > 5611d20817d9a49680117c9ab000116c. > 2017-02-07 09:53:38,734 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Notification of complete checkpoint for task Source: Custom > Source -> Sink: Unnamed (1/4) > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Confirmation for Source: > Custom Source -> Sink: Unnamed (2/4) on task Source: Custom Source -> Sink: > Unnamed (2/4) > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.con > nectors.kafka.FlinkKafkaConsumerBase - Committing offsets to > Kafka/ZooKeeper for checkpoint 12 > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver ConfirmCheckpoint 12@1486486418731 for > 58e5bc1040fc99f8d3f0a32c2bd524b6. > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Confirmation for Source: > Custom Source -> Sink: Unnamed (3/4) on task Source: Custom Source -> Sink: > Unnamed (3/4) > 2017-02-07 09:53:38,734 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Notification of complete checkpoint for task Source: Custom > Source -> Sink: Unnamed (2/4) > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.streaming.con > nectors.kafka.FlinkKafkaConsumerBase - Committing offsets to > Kafka/ZooKeeper for checkpoint 12 > 2017-02-07 09:53:38,735 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Notification of complete checkpoint for task Source: Custom > Source -> Sink: Unnamed (3/4) > 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.con > nectors.kafka.FlinkKafkaConsumerBase - Committing offsets to > Kafka/ZooKeeper for checkpoint 12 > 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.con > nectors.kafka.internal.Kafka09Fetcher - Sending async offset commit > request to Kafka broker > 2017-02-07 09:53:38,734 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Receiver ConfirmCheckpoint 12@1486486418731 for > 95ad7256919d3296b37d17693cd0ba71. > 2017-02-07 09:53:38,735 DEBUG org.apache.flink.runtime.taskmanager.Task > - Invoking async call Checkpoint Confirmation for Source: > Custom Source -> Sink: Unnamed (4/4) on task Source: Custom Source -> Sink: > Unnamed (4/4) > 2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer > - Committing offsets: {testIn1-1=OffsetAndMetadata{offset=17421, > metadata=''}, testIn1-5=OffsetAndMetadata{offset=0, metadata=''}, > testIn1-9=OffsetAndMetadata{offset=17493, metadata=''}} > 2017-02-07 09:53:38,735 DEBUG > org.apache.flink.streaming.runtime.tasks.StreamTask > - Notification of complete checkpoint for task Source: Custom > Source -> Sink: Unnamed (4/4) > 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.con > nectors.kafka.FlinkKafkaConsumerBase - Committing offsets to > Kafka/ZooKeeper for checkpoint 12 > 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.con > nectors.kafka.internal.Kafka09Fetcher - Sending async offset commit > request to Kafka broker > 2017-02-07 09:53:38,735 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer > - Committing offsets: {testIn1-0=OffsetAndMetadata{offset=34926, > metadata=''}, testIn1-4=OffsetAndMetadata{offset=17325, metadata=''}, > testIn1-8=OffsetAndMetadata{offset=17564, metadata=''}} > 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.con > nectors.kafka.internal.Kafka09Fetcher - Sending async offset commit > request to Kafka broker > 2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer > - Committing offsets: {testIn1-3=OffsetAndMetadata{offset=52292, > metadata=''}, testIn1-7=OffsetAndMetadata{offset=0, metadata=''}} > 2017-02-07 09:53:38,735 DEBUG org.apache.flink.streaming.con > nectors.kafka.internal.Kafka09Fetcher - Sending async offset commit > request to Kafka broker > 2017-02-07 09:53:38,736 DEBUG org.apache.kafka.clients.consumer.KafkaConsumer > - Committing offsets: {testIn1-2=OffsetAndMetadata{offset=0, > metadata=''}, testIn1-6=OffsetAndMetadata{offset=17438, metadata=''}} > 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 0 for > partition testIn1-5 > 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 17493 > for partition testIn1-9 > 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 17421 > for partition testIn1-1 > 2017-02-07 09:53:38,737 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 17325 > for partition testIn1-4 > 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 17564 > for partition testIn1-8 > 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 34926 > for partition testIn1-0 > 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 0 for > partition testIn1-7 > 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 52292 > for partition testIn1-3 > 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 17438 > for partition testIn1-6 > 2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consu > mer.internals.ConsumerCoordinator - Group test committed offset 0 for > partition testIn1-2 > 2017-02-07 09:53:43,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager > - Sending heartbeat to JobManager > 2017-02-07 09:53:43,730 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > - Triggering checkpoint 13 @ 1486486423730 > > -- > > Mahesh Kumar Ravindranathan > Data Streaming Engineer > Oracle Marketing Cloud - Social Platform > Contact No:+1(720)492-4445 > >