[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
-------------------------------
    Description: 
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from external world (e.g. implementation of 
SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
        at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
        at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek* , instead of *consumer.subscribe*. 

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from external world (e.g. implementation of 
SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek* , instead of *consumer.subscribe*. 


> WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10370
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>    Affects Versions: 2.5.0
>            Reporter: Ning Zhang
>            Assignee: Ning Zhang
>            Priority: Major
>             Fix For: 2.6.0
>
>
> In WorkerSinkTask.java, when we want the consumer to start consuming from 
> certain offsets, rather than from the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
>  is used to carry the offsets from external world (e.g. implementation of 
> SinkTask).
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
> (2) consumer.seek(tp, offset) to rewind the consumer.
> when running (2), we saw the following IllegalStateException:
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
>         at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
>         at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution that has been initially verified is to use *consumer.assign* 
> with *consumer.seek* , instead of *consumer.subscribe*. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to