[ https://issues.apache.org/jira/browse/TWILL-122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938510#comment-15938510 ]
ASF GitHub Bot commented on TWILL-122: -------------------------------------- Github user hsaputra commented on a diff in the pull request: https://github.com/apache/twill/pull/40#discussion_r107690293 --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java --- @@ -71,21 +71,37 @@ private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class); private static final Gson GSON = new Gson(); + private final String appName; + private final RunId runId; private final Queue<LogHandler> logHandlers; private final KafkaClientService kafkaClient; private ZKDiscoveryService discoveryServiceClient; private Cancellable logCancellable; - public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) { + public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled, + Iterable<LogHandler> logHandlers) { super(runId, zkClient); + this.appName = appName; + this.runId = runId; this.logHandlers = new ConcurrentLinkedQueue<>(); - this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka")); - Iterables.addAll(this.logHandlers, logHandlers); + + // When addressing TWILL-147, need to check if the given ZKClient is + // actually used by the Kafka used for log collection + if (logCollectionEnabled) { + this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka")); + Iterables.addAll(this.logHandlers, logHandlers); + } else { + this.kafkaClient = null; + if (!Iterables.isEmpty(logHandlers)) { + LOG.warn("Log collection is disabled for application {} with runId {}. " + + "Adding log handler won't get any logs.", appName, runId); + } + } } @Override protected synchronized void doStartUp() { - if (!logHandlers.isEmpty()) { + if (kafkaClient != null && !logHandlers.isEmpty()) { --- End diff -- You are right here, there is a chance that even log is enabled but caller can pass empty `logHandlers`. +1 for current check > Allow disabling the log transport > --------------------------------- > > Key: TWILL-122 > URL: https://issues.apache.org/jira/browse/TWILL-122 > Project: Apache Twill > Issue Type: Improvement > Components: core > Reporter: Terence Yim > Assignee: Terence Yim > Fix For: 0.11.0 > > > Currently transporting logs to Kafka is mandatory. It should be optionally so > that application can have its own way of log collection. -- This message was sent by Atlassian JIRA (v6.3.15#6346)