GitHub user ghelmling commented on a diff in the pull request:

    https://github.com/apache/incubator-twill/pull/51#discussion_r33214142
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java ---
    @@ -29,20 +34,71 @@
      */
     public final class EmbeddedKafkaServer extends AbstractIdleService {
     
    -  private final KafkaServerStartable server;
    +  public static final String START_TIMEOUT_RETRIES = "twill.kafka.start.timeout.retries";
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
    +  private static final String DEFAULT_START_TIMEOUT_RETRIES = "5";
    +
    +  private final int startTimeoutRetries;
    +  private final KafkaConfig kafkaConfig;
    +  private KafkaServer server;
     
       public EmbeddedKafkaServer(Properties properties) {
    -    server = new KafkaServerStartable(new KafkaConfig(properties));
    +    this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_TIMEOUT_RETRIES,
    +                                                                       DEFAULT_START_TIMEOUT_RETRIES));
    +    this.kafkaConfig = new KafkaConfig(properties);
       }
     
       @Override
       protected void startUp() throws Exception {
    -    server.startup();
    +    int tries = 0;
    +    do {
    +      KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
    +      try {
    +        kafkaServer.startup();
    +        server = kafkaServer;
    +      } catch (Exception e) {
    +        kafkaServer.shutdown();
    +        kafkaServer.awaitShutdown();
    +
    +        Throwable rootCause = Throwables.getRootCause(e);
    +        if (rootCause instanceof ZkTimeoutException) {
    +          // Potentially caused by race condition bug described in TWILL-139.
    +          LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", tries, rootCause);
    +        }
    +      }
    +    } while (server == null && ++tries < startTimeoutRetries);
       }
    --- End diff --
    
    Seems like we should throw an exception here if server is still null after
    the last attempt, saying that we were unable to initialize the Kafka server.
    Otherwise we'll just throw an NPE later on, right?
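Here is a minimal sketch of that suggestion. It is not the actual Twill change. `StartableServer` and `RetryingStarter` are hypothetical names that stand in for `KafkaServer` and `EmbeddedKafkaServer`. The sketch collapses `KafkaServer`'s `shutdown()`/`awaitShutdown()` pair into a single `shutdown()` call. The idea is to retry a bounded number of times and, if no attempt succeeds, throw an `IllegalStateException` that carries the last failure, instead of leaving the field null.

```java
import java.util.function.Supplier;

/**
 * Sketch of "retry startup, then fail loudly". StartableServer and RetryingStarter
 * are hypothetical names used for illustration; they are not Twill or Kafka APIs.
 */
public final class RetryingStarter {

  /** Hypothetical stand-in for KafkaServer's startup/shutdown lifecycle. */
  public interface StartableServer {
    void startup() throws Exception;

    void shutdown(); // assumed to also wait for shutdown to complete
  }

  private RetryingStarter() {
  }

  /**
   * Creates and starts a server, making at most maxTries attempts in total.
   *
   * @throws IllegalStateException if every attempt fails; the last failure is the cause
   */
  public static StartableServer startWithRetries(Supplier<StartableServer> factory, int maxTries) {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be at least 1, got " + maxTries);
    }
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      StartableServer candidate = factory.get(); // fresh instance per attempt
      try {
        candidate.startup();
        return candidate; // only a successfully started server is ever returned
      } catch (Exception e) {
        candidate.shutdown(); // release resources held by the failed attempt
        lastFailure = e;
      }
    }
    // Every attempt failed: report it now rather than returning null and hitting an NPE later.
    throw new IllegalStateException(
        "Unable to start server after " + maxTries + " attempt(s)", lastFailure);
  }
}
```

`EmbeddedKafkaServer` extends Guava's `AbstractIdleService`. An exception thrown from `startUp()` should therefore move the service to the FAILED state, so whoever starts the service sees the startup error up front.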

