// Reconstructed from the patch that adds:
// extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.periodic.notification.twill.yarn;

import java.io.File;
import java.io.FileInputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration;
import org.apache.rya.periodic.notification.twill.PeriodicNotificationTwillApp;
import org.apache.rya.periodic.notification.twill.PeriodicNotificationTwillRunnable;
import org.apache.twill.api.ClassAcceptor;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.yarn.YarnTwillRunnerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;

/**
 * This class is responsible for starting and stopping the {@link PeriodicNotificationTwillApp} on a Hadoop YARN cluster.
 * <p>
 * The constructor starts a {@link TwillRunnerService}; {@link #close()} stops it, so instances are intended to be used
 * in a try-with-resources statement (as {@link #main(String[])} does).
 */
public class PeriodicNotificationTwillRunner implements AutoCloseable {

    public static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationTwillRunner.class);

    private final YarnConfiguration yarnConfiguration;
    private final TwillRunnerService twillRunner;
    private final File configFile;

    /**
     * Creates the runner and starts the underlying {@link TwillRunnerService}.
     *
     * @param yarnZookeepers - The zookeeper connect string used by the Hadoop YARN cluster.
     * @param configFile - The config file used by {@link PeriodicNotificationTwillApp}. Typically notification.properties.
     * @throws NullPointerException If either argument is null.
     * @throws IllegalArgumentException If {@code configFile} does not exist.
     * @throws IllegalStateException If the calling thread is interrupted while waiting for the runner service to warm up.
     */
    public PeriodicNotificationTwillRunner(final String yarnZookeepers, final File configFile) {
        // Fail with a descriptive message instead of a bare NPE from configFile.exists().
        Objects.requireNonNull(configFile, "Config File must not be null.");
        Preconditions.checkArgument(configFile.exists(), "Config File must exist");
        Objects.requireNonNull(yarnZookeepers, "YARN Zookeepers must not be null.");
        this.configFile = configFile;
        yarnConfiguration = new YarnConfiguration();
        twillRunner = new YarnTwillRunnerService(yarnConfiguration, yarnZookeepers);
        twillRunner.start();

        // sleep to give the YarnTwillRunnerService time to retrieve state from zookeeper
        try {
            Thread.sleep(1000);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag for callers and do not leave the started service behind:
            // the caller never receives this instance, so close() could not be invoked on it.
            Thread.currentThread().interrupt();
            twillRunner.stop();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Start an instance of the {@link PeriodicNotificationTwillApp}.
     *
     * @param interactive - If true, this method will block until the user terminates this JVM, at which point the
     *            {@link PeriodicNotificationTwillApp} on the YARN cluster will also be terminated. If false, this
     *            method will return after startup.
     * @throws IllegalStateException If no complete resource report is received within 5 minutes.
     */
    public void startApp(final boolean interactive) {
        final String yarnClasspath = yarnConfiguration.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                Joiner.on(",").join(YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));
        final List<String> applicationClassPaths = Lists.newArrayList();
        Iterables.addAll(applicationClassPaths, Splitter.on(",").split(yarnClasspath));
        final TwillController controller = twillRunner
                .prepare(new PeriodicNotificationTwillApp(configFile))
                .addLogHandler(new PrinterLogHandler(new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8), true)))
                .withApplicationClassPaths(applicationClassPaths)
                //.withApplicationArguments(args)
                //.withArguments(runnableName, args)
                // .withBundlerClassAcceptor(new HadoopClassExcluder())
                .start();

        final ResourceReport r = getResourceReport(controller, 5, TimeUnit.MINUTES);
        LOG.info("Received ResourceReport: {}", r);
        LOG.info("{} started successfully!", PeriodicNotificationTwillApp.APPLICATION_NAME);

        if (interactive) {
            // On JVM shutdown (e.g. ctrl-c) terminate the remote application, then stop the local runner service.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        Futures.getUnchecked(controller.terminate());
                    } finally {
                        twillRunner.stop();
                    }
                }
            });

            try {
                LOG.info("{} waiting termination by user. Type ctrl-c to terminate.", PeriodicNotificationTwillApp.class.getSimpleName());
                controller.awaitTerminated();
            } catch (final ExecutionException e) {
                // Route the failure through the logger rather than printing the stack trace to stderr.
                LOG.error("An error occurred while waiting for " + PeriodicNotificationTwillApp.APPLICATION_NAME + " to terminate.", e);
            }
        }
    }

    /**
     * Terminates all instances of the {@link PeriodicNotificationTwillApp} on the YARN cluster.
     */
    public void stopApp() {
        LOG.info("Stopping any running instances...");

        int counter = 0;
        // It is possible that we have launched multiple instances of the app. For now, stop them all, one at a time.
        for (final TwillController c : twillRunner.lookup(PeriodicNotificationTwillApp.APPLICATION_NAME)) {
            // The resource report may not be available (see reportIsLoading); a missing report must not
            // prevent this instance, or the remaining instances, from being terminated.
            final ResourceReport report = c.getResourceReport();
            final String applicationId = (report == null) ? "unknown" : report.getApplicationId();
            LOG.info("Attempting to stop {} with YARN ApplicationId: {} and Twill RunId: {}", PeriodicNotificationTwillApp.APPLICATION_NAME, applicationId, c.getRunId());
            Futures.getUnchecked(c.terminate());
            LOG.info("Stopped {} with YARN ApplicationId: {} and Twill RunId: {}", PeriodicNotificationTwillApp.APPLICATION_NAME, applicationId, c.getRunId());
            counter++;
        }

        LOG.info("Stopped {} instance(s) of {}", counter, PeriodicNotificationTwillApp.APPLICATION_NAME);
    }

    /**
     * Blocks until a non-null, fully loaded Resource report is returned.
     *
     * @param controller - The controller to interrogate.
     * @param timeout - The maximum time to poll {@code controller}. Use -1 for infinite polling.
     * @param timeoutUnits - The units of {@code timeout}.
     * @return The ResourceReport for the application.
     * @throws IllegalArgumentException If {@code timeout} is less than -1.
     * @throws IllegalStateException If a timeout occurs before a ResourceReport is returned, or if the polling thread
     *             is interrupted.
     */
    private ResourceReport getResourceReport(final TwillController controller, final long timeout, final TimeUnit timeoutUnits) {
        Preconditions.checkArgument(timeout >= -1, "timeout cannot be less than -1");
        final long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, timeoutUnits);
        final long sleepMillis = 1000; // how long to sleep between retrieval attempts.
        long totalElapsedMillis = 0;
        ResourceReport report = controller.getResourceReport();
        while (reportIsLoading(report)) {
            try {
                Thread.sleep(sleepMillis);
            } catch (final InterruptedException e) {
                // Preserve the interrupt status before translating to an unchecked exception.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            totalElapsedMillis += sleepMillis;
            if ((timeout != -1) && (totalElapsedMillis >= timeoutMillis)) {
                final String errorMessage = "Timeout while waiting for the Twill Application to start on YARN. Total elapsed time: " + TimeUnit.SECONDS.convert(totalElapsedMillis, TimeUnit.MILLISECONDS) + "s.";
                LOG.error(errorMessage);
                throw new IllegalStateException(errorMessage);
            }
            // Emit a progress message every 5 seconds of accumulated sleep time.
            if ((totalElapsedMillis % 5000) == 0) {
                LOG.info("Waiting for the Twill Application to start on YARN... Total elapsed time: {}s.", TimeUnit.SECONDS.convert(totalElapsedMillis, TimeUnit.MILLISECONDS));
            }
            report = controller.getResourceReport();
        }
        return report;
    }

    /**
     * Checks to see if the report has loaded.
     *
     * @param report - The {@link ResourceReport} for this Twill Application.
     * @return Return true if the report is null or incomplete. Return false if the report is completely loaded.
     */
    private boolean reportIsLoading(@Nullable final ResourceReport report) {
        if (report == null) {
            return true;
        }

        final String yarnApplicationID = report.getApplicationId();
        final Collection<TwillRunResources> runnableResources = report.getResources().get(PeriodicNotificationTwillRunnable.TWILL_RUNNABLE_NAME);

        if (runnableResources == null || runnableResources.isEmpty()) {
            LOG.info("Received Resource Report for YARN ApplicationID: {}, runnable resources are still loading...", yarnApplicationID);
            return true;
        } else {
            LOG.info("Received Resource Report for YARN ApplicationID: {}, runnable resources are loaded.", yarnApplicationID);
            return false;
        }
    }

    /**
     * Stops the {@link TwillRunnerService} that was started by the constructor.
     */
    @Override
    public void close() throws Exception {
        if (twillRunner != null) {
            twillRunner.stop();
        }
    }

    /** Command line options that apply to every command. */
    private static class MainOptions {
        @Parameter(names = { "-c", "--config-file" }, description = "PeriodicNotification Application config file", required = true)
        private File configFile;

        @Parameter(names = { "-z", "--yarn-zookeepers" }, description = "(Optional) YARN Zookeepers connect string. If not specified, the value of 'accumulo.zookeepers' from the specified '--config-file' will be reused.", required = false)
        private String zookeepers;
    }


    /** Options of the {@code start} command. */
    @Parameters(commandNames = { "start" }, separators = "=", commandDescription = "Start the PeriodicNotification Application on YARN")
    private static class CommandStart {
        @Parameter(names = { "-i", "--interactive" }, description = "(Optional) Interactive. If specified, blocks the console until the user types ctrl-c.", required = false)
        private boolean interactive;

        //TODO future feature.
        //@Parameter(names = { "-p", "--accumulo.password"}, description = "Leave value blank to be prompted interactively for the 'accumulo.password' of the 'accumulo.user' specified by the '--config-file'", password = true, required = true)
        //private String password;
    }

    /** Options of the {@code stop} command (currently none). */
    @Parameters(commandNames = { "stop" }, commandDescription = "Stops PeriodicNotification Applications on YARN")
    private static class CommandStop {
        //TODO future feature.
        //@Parameter(names = { "-a", "--all" }, description = "Stops all PeriodicNotification Application instances.", required = false)
        //private boolean all = true;

        //@Parameter(names = { "-i", "--instances"}, description = "CSV List of application instances to be stopped", required = false)
        //private List<String> instanceList;
    }


    /**
     * Command line entry point. Parses the {@code start} / {@code stop} command, loads the config file, selects the
     * zookeeper connect string (the {@code --yarn-zookeepers} option if given, otherwise the value from the config
     * file) and runs the requested command. Exits the JVM with status 1 on invalid input or on any error.
     *
     * @param args - The command line arguments.
     */
    public static void main(final String[] args) {

        final MainOptions options = new MainOptions();
        final String START = "start";
        final String STOP = "stop";
        String parsedCommand = null;
        final CommandStart commandStart = new CommandStart();
        final CommandStop commandStop = new CommandStop();
        final JCommander cli = new JCommander(options);
        cli.addCommand(START, commandStart);
        cli.addCommand(STOP, commandStop);
        cli.setProgramName(PeriodicNotificationTwillRunner.class.getName());
        try {
            cli.parse(args);
            parsedCommand = cli.getParsedCommand();
            if (parsedCommand == null) {
                throw new ParameterException("A command must be specified.");
            }
        } catch (final ParameterException e) {
            System.err.println("Error! Invalid input: " + e.getMessage());
            cli.usage();
            System.exit(1);
        }

        // load the config file
        PeriodicNotificationApplicationConfiguration conf = null;
        try (FileInputStream fin = new FileInputStream(options.configFile)) {
            final Properties p = new Properties();
            p.load(fin);
            conf = new PeriodicNotificationApplicationConfiguration(p);
        } catch (final Exception e) {
            LOG.warn("Unable to load specified properties file", e);
            System.exit(1);
        }

        // pick the correct zookeepers
        String zookeepers;
        if (options.zookeepers != null && !options.zookeepers.isEmpty()) {
            zookeepers = options.zookeepers;
        } else {
            zookeepers = conf.getAccumuloZookeepers();
        }

        try (final PeriodicNotificationTwillRunner app = new PeriodicNotificationTwillRunner(zookeepers, options.configFile)) {
            if (START.equals(parsedCommand)) {
                app.startApp(commandStart.interactive);
            } else if (STOP.equals(parsedCommand)) {
                app.stopApp();
            } else {
                throw new IllegalStateException("Invalid Command."); // this state should be impossible.
            }
        } catch (final Exception e) {
            LOG.warn("Error occurred.", e);
            System.exit(1);
        }
    }

    /**
     * A {@link ClassAcceptor} that rejects classes under {@code org.apache.hadoop}, except those under
     * {@code org.apache.hadoop.hbase}. Currently only referenced from a commented-out line in
     * {@link PeriodicNotificationTwillRunner#startApp(boolean)}.
     */
    static class HadoopClassExcluder extends ClassAcceptor {
        @Override
        public boolean accept(final String className, final URL classUrl, final URL classPathUrl) {
            // exclude hadoop but not hbase package
            return !(className.startsWith("org.apache.hadoop") && !className.startsWith("org.apache.hadoop.hbase"));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh b/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh new file mode 100644 index 0000000..8fa3497 --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# navigate to the project directory +PROJECT_HOME=$(dirname $(cd $(dirname $0) && pwd)) +cd $PROJECT_HOME + +# setup the twill classpath +. 
conf/twill-env.sh + +# echo "Using classpath: $TWILL_CP" + +# run the program +$JAVA_HOME/bin/java -cp $TWILL_CP \ + -Dlogback.configurationFile=conf/logback.xml \ + org.apache.rya.periodic.notification.twill.yarn.PeriodicNotificationTwillRunner "$@" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/README.md ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill/README.md b/extras/periodic.notification/twill/README.md new file mode 100644 index 0000000..c87ad2e --- /dev/null +++ b/extras/periodic.notification/twill/README.md @@ -0,0 +1,36 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +## rya.periodic.notification.twill + +This project serves two purposes: + +1) Store all `org.apache.twill:twill-api` specific code to decouple this execution +environment dependency from the rest of Rya's implementation logic. This way it +should be easy to integrate Rya code with alternative execution environments. + +2) It can be tricky to shield Twill applications from the constraints of the Twill +runtime environment (specifically a Guava 13.0 dependency, among potentially others). 
+By controlling the packaging of this project and leveraging the maven-shade-plugin's +relocation capability, we can avoid current and future classpath conflicts and allow +for a cleaner integration with Twill than we might get with the `BundledJarRunner` +from `org.apache.twill:twill-ext`. + +Note, the distribution of this twill application can be found in +`rya.periodic.notification.twill.yarn`. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill/pom.xml b/extras/periodic.notification/twill/pom.xml new file mode 100644 index 0000000..e22dd45 --- /dev/null +++ b/extras/periodic.notification/twill/pom.xml @@ -0,0 +1,177 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.parent</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.periodic.notification.twill</artifactId> + + <name>Apache Rya Periodic Notification Service on Twill </name> + <description>Twill Application for executing the Apache Rya Periodic Notification Service</description> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <!-- redirect any other logging frameworks to slf4j within the Twill runtime --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + <scope>runtime</scope> + </dependency> + + <!-- + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.api</artifactId> + </dependency> + --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.service</artifactId> + <exclusions> + <!-- exclude logging implementations --> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.twill</groupId> + <artifactId>twill-api</artifactId> + 
<version>0.12.0</version> + </dependency> + + <!-- Mark Accumulo Hadoop and Zookeeper as provided dependencies --> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>2.3.7</version> + <extensions>true</extensions> + <executions> + <execution> + <id>bundle</id> + <phase>package</phase> + <goals> + <goal>bundle</goal> + </goals> + <configuration> + <classifier>bundle</classifier> + <instructions> + <Embed-Dependency>*;inline=false</Embed-Dependency> + <Embed-Transitive>true</Embed-Transitive> + <Embed-Directory>lib</Embed-Directory> + </instructions> + </configuration> + </execution> + </executions> + </plugin> + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>twill-app</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedClassifierName>twill-app</shadedClassifierName> + <createDependencyReducedPom>false</createDependencyReducedPom> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" /> + </transformers> + <relocations> + <!-- relocate the more modern version of guava to avoid classpath conflicts --> + <relocation> + <pattern>com.google.common</pattern> + 
<shadedPattern>org.apache.rya.shaded.com.google.common</shadedPattern> + </relocation> + </relocations> + <artifactSet> + <excludes> + <!-- exclude logging implementations if the above exclusions/scoping don't catch them --> + <exclude>commons-logging:commons-logging</exclude> + <exclude>org.slf4j:slf4j-log4j12</exclude> + <exclude>log4j:log4j</exclude> + </excludes> + </artifactSet> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java new file mode 100644 index 0000000..d2a6125 --- /dev/null +++ b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.twill; + +import java.io.File; + +import org.apache.twill.api.ResourceSpecification; +import org.apache.twill.api.ResourceSpecification.SizeUnit; +import org.apache.twill.api.TwillApplication; +import org.apache.twill.api.TwillSpecification; + +public class PeriodicNotificationTwillApp implements TwillApplication { + + + private final File configFile; + public static final String APPLICATION_NAME = PeriodicNotificationTwillApp.class.getSimpleName(); + + public PeriodicNotificationTwillApp(final File configFile) { + this.configFile = configFile; + } + + @Override + public TwillSpecification configure() { + return TwillSpecification.Builder.with() + .setName(APPLICATION_NAME) + .withRunnable() + .add(PeriodicNotificationTwillRunnable.TWILL_RUNNABLE_NAME, + new PeriodicNotificationTwillRunnable(), + ResourceSpecification.Builder.with() + .setVirtualCores(2) + .setMemory(2, SizeUnit.GIGA) + .setInstances(1) + .build()) + .withLocalFiles() + .add(PeriodicNotificationTwillRunnable.CONFIG_FILE_NAME, configFile) + .apply() + .anyOrder() + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java new file mode 100644 index 0000000..8695655 --- /dev/null +++ b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java @@ -0,0 +1,119 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.twill; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.rya.periodic.notification.application.PeriodicApplicationException; +import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication; +import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration; +import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationFactory; +import org.apache.twill.api.AbstractTwillRunnable; +import org.apache.twill.api.Command; +import org.apache.twill.api.TwillContext; +import org.apache.twill.api.TwillRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PeriodicNotificationTwillRunnable extends AbstractTwillRunnable { + + private static final Logger logger = LoggerFactory.getLogger(PeriodicNotificationTwillRunnable.class); + + public static final String TWILL_RUNNABLE_NAME = PeriodicNotificationTwillRunnable.class.getSimpleName(); + public static final String CONFIG_FILE_NAME = "notification.properties"; + + private 
PeriodicNotificationApplication app; + + /** + * Called when the container process starts. Executed in container machine. If any exception is thrown from this + * method, this runnable won't get retry. + * + * @param context Contains information about the runtime context. + */ + @Override + public void initialize(final TwillContext context) { + logger.info("Initializing the PeriodicNotificationApplication."); + + final File propsFile = new File(CONFIG_FILE_NAME); + final PeriodicNotificationApplicationConfiguration conf; + try (final FileInputStream fin = new FileInputStream(propsFile)) { + final Properties p = new Properties(); + p.load(fin); + logger.debug("Loaded properties: {}", p); + conf = new PeriodicNotificationApplicationConfiguration(p); + } catch (final Exception e) { + logger.error("Error loading notification properties", e); + throw new RuntimeException(e); // kill the Runnable + } + + try { + this.app = PeriodicNotificationApplicationFactory.getPeriodicApplication(conf); + } catch (final PeriodicApplicationException e) { + logger.error("Error occurred creating PeriodicNotificationApplication", e); + throw new RuntimeException(e); // kill the Runnable + } + } + + /** + * Called when a command is received. A normal return denotes the command has been processed successfully, otherwise + * {@link Exception} should be thrown. + * @param command Contains details of the command. 
+ * @throws Exception + */ + @Override + public void handleCommand(final Command command) throws Exception { + // no-op + } + + @Override + public void run() { + logger.info("Starting up the PeriodicNotificationApplication."); + app.start(); + try { + logger.info("Blocking thread termination until the PeriodicNotificationApplication is stopped."); + app.blockUntilFinished(); + } catch (IllegalStateException | ExecutionException | InterruptedException e) { + logger.error("An error occurred while blocking on the PeriodicNotificationApplication", e); + } + logger.info("Exiting the PeriodicNotificationApplication."); + } + + /** + * Requests to stop the running service. + */ + @Override + public void stop() { + logger.info("Stopping the PeriodicNotificationApplication..."); + app.stop(); + } + + /** + * Called when the {@link TwillRunnable#run()} completed. Useful for doing + * resource cleanup. This method would only get called if the call to {@link #initialize(TwillContext)} was + * successful. + */ + @Override + public void destroy() { + // no-op + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/README.md ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/README.md b/extras/rya.benchmark/README.md new file mode 100644 index 0000000..228dfaf --- /dev/null +++ b/extras/rya.benchmark/README.md @@ -0,0 +1,77 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +Benchmark Optimizations + + +## KafkaLatencyBenchmark + +Below are several strategies for partitioning the rya_pcj_updater table. If other tablets start hotspotting, they can be split further, similar to how `STATEMENT_PATTERN_` is split below. +``` +addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_ urn -t rya_pcj_updater + +# or +addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_0 STATEMENT_PATTERN_4 STATEMENT_PATTERN_8 STATEMENT_PATTERN_c urn -t rya_pcj_updater + +# or +addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_0 STATEMENT_PATTERN_1 STATEMENT_PATTERN_2 STATEMENT_PATTERN_3 STATEMENT_PATTERN_4 STATEMENT_PATTERN_5 STATEMENT_PATTERN_6 STATEMENT_PATTERN_7 STATEMENT_PATTERN_8 STATEMENT_PATTERN_9 STATEMENT_PATTERN_a STATEMENT_PATTERN_b STATEMENT_PATTERN_c STATEMENT_PATTERN_d STATEMENT_PATTERN_e STATEMENT_PATTERN_f urn -t rya_pcj_updater + +# then ensure the splits have been applied. +compact -t rya_pcj_updater +``` + +It is also possible to lower the table's split threshold to generate more tablets. +``` +root@accumulo> config -t rya_pcj_updater -s table.split.threshold=100M +``` + +Identify which tablets are on which hosts and which particular data you might be +hotspotting on. Note that the tablet splits are ordered lexicographically, and +the split point is exclusive. So AGGREGATION_ data is actually hosted on the +tablet whose split point label is 4qn;JOIN_. +``` +root@accumulo> tables -l +... +rya_osp => 4qr +rya_pcj_updater => 4qn +rya_po => 4qq +...
+ +root@accumulo> scan -t accumulo.metadata -b 4qn; -e 4qn< -c loc +4qn;AGGREGATION_ loc:25e09c3a40b000e [] 10.10.10.10:9997 +4qn;JOIN_ loc:45e09c3a2260012 [] 10.10.10.11:9997 +4qn;PROJECTION_ loc:55e09c3a2cf0014 [] 10.10.10.12:9997 +4qn;QUERY_ loc:35e09c3a2080021 [] 10.10.10.13:9997 +4qn;STATEMENT_PATTERN_0 loc:16e09c3a436001d [] 10.10.10.14:9997 +4qn;STATEMENT_PATTERN_4 loc:17e09c3a436001d [] 10.10.10.15:9997 +4qn;STATEMENT_PATTERN_8 loc:18e09c3a436001d [] 10.10.10.16:9997 +4qn;STATEMENT_PATTERN_c loc:19e09c3a436001d [] 10.10.10.17:9997 +4qn;urn loc:15e09c3a4360019 [] 10.10.10.18:9997 +4qn< loc:55e09c3a2cf0012 [] 10.10.10.19:9997 +``` + +Use the `RegexGroupBalancer` to ensure all STATEMENT_PATTERN_x tablets are evenly distributed between all available tablet servers. This distribution strategy will also apply to other groups that are specified in the regex. +``` +root@accumulo> config -t rya_pcj_updater -s table.custom.balancer.group.regex.pattern=(AGGREGATION_|JOIN_|PROJECTION_|QUERY_|STATEMENT_PATTERN_|urn).* +#root@accumulo> config -t rya_pcj_updater -s table.custom.balancer.group.regex.default=AGGREGATION_ +root@accumulo> config -t rya_pcj_updater -s table.balancer=org.apache.accumulo.server.master.balancer.RegexGroupBalancer +``` +References: +https://blogs.apache.org/accumulo/entry/balancing_groups_of_tablets +https://reviews.apache.org/r/29230/diff/2#index_header http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml index fce434b..8468290 100644 --- a/extras/rya.benchmark/pom.xml +++ b/extras/rya.benchmark/pom.xml @@ -55,6 +55,13 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + + <!-- Fluo runtime dependency --> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-core</artifactId> + <scope>runtime</scope> + 
</dependency> <!-- Testing --> <dependency> @@ -140,7 +147,6 @@ <goal>shade</goal> </goals> <configuration> - <finalName>benchmarks</finalName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.openjdk.jmh.Main</mainClass> @@ -164,6 +170,23 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>create-binary-distribution</id> + <goals> + <goal>single</goal> + </goals> + <phase>package</phase> + <configuration> + <descriptors> + <descriptor>src/main/assembly/binary-release.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/assembly/binary-release.xml ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/assembly/binary-release.xml b/extras/rya.benchmark/src/main/assembly/binary-release.xml new file mode 100644 index 0000000..374213f --- /dev/null +++ b/extras/rya.benchmark/src/main/assembly/binary-release.xml @@ -0,0 +1,33 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd"> + <id>bin</id> + <formats> + <format>tar.gz</format> + </formats> + <includeBaseDirectory>true</includeBaseDirectory> + <componentDescriptors> + <componentDescriptor>src/main/assembly/component-release.xml</componentDescriptor> + </componentDescriptors> +</assembly> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/assembly/component-release.xml ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/assembly/component-release.xml b/extras/rya.benchmark/src/main/assembly/component-release.xml new file mode 100644 index 0000000..0d99717 --- /dev/null +++ b/extras/rya.benchmark/src/main/assembly/component-release.xml @@ -0,0 +1,81 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<component + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3 http://maven.apache.org/xsd/component-1.1.3.xsd"> + <fileSets> + <fileSet> + <directory>src/main/config</directory> + <outputDirectory>conf</outputDirectory> + <directoryMode>0755</directoryMode> + <fileMode>0644</fileMode> + <lineEnding>unix</lineEnding> + <filtered>false</filtered> + <includes> + <include>*.options</include> + <include>*.properties</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/scripts</directory> + <outputDirectory>bin</outputDirectory> + <directoryMode>0755</directoryMode> + <fileMode>0755</fileMode> + <includes> + <include>*.sh</include> + </includes> + <lineEnding>unix</lineEnding> + <filtered>true</filtered> + </fileSet> + <fileSet> + <directory>src/main/scripts</directory> + <outputDirectory>bin</outputDirectory> + <directoryMode>0755</directoryMode> + <fileMode>0644</fileMode> + <includes> + <include>*.bat</include> + </includes> + <lineEnding>dos</lineEnding> + <filtered>true</filtered> + </fileSet> + + <!-- create an empty directory for log files --> + <fileSet> + <directory>src/main/assembly</directory> + <outputDirectory>logs</outputDirectory> + <directoryMode>755</directoryMode> + <excludes> + <exclude>*</exclude> + </excludes> + </fileSet> + + <fileSet> + <directory>${project.build.directory}</directory> + <outputDirectory>lib</outputDirectory> + <directoryMode>755</directoryMode> + <fileMode>0644</fileMode> + <includes> + <include>${project.artifactId}-${project.version}-shaded.jar</include> + </includes> + </fileSet> + </fileSets> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/common.options ---------------------------------------------------------------------- diff 
--git a/extras/rya.benchmark/src/main/config/common.options b/extras/rya.benchmark/src/main/config/common.options new file mode 100644 index 0000000..9a0c7c1 --- /dev/null +++ b/extras/rya.benchmark/src/main/config/common.options @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This file contains example configuration values and should +# be modified to target your Rya/Kafka environment. 
+ +--zookeepers +zoo1,zoo2,zoo3,zoo4,zoo5 + +--kafka-bootstrap-servers +kafka1:9092,kafka2:9092 + +--accumulo-instance +accumuloInstance + +--rya-instance +rya_ + +--username +accumuloUser + +# specifying --password here prompts the user to enter +# the accumulo password interactively on the shell +--password + +# local directory for storing benchmark output +--output-directory +results \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/config/log4j.properties b/extras/rya.benchmark/src/main/config/log4j.properties new file mode 100644 index 0000000..1101514 --- /dev/null +++ b/extras/rya.benchmark/src/main/config/log4j.properties @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Valid levels: +# TRACE, DEBUG, INFO, WARN, ERROR and FATAL +log4j.rootCategory=INFO, CONSOLE, LOGFILE + +# CONSOLE is set to be a ConsoleAppender using a PatternLayout. 
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +#log4j.appender.CONSOLE.layout.ConversionPattern=[%p] %m%n +log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n + +# LOGFILE is set to be a File appender using a PatternLayout. +log4j.appender.LOGFILE=org.apache.log4j.FileAppender +log4j.appender.LOGFILE.File=logs/benchmark.log +#log4j.appender.LOGFILE.Threshold=DEBUG +log4j.appender.LOGFILE.Append=true + +log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout +log4j.appender.LOGFILE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#log4j.appender.LOGFILE.layout=org.apache.log4j.EnhancedPatternLayout +#log4j.appender.LOGFILE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/periodic.options ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/config/periodic.options b/extras/rya.benchmark/src/main/config/periodic.options new file mode 100644 index 0000000..51ac8ac --- /dev/null +++ b/extras/rya.benchmark/src/main/config/periodic.options @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +--ingest-iterations +1000 + +--ingest-observations-per-type +10 + +--ingest-types +5 + +--ingest-type-prefix +car_ + +--ingest-period-sec +30 + +--report-period-sec +10 + +--periodic-query-window +15 + +#every 30 seconds +--periodic-query-period +.5 + +--periodic-query-time-units +minutes + +--periodic-query-registration-topic +notifications \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/projection.options ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/config/projection.options b/extras/rya.benchmark/src/main/config/projection.options new file mode 100644 index 0000000..9bba2de --- /dev/null +++ b/extras/rya.benchmark/src/main/config/projection.options @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +--ingest-iterations +1000 + +--ingest-observations-per-type +10 + +--ingest-types +5 + +--ingest-type-prefix +car_ + +--ingest-period-sec +30 + +--report-period-sec +10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java new file mode 100644 index 0000000..a848ebb --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.benchmark.periodic; + +import com.beust.jcommander.Parameter; +import com.google.common.base.Objects; + +/** + * Command line options that control how much benchmark data is generated and how often it is ingested and + * reported. The fields are declared as JCommander {@link Parameter}s and every one of them is required. + */ +public class BenchmarkOptions { + @Parameter(names = { "-ii", "--ingest-iterations" }, description = "Number of ingest iterations.
Total data published is -ii x -iobs x -it", required = true) + private int numIterations; + + @Parameter(names = { "-iobs", "--ingest-observations-per-type" }, description = "Observations per Type per Iteration to generate.", required = true) + private int observationsPerTypePerIteration; + + @Parameter(names = { "-it", "--ingest-types" }, description = "The number of unique types to generate.", required = true) + private int numTypes; + + @Parameter(names = { "-itp", "--ingest-type-prefix" }, description = "The prefix to use for a type, for example 'car_'", required = true) + private String typePrefix; + + @Parameter(names = { "-ip", "--ingest-period-sec" }, description = "The period, in seconds between ingests of the data generated for one iteration.", required = true) + private int ingestPeriodSeconds; + + @Parameter(names = { "-rp", "--report-period-sec" }, description = "The period, in seconds between persisting reports of the current state.", required = true) + private int resultPeriodSeconds; + + /** @return The number of ingest iterations (--ingest-iterations). */ + public int getNumIterations() { + return numIterations; + } + + /** @return The number of observations to generate per type in each iteration (--ingest-observations-per-type). */ + public int getObservationsPerTypePerIteration() { + return observationsPerTypePerIteration; + } + + /** @return The number of unique types to generate (--ingest-types). */ + public int getNumTypes() { + return numTypes; + } + + /** @return The prefix used for generated type names (--ingest-type-prefix). */ + public String getTypePrefix() { + return typePrefix; + } + + /** @return The period, in seconds, between ingests (--ingest-period-sec). */ + public int getIngestPeriodSeconds() { + return ingestPeriodSeconds; + } + + /** @return The period, in seconds, between persisted reports (--report-period-sec). */ + public int getResultPeriodSeconds() { + return resultPeriodSeconds; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("numIterations", numIterations) + .add("observationsPerTypePerIteration", observationsPerTypePerIteration) + .add("numTypes", numTypes) + .add("typePrefix", typePrefix) + .add("ingestPeriodSeconds", ingestPeriodSeconds) + .add("resultPeriodSeconds", resultPeriodSeconds) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java
---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java new file mode 100644 index 0000000..fdd3b63 --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.benchmark.periodic; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; + +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.datatype.DatatypeFactory; + +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * Generates sets of statements used for benchmarking + */ +public class BenchmarkStatementGenerator { + + private static final Logger logger = LoggerFactory.getLogger(BenchmarkStatementGenerator.class); + + private final ValueFactory vf; + private final DatatypeFactory dtf; + + public BenchmarkStatementGenerator() throws DatatypeConfigurationException { + vf = new ValueFactoryImpl(); + dtf = DatatypeFactory.newInstance(); + } + + /** + * Generates two statements for each of the (numObservationsPerType x numTypes) observations, of the form: + * + * <pre> + * urn:obs_n uri:hasTime zonedTime + * urn:obs_n uri:hasObsType typePrefix_m + * </pre> + * + * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by + * observationOffset, zero-padded to 20 digits, and where m in typePrefix_m is the jth value in 0 to numTypes. + * + * @param numObservationsPerType - The quantity of observations per type to generate. + * @param numTypes - The number of types to generate observations for. + * @param typePrefix - The prefix to be used for the type literal in the statement. + * @param observationOffset - The offset to be used for determining the value of n in the above statements. + * @param zonedTime - The time to be used for all observations generated. + * @return A new list of all generated Statements, holding (2 x numObservationsPerType x numTypes) entries.
+ */ + public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix, final long observationOffset, final ZonedDateTime zonedTime) { + final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT); + final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time)); + final List<Statement> statements = Lists.newArrayList(); + + // The two predicates never change, so create them once instead of once per statement. + final URI hasTime = vf.createURI("uri:hasTime"); + final URI hasObsType = vf.createURI("uri:hasObsType"); + + for (long i = 0; i < numObservationsPerType; i++) { + for (int j = 0; j < numTypes; j++) { + final long observationId = observationOffset + i*numTypes + j; + // Zero-pad the id to a fixed width of 20 digits; both statements share the same subject. + final URI observation = vf.createURI("urn:obs_" + String.format("%020d", observationId)); + statements.add(vf.createStatement(observation, hasTime, litTime)); + statements.add(vf.createStatement(observation, hasObsType, vf.createLiteral(typePrefix + j))); + } + } + + return statements; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java new file mode 100644 index 0000000..e87e6a9 --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.benchmark.periodic; + +import java.io.File; +import java.util.Properties; +import java.util.UUID; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; + +import com.beust.jcommander.Parameter; +import com.google.common.base.Objects; + +/** + * Command line options holding the connection settings used by the benchmark: the Accumulo username, password + * and instance, the Rya instance, the ZooKeeper and Kafka bootstrap server strings, and the output directory. + * The fields are declared as JCommander {@link Parameter}s and every one of them is required. + */ +public class CommonOptions { + + @Parameter(names = { "-u", "--username" }, description = "Accumulo Username", required = true) + private String username; + + @Parameter(names = { "-p", "--password" }, description = "Accumulo Password", required = true, password=true) + private String password; + + @Parameter(names = { "-ai", "--accumulo-instance" }, description = "Accumulo Instance", required = true) + private String accumuloInstance; + + @Parameter(names = { "-ri", "--rya-instance" }, description = "Rya Instance", required = true) + private String ryaInstance; + + @Parameter(names = { "-z", "--zookeepers" }, description = "Accumulo
Zookeepers", required = true) + private String zookeepers; + + @Parameter(names = { "-k", "--kafka-bootstrap-servers" }, description = "Kafka bootstrap server string, for example: kafka1:9092,kafka2:9092", required = true) + private String kafkaBootstrap; + + @Parameter(names = { "-o", "--output-directory" }, description = "The directory that output should be persisted to.", required = true) + private File outputDirectory; + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getAccumuloInstance() { + return accumuloInstance; + } + + public String getRyaInstance() { + return ryaInstance; + } + + public String getZookeepers() { + return zookeepers; + } + + public String getKafkaBootstrap() { + return kafkaBootstrap; + } + + public File getOutputDirectory() { + return outputDirectory; + } + + /** + * Builds a {@link RyaClient} from the configured Accumulo instance name, ZooKeeper string, username and + * password. An Accumulo connector is obtained for the configured user as part of the call. + * + * @return The client produced by {@link AccumuloRyaClientFactory}. + * @throws AccumuloException Propagated from the Accumulo calls made here. + * @throws AccumuloSecurityException Propagated from the Accumulo calls made here. + */ + public RyaClient buildRyaClient() throws AccumuloException, AccumuloSecurityException { + final Instance instance = new ZooKeeperInstance(accumuloInstance, zookeepers); + final AccumuloConnectionDetails accumuloDetails = new AccumuloConnectionDetails(username, password.toCharArray(), accumuloInstance, zookeepers); + return AccumuloRyaClientFactory.build(accumuloDetails, instance.getConnector(username, new PasswordToken(password))); + } + + /** + * Creates a new set of Kafka consumer properties that point at the configured bootstrap servers. A fresh + * random client id and group id are generated on every call, and the offset reset policy is "earliest". + * + * @return A new {@link Properties} instance; it is not cached between calls. + */ + public Properties getKafkaConsumerProperties() { + final Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerProps.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "5000"); // reduce this value to 5 seconds for the scenario where we subscribe before the topic exists.
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return consumerProps; + } + + /** + * Renders the options as a string. The password is never included; the fixed placeholder "[redacted]" is + * written in its place. + */ + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("username", username) + .add("password", "[redacted]") + .add("accumuloInstance", accumuloInstance) + .add("ryaInstance", ryaInstance) + .add("zookeepers", zookeepers) + .add("kafkaBootstrap", kafkaBootstrap) + .add("outputDirectory", outputDirectory) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java new file mode 100644 index 0000000..454f7e0 --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.benchmark.periodic; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.xml.datatype.DatatypeConfigurationException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.commons.io.FileUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.LoadStatements; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; +import org.openrdf.model.Statement; +import org.openrdf.query.BindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.ParameterException; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; +import 
com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * This benchmark is useful for determining the performance characteristics of a Rya Triplestore under continuous ingest
 * that has a PCJ Query that is incrementally updated by the Rya PCJ Fluo App (aka PCJ Updater).
 * <p>
 * This benchmark periodically loads a batch of data into Rya and reports the delay of the following query:
 *
 * <pre>{@code
 * PREFIX time: <http://www.w3.org/2006/time#>
 * SELECT ?type (count(?obs) as ?total)
 * WHERE {
 *     ?obs <uri:hasTime> ?time .
 *     ?obs <uri:hasObsType> ?type .
 * }
 * GROUP BY ?type
 * }</pre>
 * <p>
 * This benchmark is useful for characterizing any latency between Truth (data ingested to Rya) and Reported (query
 * result published to Kafka).
 * <p>
 * This benchmark is also useful for stress testing a Fluo App configuration and Accumulo Tablet configuration.
 * <p>
 * This benchmark expects the provided RyaInstance to have already been constructed and an appropriately configured Rya
 * PCJ Fluo App for that RyaInstance to be deployed on your YARN cluster.
 * <p>
 * Threading: ingest, stats logging and CSV printing run on an internal scheduled thread pool, while {@link #start()}
 * blocks the calling thread consuming query results from Kafka until every scheduled task has been cancelled.
 */
public class KafkaLatencyBenchmark implements AutoCloseable {

    public static final Logger logger = LoggerFactory.getLogger(KafkaLatencyBenchmark.class);

    /** Maximum time, in milliseconds, a single Kafka poll may block (i.e. Kafka is checked at most twice a second). */
    private static final long KAFKA_POLL_TIMEOUT_MS = 500;

    /**
     * Per-type statistics, keyed by type name. The map is fully populated by {@link #startDataIngestTask()} before any
     * task is scheduled and is only read afterwards; the counters inside each {@link Stat} are atomic.
     */
    private final Map<String, Stat> typeToStatMap = Maps.newTreeMap();

    /**
     * ThreadPool for publishing data and logging stats.
     */
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(20);

    private final CommonOptions options;
    private final BenchmarkOptions benchmarkOptions;
    private final RyaClient client;
    /** Filesystem-friendly timestamp format (no ':' characters) used to name the CSV output file. */
    DateTimeFormatter fsFormatter = DateTimeFormatter.ofPattern( "uuuu-MM-dd'T'HH-mm-ss" );
    private final LocalDateTime startTime;
    /**
     * Handles of every task currently scheduled. The list is cleared from a scheduler thread when the ingest task
     * finishes and is polled from the thread blocked in {@link #start()}, so it must be a thread-safe implementation.
     */
    List<ScheduledFuture<?>> futureList = new java.util.concurrent.CopyOnWriteArrayList<>();

    /**
     * Creates a benchmark and connects the Rya client described by {@code options}.
     *
     * @param options - Connection and output options shared by all benchmark commands. (not null)
     * @param benchmarkOptions - The options of the benchmark command that was selected. (not null)
     * @throws AccumuloException Propagated from building the Rya client.
     * @throws AccumuloSecurityException Propagated from building the Rya client.
     */
    public KafkaLatencyBenchmark(final CommonOptions options, final BenchmarkOptions benchmarkOptions) throws AccumuloException, AccumuloSecurityException {
        this.options = Objects.requireNonNull(options);
        this.benchmarkOptions = Objects.requireNonNull(benchmarkOptions);
        this.client = Objects.requireNonNull(options.buildRyaClient());
        this.startTime = LocalDateTime.now();

        logger.info("Running {} with the following input parameters:\n{}\n{}", this.getClass(), options, benchmarkOptions);
    }

    /**
     * Stops the scheduler, cancels every scheduled task and waits (up to one day) for running tasks to finish.
     *
     * @throws Exception If the calling thread is interrupted while waiting for termination.
     */
    @Override
    public void close() throws Exception {
        logger.info("Stopping threads.");
        scheduler.shutdown();

        cancelAllScheduledTasks();
        logger.info("Waiting for all threads to terminate...");
        if (scheduler.awaitTermination(1, TimeUnit.DAYS)) {
            logger.info("All threads terminated.");
        } else {
            logger.warn("Timed out while waiting for threads to terminate.");
        }
    }

    /**
     * Cancels every scheduled task without interrupting a run that is in progress, then empties the task list.
     * An empty task list is the signal that lets the blocking Kafka consumer loop exit.
     */
    private void cancelAllScheduledTasks() {
        logger.info("Canceling all tasks");
        for (final ScheduledFuture<?> task : futureList) {
            task.cancel(false);
        }
        futureList.clear();
    }

    /**
     * Registers the benchmark query, starts the ingest and reporting tasks, and then blocks consuming query results
     * from Kafka until all scheduled tasks have been cancelled.
     *
     * @throws InstanceDoesNotExistException Propagated from registering the query with Rya.
     * @throws RyaClientException Propagated from registering the query with Rya.
     */
    public void start() throws InstanceDoesNotExistException, RyaClientException {
        logger.info("Issuing Query");
        final String topic;
        final boolean periodic;
        if (benchmarkOptions instanceof PeriodicQueryCommand) {
            topic = issuePeriodicQuery((PeriodicQueryCommand) benchmarkOptions);
            periodic = true;
        } else {
            topic = issueQuery();
            periodic = false;
        }
        logger.info("Query Issued. Received PCJ ID: {}", topic);
        startDataIngestTask();
        startStatsPrinterTask();
        startCsvPrinterTask();

        if (periodic) {
            updatePeriodicStatsFromKafka(topic); // blocking operation.
        } else {
            updateStatsFromKafka(topic); // blocking operation.
        }
    }

    /**
     * Registers the (non-periodic) aggregation query as a PCJ that is exported to Kafka.
     *
     * @return The id returned by the Rya client, which is used as the Kafka topic to read results from.
     */
    private String issueQuery() throws InstanceDoesNotExistException, RyaClientException {
        final String sparql = "prefix time: <http://www.w3.org/2006/time#> "
                + "select ?type (count(?obs) as ?total) where { "
                + " ?obs <uri:hasTime> ?time. "
                + " ?obs <uri:hasObsType> ?type "
                + "} "
                + "group by ?type";

        logger.info("Query: {}", sparql);
        return client.getCreatePCJ().createPCJ(options.getRyaInstance(), sparql, ImmutableSet.of(ExportStrategy.KAFKA));
    }

    /**
     * Registers the periodic variant of the aggregation query.
     *
     * @param periodicOptions - Supplies the window, period, time units and registration topic of the periodic query.
     * @return The returned query id with its leading "QUERY_" prefix removed, which is used as the Kafka topic.
     */
    private String issuePeriodicQuery(final PeriodicQueryCommand periodicOptions) throws InstanceDoesNotExistException, RyaClientException {
        final String sparql = "prefix function: <http://org.apache.rya/function#> "
                + "prefix time: <http://www.w3.org/2006/time#> "
                + "select ?type (count(?obs) as ?total) where {"
                + "Filter(function:periodic(?time, " + periodicOptions.getPeriodicQueryWindow() + ", " + periodicOptions.getPeriodicQueryPeriod() + ", time:" + periodicOptions.getPeriodicQueryTimeUnits() + ")) "
                + "?obs <uri:hasTime> ?time. "
                + "?obs <uri:hasObsType> ?type } "
                + "group by ?type";
        logger.info("Query: {}", sparql);
        final String queryId = client.getCreatePeriodicPCJ().createPeriodicPCJ(options.getRyaInstance(), sparql, periodicOptions.getPeriodicQueryRegistrationTopic(), options.getKafkaBootstrap());
        logger.info("Received query id: {}", queryId);
        return queryId.substring("QUERY_".length()); // remove the QUERY_ prefix.
    }

    /**
     * Initializes one {@link Stat} per type and schedules the periodic task that loads generated statements into Rya.
     */
    private void startDataIngestTask() {
        final int initialPublishDelaySeconds = 1;

        final LoadStatements loadCommand = client.getLoadStatements();

        // initialize the stats map
        for (int typeId = 0; typeId < benchmarkOptions.getNumTypes(); typeId++) {
            final String type = benchmarkOptions.getTypePrefix() + typeId;
            typeToStatMap.put(type, new Stat(type));
        }

        final LoaderTask loaderTask = new LoaderTask(benchmarkOptions.getNumIterations(),
                benchmarkOptions.getObservationsPerTypePerIteration(), benchmarkOptions.getNumTypes(),
                benchmarkOptions.getTypePrefix(), loadCommand, options.getRyaInstance());

        // Install the shutdown hook before the task is scheduled so the task can never observe it unset.
        loaderTask.setShutdownOperation(this::cancelAllScheduledTasks);

        final ScheduledFuture<?> loaderTaskFuture = scheduler.scheduleAtFixedRate(loaderTask, initialPublishDelaySeconds, benchmarkOptions.getIngestPeriodSeconds(), TimeUnit.SECONDS);
        futureList.add(loaderTaskFuture);
    }

    /**
     * Schedules the task that periodically logs the current statistics of every type.
     */
    private void startStatsPrinterTask() {
        final Runnable statLogger = () -> {
            final StringBuilder sb = new StringBuilder();
            sb.append("Results\n");
            for (final Stat s : typeToStatMap.values()) {
                sb.append(s).append("\n");
            }
            logger.info("{}",sb);
        };

        final int initialPrintDelaySeconds = 11;

        final ScheduledFuture<?> statLoggerFuture = scheduler.scheduleAtFixedRate(statLogger, initialPrintDelaySeconds, benchmarkOptions.getResultPeriodSeconds(), TimeUnit.SECONDS);
        futureList.add(statLoggerFuture);
    }

    /**
     * Schedules the task that periodically appends one CSV row of statistics to
     * {@code run-<start time>.csv} in the configured output directory. A header row is written before the first row.
     */
    private void startCsvPrinterTask() {

        final int initialPrintDelaySeconds = 11;
        final long printPeriodSeconds = benchmarkOptions.getResultPeriodSeconds();

        final Runnable csvPrinterTask = new Runnable() {
            private final AtomicInteger printCounter = new AtomicInteger(0);
            private final File outFile = new File(options.getOutputDirectory(), "run-" + fsFormatter.format(startTime) + ".csv");

            @Override
            public void run() {
                final int count = printCounter.getAndIncrement();
                final StringBuilder sb = new StringBuilder();
                if (count == 0) {
                    sb.append("elapsed-seconds");
                    for (final Stat s : typeToStatMap.values()) {
                        sb.append(",").append(s.getCsvStringHeader());
                    }
                    sb.append("\n");
                }

                // Elapsed time is derived from the print count, so it is relative to the first print.
                sb.append(count*printPeriodSeconds);
                for (final Stat s : typeToStatMap.values()) {
                    sb.append(",").append(s.getCsvString());
                }
                sb.append("\n");
                try {
                    FileUtils.write(outFile, sb.toString(), StandardCharsets.UTF_8, true);
                } catch (final IOException e) {
                    logger.warn("Error writing to file " + outFile, e);
                }
            }
        };

        final ScheduledFuture<?> csvPrinterFuture = scheduler.scheduleAtFixedRate(csvPrinterTask, initialPrintDelaySeconds, printPeriodSeconds, TimeUnit.SECONDS);
        futureList.add(csvPrinterFuture);
    }

    /**
     * Consumes results of the non-periodic query from Kafka until all scheduled tasks have been cancelled.
     *
     * @param topic - The Kafka topic the query results are published to.
     */
    private void updateStatsFromKafka(final String topic) {
        try (KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(options.getKafkaConsumerProperties(), new StringDeserializer(), new KryoVisibilityBindingSetSerializer())) {
            consumeUntilTasksFinish(consumer, topic);
        } catch (final Exception e) {
            logger.warn("Exception occurred", e);
        }
    }

    /**
     * Consumes results of the periodic query from Kafka until all scheduled tasks have been cancelled.
     *
     * @param topic - The Kafka topic the query results are published to.
     */
    private void updatePeriodicStatsFromKafka(final String topic) {
        try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(options.getKafkaConsumerProperties(), new StringDeserializer(), new BindingSetSerDe())) {
            consumeUntilTasksFinish(consumer, topic);
        } catch (final Exception e) {
            logger.warn("Exception occurred", e);
        }
    }

    /**
     * Subscribes the consumer to the topic and feeds every polled batch to {@link #handle(ConsumerRecords)} for as
     * long as at least one scheduled task remains. The caller owns (and closes) the consumer.
     */
    private void consumeUntilTasksFinish(final KafkaConsumer<String, ? extends BindingSet> consumer, final String topic) {
        consumer.subscribe(Arrays.asList(topic));
        while (!futureList.isEmpty()) {
            handle(consumer.poll(KAFKA_POLL_TIMEOUT_MS)); // check kafka at most twice a second.
        }
    }

    /**
     * Updates the reported ("fluo") total of each type from a batch of query results. Results that lack the
     * {@code type} or {@code total} binding, or whose total is not a whole number, are logged and skipped so that a
     * single malformed record does not abort the consumer loop.
     */
    private void handle(final ConsumerRecords<String, ? extends BindingSet> records) {
        if (records.count() > 0) {
            logger.debug("Received {} records", records.count());
        }
        for (final ConsumerRecord<String, ? extends BindingSet> record : records) {
            final BindingSet result = record.value();
            logger.debug("Received BindingSet: {}", result);

            if (result == null || !result.hasBinding("type") || !result.hasBinding("total")) {
                logger.warn("Ignoring result without 'type' and 'total' bindings: {}", result);
                continue;
            }

            final String type = result.getBinding("type").getValue().stringValue();
            final long total;
            try {
                total = Long.parseLong(result.getBinding("total").getValue().stringValue());
            } catch (final NumberFormatException e) {
                logger.warn("Ignoring result with non-numeric total: {}", result);
                continue;
            }

            final Stat stat = typeToStatMap.get(type);
            if (stat == null) {
                logger.warn("Not expecting to receive type: {}", type);
            } else {
                stat.fluoTotal.set(total);
            }
        }
    }

    /**
     * Periodic task that generates one iteration worth of observations, loads them into Rya and adds them to the
     * published totals. Once the configured number of iterations has been reached it runs the shutdown operation
     * instead of publishing.
     */
    private class LoaderTask implements Runnable {
        private final AtomicLong iterations = new AtomicLong(0);
        private final int numIterations;
        private final int numTypes;
        private final String typePrefix;
        private final long observationsPerTypePerIteration;

        private final LoadStatements loadStatements;
        private final String ryaInstanceName;
        /** Written by the thread that configures the task and read on a scheduler thread, hence volatile. */
        private volatile Runnable shutdownOperation;

        public LoaderTask(final int numIterations, final long observationsPerTypePerIteration, final int numTypes, final String typePrefix, final LoadStatements loadStatements, final String ryaInstanceName) {
            this.numIterations = numIterations;
            this.observationsPerTypePerIteration = observationsPerTypePerIteration;
            this.numTypes = numTypes;
            this.typePrefix = typePrefix;
            this.loadStatements = loadStatements;
            this.ryaInstanceName = ryaInstanceName;
        }

        @Override
        public void run() {
            try {
                final long i = iterations.getAndIncrement();
                logger.info("Publishing iteration [{} of {}]", i, numIterations);
                if (i >= numIterations) {
                    logger.info("Reached maximum iterations...");
                    final Runnable onShutdown = shutdownOperation;
                    if (onShutdown != null) {
                        onShutdown.run();
                    } else {
                        logger.warn("No shutdown operation has been set.");
                    }
                    return;
                }

                final BenchmarkStatementGenerator gen = new BenchmarkStatementGenerator();
                final long observationsPerIteration = observationsPerTypePerIteration * numTypes;
                final long iterationOffset = i * observationsPerIteration;
                logger.info("Generating {} Observations", observationsPerIteration);
                final Iterable<Statement> statements = gen.generate(observationsPerTypePerIteration, numTypes, typePrefix, iterationOffset, ZonedDateTime.now());
                logger.info("Publishing {} Observations", observationsPerIteration);
                final long t1 = System.currentTimeMillis();
                loadStatements.loadStatements(ryaInstanceName, statements);
                logger.info("Published {} observations in {}s", observationsPerIteration, ((System.currentTimeMillis() - t1)/1000.0));
                logger.info("Updating published totals...");
                for (int typeId = 0; typeId < numTypes; typeId++) {
                    typeToStatMap.get(typePrefix + typeId).total.addAndGet(observationsPerTypePerIteration);
                }
                logger.info("Finished publishing.");
            } catch (final RyaClientException e) {
                logger.warn("Error while writing statements", e);
            } catch (final DatatypeConfigurationException e) {
                logger.warn("Error creating generator", e);
            } catch (final RuntimeException e) {
                // An exception escaping run() would make the scheduler silently suppress every later execution,
                // so the shutdown operation would never run and the benchmark would never finish.
                logger.error("Unexpected error during ingest iteration", e);
            }
        }

        /**
         * @param f - The operation to run once the maximum number of iterations has been reached.
         */
        public void setShutdownOperation(final Runnable f) {
            this.shutdownOperation = f;
        }
    }


    /**
     * Simple data structure for storing and reporting statistics for a Type.
     */
    private static class Stat {
        /** Most recent total reported through Kafka for this type. */
        protected final AtomicLong fluoTotal = new AtomicLong(0);
        /** Number of observations of this type published to Rya so far. */
        protected final AtomicLong total = new AtomicLong(0);
        private final String type;
        private final LongSummaryStatistics diffStats = new LongSummaryStatistics();

        public Stat(final String type) {
            this.type = type;
        }

        /**
         * Renders the current totals. NOTE: each call also records the current difference into the running
         * summary statistics, so this method is not side-effect free.
         */
        @Override
        public String toString() {
            final long t = total.get();
            final long ft = fluoTotal.get();
            final long diff = t - ft;
            diffStats.accept(diff);
            return type + " published total: " + t + " fluo: " + ft + " difference: " + diff + " diffStats: " + diffStats;
        }

        /**
         * @return The comma separated values: type, published total, reported total and their difference.
         */
        public String getCsvString() {
            final long t = total.get();
            final long ft = fluoTotal.get();
            final long diff = t - ft;
            return Joiner.on(",").join(type, t, ft, diff);
        }

        /**
         * @return The comma separated column names matching {@link #getCsvString()}.
         */
        public String getCsvStringHeader() {
            return "type,published_total,fluo_total_" + type + ",difference_" + type;
        }
    }

    /**
     * Command line entry point. Parses the common options plus exactly one of the "periodic" or "projection"
     * commands and runs the benchmark; prints usage and exits with status 1 on invalid input.
     *
     * @param args - The command line arguments.
     */
    public static void main(final String[] args) {

        final CommonOptions options = new CommonOptions();
        final ProjectionQueryCommand projectionCommand = new ProjectionQueryCommand();
        final PeriodicQueryCommand periodicCommand = new PeriodicQueryCommand();

        BenchmarkOptions parsedCommand = null;
        final JCommander cli = new JCommander();
        cli.addObject(options);
        cli.addCommand(projectionCommand);
        cli.addCommand(periodicCommand);
        cli.setProgramName(KafkaLatencyBenchmark.class.getName());

        try {
            cli.parse(args);
            final String parsedName = cli.getParsedCommand();
            if ("periodic".equals(parsedName)) {
                parsedCommand = periodicCommand;
            }
            if ("projection".equals(parsedName)) {
                parsedCommand = projectionCommand;
            }
            if (parsedCommand == null) {
                throw new ParameterException("A command must be specified.");
            }
        } catch (final ParameterException e) {
            System.err.println("Error! Invalid input: " + e.getMessage());
            cli.usage();
            System.exit(1);
        }

        try (KafkaLatencyBenchmark benchmark = new KafkaLatencyBenchmark(options, parsedCommand)) {
            benchmark.start();
        } catch (final Exception e) {
            logger.warn("Exception occurred.", e);
        }
    }
}