RYA-377 Add ListQueries command Added command for listing queries Added Integration Test for the command Added default implementation for the interactor
Added default usage to command interface Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/adc44fd3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/adc44fd3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/adc44fd3 Branch: refs/heads/master Commit: adc44fd369b4f54497fa7a4d1ef099d166effbf3 Parents: a8b511b Author: Andrew Smith <smith...@gmail.com> Authored: Fri Oct 27 17:08:47 2017 -0400 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- .../rya/streams/api/interactor/ListQueries.java | 4 +- .../interactor/defaults/DefaultListQueries.java | 54 ++++++++ .../rya/streams/client/RyaStreamsCommand.java | 9 +- .../client/command/ListQueriesCommand.java | 124 +++++++++++++++++++ .../client/command/ListQueryCommandIT.java | 60 +++++++++ 5 files changed, 248 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java index 4cab856..5d03f5c 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java @@ -18,7 +18,7 @@ */ package org.apache.rya.streams.api.interactor; -import java.util.List; +import java.util.Set; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; @@ -38,5 +38,5 @@ public 
interface ListQueries { * @return All of the queries that are managed. * @throws RyaStreamsException The queries could not be listed. */ - public List<StreamsQuery> all() throws RyaStreamsException; + public Set<StreamsQuery> all() throws RyaStreamsException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java new file mode 100644 index 0000000..82ca691 --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.api.interactor.defaults; + +import static java.util.Objects.requireNonNull; + +import java.util.Set; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.ListQueries; +import org.apache.rya.streams.api.queries.QueryRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Lists all queries in Rya Streams. + */ +@DefaultAnnotation(NonNull.class) +public class DefaultListQueries implements ListQueries { + private final QueryRepository repository; + + /** + * Creates a new {@link DefaultAddQuery}. + * + * @param repository - The {@link QueryRepository} to add a query to. (not + * null) + */ + public DefaultListQueries(final QueryRepository repository) { + this.repository = requireNonNull(repository); + } + + @Override + public Set<StreamsQuery> all() throws RyaStreamsException { + return repository.list(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java index c4c55e8..967b79e 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java @@ -18,6 +18,7 @@ */ package org.apache.rya.streams.client; +import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.common.base.Strings; @@ -80,7 +81,13 @@ public interface RyaStreamsCommand { /** * @return Describes what arguments may be 
provided to the command. */ - public String getUsage(); + default public String getUsage() { + final JCommander parser = new JCommander(new Parameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } /** * Execute the command using the command line arguments. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java new file mode 100644 index 0000000..ec40b50 --- /dev/null +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.client.command; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.ListQueries; +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.client.RyaStreamsCommand; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.ParameterException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A command that lists all queries currently in Rya Streams. 
+ */ +@DefaultAnnotation(NonNull.class) +public class ListQueriesCommand implements RyaStreamsCommand { + private static final Logger log = LoggerFactory.getLogger(ListQueriesCommand.class); + + @Override + public String getCommand() { + return "list-queries"; + } + + @Override + public String getDescription() { + return "Lists all queries currently in Rya Streams."; + } + + @Override + public void execute(final String[] args) throws ArgumentsException, ExecutionException { + requireNonNull(args); + + // Parse the command line arguments. + final Parameters params = new Parameters(); + try { + new JCommander(params, args); + } catch (final ParameterException e) { + throw new ArgumentsException("Could not list the queries because of invalid command line parameters.", e); + } + log.trace("Executing the List Query Command.\n" + params.toString()); + + // Create properties for interacting with Kafka. + final Properties producerProperties = new Properties(); + producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); + producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + final Properties consumerProperties = new Properties(); + consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties); + final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties); + + final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, 
queryConsumer, params.topicName); + final QueryRepository repo = new InMemoryQueryRepository(changeLog); + final ListQueries listQueries = new DefaultListQueries(repo); + try { + final Set<StreamsQuery> queries = listQueries.all(); + logQueries(queries); + } catch (final RyaStreamsException e) { + log.error("Unable to retrieve the queries.", e); + } + } + + private void logQueries(final Set<StreamsQuery> queries) { + final StringBuilder sb = new StringBuilder(); + sb.append("\n"); + sb.append("Queries in Rya Streams:\n"); + sb.append("---------------------------------------------------------\n"); + queries.forEach(query -> { + sb.append("ID: "); + sb.append(query.getQueryId()); + sb.append("\t\t"); + sb.append("Query: "); + sb.append(query.getSparql()); + sb.append("\n"); + }); + log.trace(sb.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java new file mode 100644 index 0000000..be90c5f --- /dev/null +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.client.command; + +import static org.junit.Assert.assertEquals; + +import java.util.Properties; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.rya.test.kafka.KafkaITBase; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * integration Test for listing queries through a command. + */ +public class ListQueryCommandIT extends KafkaITBase { + private String[] args; + + @Rule + public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); + + @Before + public void setup() { + final Properties props = rule.createBootstrapServerConfig(); + final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + final String[] tokens = location.split(":"); + args = new String[] { + "-t", rule.getKafkaTopicName(), + "-p", tokens[1], + "-i", tokens[0] + }; + } + + @Test + public void happyListQueriesTest() throws Exception { + final ListQueriesCommand command = new ListQueriesCommand(); + command.execute(args); + // not sure what to assert here. + assertEquals(true, true); + } +}