[ https://issues.apache.org/jira/browse/TWILL-122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938510#comment-15938510 ]

ASF GitHub Bot commented on TWILL-122:
--------------------------------------

Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/twill/pull/40#discussion_r107690293
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java ---
    @@ -71,21 +71,37 @@
       private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
       private static final Gson GSON = new Gson();
     
    +  private final String appName;
    +  private final RunId runId;
       private final Queue<LogHandler> logHandlers;
       private final KafkaClientService kafkaClient;
       private ZKDiscoveryService discoveryServiceClient;
       private Cancellable logCancellable;
     
    -  public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
    +  public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled,
    +                                 Iterable<LogHandler> logHandlers) {
         super(runId, zkClient);
    +    this.appName = appName;
    +    this.runId = runId;
         this.logHandlers = new ConcurrentLinkedQueue<>();
    -    this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
    -    Iterables.addAll(this.logHandlers, logHandlers);
    +
    +    // When addressing TWILL-147, need to check if the given ZKClient is
    +    // actually used by the Kafka used for log collection
    +    if (logCollectionEnabled) {
    +      this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
    +      Iterables.addAll(this.logHandlers, logHandlers);
    +    } else {
    +      this.kafkaClient = null;
    +      if (!Iterables.isEmpty(logHandlers)) {
    +        LOG.warn("Log collection is disabled for application {} with runId {}. " +
    +                   "Adding log handler won't get any logs.", appName, runId);
    +      }
    +    }
       }
     
       @Override
       protected synchronized void doStartUp() {
    -    if (!logHandlers.isEmpty()) {
    +    if (kafkaClient != null && !logHandlers.isEmpty()) {
    --- End diff --
    
    You are right here: even when log collection is enabled, the caller can
    still pass an empty `logHandlers`.
    +1 for the current check.
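
To make the two cases covered by the `doStartUp()` condition explicit, here is a minimal sketch of the guard in isolation. `LogStartupGuardSketch` and `shouldStartLogCollection` are hypothetical names used only for illustration and are not part of Twill; a plain `Object` stands in for the `kafkaClient` field, which the diff sets to `null` when log collection is disabled.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the condition in doStartUp(); not Twill code.
final class LogStartupGuardSketch {

  // Returns true only when a Kafka client exists (log collection enabled)
  // and at least one log handler is registered.
  static boolean shouldStartLogCollection(Object kafkaClient, Queue<?> logHandlers) {
    return kafkaClient != null && !logHandlers.isEmpty();
  }

  public static void main(String[] args) {
    Queue<String> noHandlers = new ArrayDeque<>();
    Queue<String> oneHandler = new ArrayDeque<>();
    oneHandler.add("handler");  // placeholder for a LogHandler instance
    Object client = new Object();  // placeholder for a KafkaClientService

    // Log collection disabled: the client is null, so handlers are ignored.
    System.out.println(shouldStartLogCollection(null, oneHandler));
    // Log collection enabled but the caller passed no handlers.
    System.out.println(shouldStartLogCollection(client, noHandlers));
    // Enabled with at least one handler: the only case that should start.
    System.out.println(shouldStartLogCollection(client, oneHandler));
  }
}
```

Checking only `kafkaClient != null` would let the enabled-but-empty case through, and checking only `!logHandlers.isEmpty()` would dereference a null client when log collection is disabled, so both halves of the condition are needed.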


> Allow disabling the log transport
> ---------------------------------
>
>                 Key: TWILL-122
>                 URL: https://issues.apache.org/jira/browse/TWILL-122
>             Project: Apache Twill
>          Issue Type: Improvement
>          Components: core
>            Reporter: Terence Yim
>            Assignee: Terence Yim
>             Fix For: 0.11.0
>
>
> Currently transporting logs to Kafka is mandatory. It should be optional so 
> that an application can have its own way of log collection.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
