Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/40#discussion_r107344859
  
    --- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java 
---
    @@ -71,21 +71,37 @@
       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractTwillController.class);
       private static final Gson GSON = new Gson();
     
    +  private final String appName;
    +  private final RunId runId;
       private final Queue<LogHandler> logHandlers;
       private final KafkaClientService kafkaClient;
       private ZKDiscoveryService discoveryServiceClient;
       private Cancellable logCancellable;
     
    -  public AbstractTwillController(RunId runId, ZKClient zkClient, 
Iterable<LogHandler> logHandlers) {
    +  public AbstractTwillController(String appName, RunId runId, ZKClient 
zkClient, boolean logCollectionEnabled,
    +                                 Iterable<LogHandler> logHandlers) {
         super(runId, zkClient);
    +    this.appName = appName;
    +    this.runId = runId;
         this.logHandlers = new ConcurrentLinkedQueue<>();
    -    this.kafkaClient = new 
ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + 
"/kafka"));
    -    Iterables.addAll(this.logHandlers, logHandlers);
    +
    +    // When addressing TWILL-147, need to check if the given ZKClient is
    +    // actually used by the Kafka used for log collection
    +    if (logCollectionEnabled) {
    +      this.kafkaClient = new 
ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + 
"/kafka"));
    +      Iterables.addAll(this.logHandlers, logHandlers);
    +    } else {
    +      this.kafkaClient = null;
    +      if (!Iterables.isEmpty(logHandlers)) {
    +        LOG.warn("Log collection is disabled for application {} with runId 
{}. " +
    +                   "Adding log handler won't get any logs.", appName, 
runId);
    +      }
    +    }
       }
     
       @Override
       protected synchronized void doStartUp() {
    -    if (!logHandlers.isEmpty()) {
    +    if (kafkaClient != null && !logHandlers.isEmpty()) {
    --- End diff --
    
    `logCollectionEnabled` is not a field. I used `kafkaClient == null` to 
indicate disabling of log collection.
    Also, the `logHandlers` collection can be empty during startup of the 
controller, even log collection is enabled, hence the check.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to