[ 
https://issues.apache.org/jira/browse/KAFKA-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6783:
----------------------------------
    Fix Version/s:     (was: 2.0.0)
      Description: 
{code:java}
    @Test
    public void testPollWithAllBootstrapServersDown() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final long pollTimeout = 1000;
            final AtomicBoolean pollComplete = new AtomicBoolean();
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    Properties props = new Properties();
                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:29092");
                    try (KafkaConsumer<byte[], byte[]> consumer = 
newConsumer(props)) {
                        consumer.subscribe(Arrays.asList(topic));
                        try {
                            consumer.poll(pollTimeout);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        } finally {
                            pollComplete.set(true);
                        }
                    }
                }
            });

            Thread.sleep(pollTimeout * 2);
            Assert.assertTrue("poll timeout not work when all servers down", 
pollComplete.get());
        } finally {
            executor.shutdown();
        }
    }
{code}


  was:

{code:java}
    @Test
    public void testPollWithAllBootstrapServersDown() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final long pollTimeout = 1000;
            final AtomicBoolean pollComplete = new AtomicBoolean();
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    Properties props = new Properties();
                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:29092");
                    try (KafkaConsumer<byte[], byte[]> consumer = 
newConsumer(props)) {
                        consumer.subscribe(Arrays.asList(topic));
                        try {
                            consumer.poll(pollTimeout);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        } finally {
                            pollComplete.set(true);
                        }
                    }
                }
            });

            Thread.sleep(pollTimeout * 2);
            Assert.assertTrue("poll timeout not work when all servers down", 
pollComplete.get());
        } finally {
            executor.shutdown();
        }
    }
{code}



> consumer poll(timeout) blocked infinitely when no available bootstrap server
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6783
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6783
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Qiang Zhao
>            Priority: Major
>              Labels: features
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> {code:java}
>     @Test
>     public void testPollWithAllBootstrapServersDown() throws Exception {
>         ExecutorService executor = Executors.newSingleThreadExecutor();
>         try {
>             final long pollTimeout = 1000;
>             final AtomicBoolean pollComplete = new AtomicBoolean();
>             executor.submit(new Runnable() {
>                 @Override
>                 public void run() {
>                     Properties props = new Properties();
>                     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:29092");
>                     try (KafkaConsumer<byte[], byte[]> consumer = 
> newConsumer(props)) {
>                         consumer.subscribe(Arrays.asList(topic));
>                         try {
>                             consumer.poll(pollTimeout);
>                         } catch (Exception ex) {
>                             ex.printStackTrace();
>                         } finally {
>                             pollComplete.set(true);
>                         }
>                     }
>                 }
>             });
>             Thread.sleep(pollTimeout * 2);
>             Assert.assertTrue("poll timeout not work when all servers down", 
> pollComplete.get());
>         } finally {
>             executor.shutdown();
>         }
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to