Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167242987 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBException; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonInitException; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; +import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource; +import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor; +import org.apache.rya.streams.querymanager.xml.Kafka; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManagerDaemon implements Daemon { + + private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class); + + /** + * The default configuration file's path for the application. + */ + private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml"); + + /** + * Command line parameters that are used by all commands that interact with Kafka. + */ + class DaemonParameters { + @Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.") + public String config; + } + + private QueryManager manager = null; + + @Override + public void init(final DaemonContext context) throws DaemonInitException, Exception { + requireNonNull(context); + + // Parse the command line arguments for the configuration file to use. + final String[] args = context.getArguments(); + final DaemonParameters params = new DaemonParameters(); + try { + new JCommander(params).parse(args); + } catch(final ParameterException e) { + throw new DaemonInitException("Unable to parse the command line arguments.", e); + } + final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH; + log.info("Loading the following configuration file: " + configFile); + + // Unmarshall the configuration file into an object. + final QueryManagerConfig config; + try(InputStream stream = Files.newInputStream(configFile)) { --- End diff -- final
---