http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml index 16d33b2..439ea16 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml @@ -34,17 +34,17 @@ under the License. that allow other applications to interact with the Rya PCJ Fluo application while it is running on a cluster. </description> - + <dependencies> - <!-- Rya Runtime Dependencies. --> - <dependency> + <!-- Rya Runtime Dependencies. --> + <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.pcj.fluo.app</artifactId> </dependency> <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service.api</artifactId> - </dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.api</artifactId> + </dependency> <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.sail</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/.gitignore ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/.gitignore b/extras/rya.periodic.service/periodic.service.api/.gitignore deleted file mode 100644 index b83d222..0000000 --- a/extras/rya.periodic.service/periodic.service.api/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/pom.xml b/extras/rya.periodic.service/periodic.service.api/pom.xml deleted file mode 100644 index b57beaf..0000000 --- a/extras/rya.periodic.service/periodic.service.api/pom.xml +++ /dev/null @@ -1,52 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <!-- Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with this - work for additional information regarding copyright ownership. The ASF licenses - this file to you under the Apache License, Version 2.0 (the "License"); you - may not use this file except in compliance with the License. You may obtain - a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the License. --> - - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service</artifactId> - <version>3.2.11-incubating-SNAPSHOT</version> - </parent> - - <artifactId>rya.periodic.service.api</artifactId> - - <name>Apache Rya Periodic Service API</name> - <description>API for Periodic Service Application</description> - - <dependencies> - - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.8.0</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-query</artifactId> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.indexing.pcj</artifactId> - </dependency> - </dependencies> - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java deleted file mode 100644 index f4a083c..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; - -/** - * Object that cleans up old {@link BindingSet}s corresponding to the specified - * {@link NodeBin}. This class deletes all BindingSets with the bin - * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given - * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID} - * and value equal to the given bin. - * - */ -public interface BinPruner { - - /** - * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}. - * @param bin - NodeBin that indicates which BindingSets to delete.. - */ - public void pruneBindingSetBin(NodeBin bin); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java deleted file mode 100644 index 491576b..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import org.openrdf.query.BindingSet; - -/** - * An Object that is used to export {@link BindingSet}s to an external repository or queuing system. - * - */ -public interface BindingSetExporter { - - /** - * This method exports the BindingSet to the external repository or queuing system - * that this BindingSetExporter is configured to export to. - * @param bindingSet - {@link BindingSet} to be exported - * @throws ResultExportException - */ - public void exportNotification(BindingSetRecord bindingSet) throws BindingSetRecordExportException; - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java deleted file mode 100644 index c3f70f1..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import org.openrdf.query.BindingSet; - -import com.google.common.base.Objects; - -/** - * Object that associates a {@link BindingSet} with a given Kafka topic. - * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export - * each BindingSet to its appropriate topic. - * - */ -public class BindingSetRecord { - - private BindingSet bs; - private String topic; - - public BindingSetRecord(BindingSet bs, String topic) { - this.bs = bs; - this.topic = topic; - } - - /** - * @return BindingSet in this BindingSetRecord - */ - public BindingSet getBindingSet() { - return bs; - } - - /** - * @return Kafka topic for this BindingSetRecord - */ - public String getTopic() { - return topic; - } - - @Override - public boolean equals(Object o) { - if(this == o) { - return true; - } - - if(o instanceof BindingSetRecord) { - BindingSetRecord record = (BindingSetRecord) o; - return Objects.equal(this.bs, record.bs)&&Objects.equal(this.topic,record.topic); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(bs, topic); - } - - @Override - public String toString() { - return new StringBuilder().append("Binding Set Record \n").append(" Topic: " + topic + "\n").append(" BindingSet: " + bs + "\n") - .toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java deleted file mode 100644 index 94e4980..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -/** - * A result could not be exported. - */ -public class BindingSetRecordExportException extends Exception { - private static final long serialVersionUID = 1L; - - /** - * Constructs an instance of {@link BindingSetRecordExportException}. - * - * @param message - Explains why the exception was thrown. - */ - public BindingSetRecordExportException(final String message) { - super(message); - } - - /** - * Constructs an instance of {@link BindingSetRecordExportException}. - * - * @param message - Explains why the exception was thrown. - * @param cause - The exception that caused this one to be thrown. - */ - public BindingSetRecordExportException(final String message, final Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java deleted file mode 100644 index b1e8bad..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -/** - * Interface providing basic life cycle functionality, - * including stopping and starting any class implementing this - * interface and checking whether is it running. - * - */ -public interface LifeCycle { - - /** - * Starts a running application. - */ - public void start(); - - /** - * Stops a running application. - */ - public void stop(); - - /** - * Determine if application is currently running. - * @return true if application is running and false otherwise. - */ - public boolean currentlyRunning(); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java deleted file mode 100644 index 3ed7979..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import java.util.Objects; - -/** - * Object used to indicate the id of a given Periodic Query - * along with a particular bin of results. This Object is used - * by the {@link BinPruner} to clean up old query results after - * they have been processed. - * - */ -public class NodeBin { - - private long bin; - private String nodeId; - - public NodeBin(String nodeId, long bin) { - this.bin = bin; - this.nodeId = nodeId; - } - - /** - * @return id of Periodic Query - */ - public String getNodeId() { - return nodeId; - } -/** - * @return bin id of results for a given Periodic Query - */ - public long getBin() { - return bin; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof NodeBin) { - NodeBin bin = (NodeBin) other; - return this.bin == bin.bin && this.nodeId.equals(bin.nodeId); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hash(bin, nodeId); - } - - @Override - public String toString() { - return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java deleted file mode 100644 index 3e9e0d1..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -/** - * Notification Object used by the Periodic Query Service - * to inform workers to process results for a given Periodic - * Query with the indicated id. - * - */ -public interface Notification { - - /** - * @return id of a Periodic Query - */ - public String getId(); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java deleted file mode 100644 index d53dc17..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.rya.periodic.notification.notification.CommandNotification; - -/** - * Object that manages the periodic notifications for the Periodic Query Service. - * This Object processes new requests for periodic updates by registering them with - * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}). - * - */ -public interface NotificationCoordinatorExecutor extends LifeCycle { - - /** - * Registers or deletes a {@link CommandNotification}s with the periodic service to - * generate notifications at a regular interval indicated by the CommandNotification. - * @param notification - CommandNotification to be registered or deleted from the periodic update - * service. - */ - public void processNextCommandNotification(CommandNotification notification); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java deleted file mode 100644 index 4ac9089..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java +++ /dev/null @@ -1,41 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import org.apache.rya.periodic.notification.notification.TimestampedNotification; - -/** - * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}. - * It is expected that the NotificationCoordinatorExecutor will this Object with notifications to perform work via some sort - * sort of queuing service such as a BlockingQueue or Kafka. This Object processes the notifications by retrieving - * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them - * and then providing them to another service to be exported. - * - */ -public interface NotificationProcessor { - - /** - * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results - * associated the query id given by {@link TimestampedNotification#getId()}. - * @param notification - contains information about which query results to retrieve - */ - public void processNotification(TimestampedNotification notification); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java deleted file mode 100644 index ff08733..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import java.util.concurrent.TimeUnit; - -import org.apache.rya.periodic.notification.notification.BasicNotification; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; - -/** - * Object to register {@link PeriodicNotification}s with an external queuing - * service to be handled by a {@link NotificationCoordinatorExecutor} service. - * The service will generate notifications to process Periodic Query results at regular - * intervals corresponding the period of the PeriodicNotification. - * - */ -public interface PeriodicNotificationClient extends AutoCloseable { - - /** - * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor} - * @param notification - notification to be added - */ - public void addNotification(PeriodicNotification notification); - - /** - * Deletes a notification from the {@link NotificationCoordinatorExecutor}. - * @param notification - notification to be deleted - */ - public void deleteNotification(BasicNotification notification); - - /** - * Deletes a notification from the {@link NotificationCoordinatorExecutor}. - * @param notification - id corresponding to the notification to be deleted - */ - public void deleteNotification(String notificationId); - - /** - * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor} - * @param id - Periodic Query id - * @param period - period indicating frequency at which notifications will be generated - * @param delay - initial delay for starting periodic notifications - * @param unit - time unit of delay and period - */ - public void addNotification(String id, long period, long delay, TimeUnit unit); - - public void close(); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java deleted file mode 100644 index c31a5c0..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import org.apache.rya.periodic.notification.api.Notification; - -import com.google.common.base.Objects; - -/** - * Notification Object used by the Periodic Query Service - * to inform workers to process results for a given Periodic - * Query with the indicated id. - * - */ -public class BasicNotification implements Notification { - - private String id; - - /** - * Creates a BasicNotification - * @param id - Fluo query id associated with this Notification - */ - public BasicNotification(String id) { - this.id = id; - } - - /** - * @return the Fluo Query Id that this notification will generate results for - */ - @Override - public String getId() { - return id; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof BasicNotification) { - BasicNotification not = (BasicNotification) other; - return Objects.equal(this.id, not.id); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(id); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - return builder.append("id").append("=").append(id).toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java deleted file mode 100644 index 597b228..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import org.apache.rya.periodic.notification.api.Notification; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; - -/** - * This Object contains a Notification Object used by the Periodic Query Service - * to inform workers to process results for a given Periodic Query with the - * indicated id. Additionally, the CommandNotification contains a - * {@link Command} about which action the - * {@link NotificationCoordinatorExecutor} should take (adding or deleting). - * CommandNotifications are meant to be added to an external work queue (such as - * Kafka) to be processed by the NotificationCoordinatorExecutor. - * - */ -public class CommandNotification implements Notification { - - private Notification notification; - private Command command; - - public enum Command { - ADD, DELETE - }; - - /** - * Creates a new CommandNotification - * @param command - the command associated with this notification (either add, update, or delete) - * @param notification - the underlying notification associated with this command - */ - public CommandNotification(Command command, Notification notification) { - this.notification = Preconditions.checkNotNull(notification); - this.command = Preconditions.checkNotNull(command); - } - - @Override - public String getId() { - return notification.getId(); - } - - /** - * Returns {@link Notification} contained by this CommmandNotification. - * @return - Notification contained by this Object - */ - public Notification getNotification() { - return this.notification; - } - - /** - * @return Command contained by this Object (either add or delete) - */ - public Command getCommand() { - return this.command; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other instanceof CommandNotification) { - CommandNotification cn = (CommandNotification) other; - return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Objects.hashCode(command, notification); - } - - @Override - public String toString() { - return new StringBuilder().append("command").append("=").append(command.toString()).append(";") - .append(notification.toString()).toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java deleted file mode 100644 index aa9e581..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -import org.apache.rya.periodic.notification.api.Notification; - -import com.google.common.base.Preconditions; - -/** - * Notification Object used by the Periodic Query Service to inform workers to - * process results for a given Periodic Query with the indicated id. - * Additionally, this Object contains a period that indicates a frequency at - * which regular updates are generated. - * - */ -public class PeriodicNotification implements Notification { - - private String id; - private long period; - private TimeUnit periodTimeUnit; - private long initialDelay; - - /** - * Creates a PeriodicNotification. - * @param id - Fluo Query Id that this notification is associated with - * @param period - period at which notifications are generated - * @param periodTimeUnit - time unit associated with the period and delay - * @param initialDelay - amount of time to wait before generating the first notification - */ - public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { - this.id = Preconditions.checkNotNull(id); - this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit); - Preconditions.checkArgument(period > 0 && initialDelay >= 0); - this.period = period; - this.initialDelay = initialDelay; - } - - - /** - * Create a PeriodicNotification - * @param other - other PeriodicNotification used in copy constructor - */ - public PeriodicNotification(PeriodicNotification other) { - this(other.id, other.period, other.periodTimeUnit, other.initialDelay); - } - - public String getId() { - return id; - } - - /** - * @return - period at which regular notifications are generated - */ - public long getPeriod() { - return period; - } - - /** - * @return time unit of period and initial delay - */ - public TimeUnit getTimeUnit() { - return periodTimeUnit; - } - - /** - * @return amount of time to delay before beginning to generate notifications - */ - public long getInitialDelay() { - return initialDelay; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - String delim = "="; - String delim2 = ";"; - return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2) - .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim) - .append(initialDelay).toString(); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (!(other instanceof PeriodicNotification)) { - return false; - } - - PeriodicNotification notification = (PeriodicNotification) other; - return Objects.equals(this.id, notification.id) && (this.period == notification.period) - && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay); - } - - @Override - public int hashCode() { - return Objects.hash(id, period, periodTimeUnit, initialDelay); - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private String id; - private long period; - private TimeUnit periodTimeUnit; - private long initialDelay = 0; - - /** - * @param id - periodic query id - * @return - builder to chain method calls - */ - public Builder id(String id) { - this.id = id; - return this; - } - - /** - * @param period of the periodic notification for generating regular notifications - * @return - builder to chain method calls - */ - public Builder period(long period) { - this.period = period; - return this; - } - - /** - * @param timeUnit of period and initial delay - * @return - builder to chain method calls - */ - public Builder timeUnit(TimeUnit timeUnit) { - this.periodTimeUnit = timeUnit; - return this; - } - - /** - * @param initialDelay - amount of time to wait before generating notifications - * @return - builder to chain method calls - */ - public Builder initialDelay(long initialDelay) { - this.initialDelay = initialDelay; - return this; - } - - /** - * Builds PeriodicNotification - * @return PeriodicNotification constructed from Builder specified parameters - */ - public PeriodicNotification build() { - return new PeriodicNotification(id, period, periodTimeUnit, initialDelay); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java deleted file mode 100644 index 38073ce..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import java.util.Date; -import java.util.concurrent.TimeUnit; - -/** - * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to - * process results for a given Periodic Query with the indicated id. Additionally - * this Object contains a {@link Date} object to indicate the date time at which this - * notification was generated. - * - */ -public class TimestampedNotification extends PeriodicNotification { - - private Date date; - - /** - * Constructs a TimestampedNotification - * @param id - Fluo Query Id associated with this Notification - * @param period - period at which notifications are generated - * @param periodTimeUnit - time unit associated with period and initial delay - * @param initialDelay - amount of time to wait before generating first notification - */ - public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { - super(id, period, periodTimeUnit, initialDelay); - date = new Date(); - } - - /** - * Creates a TimestampedNotification - * @param notification - PeriodicNotification used to create this TimestampedNotification. - * This constructor creates a time stamp for the TimestampedNotification. - */ - public TimestampedNotification(PeriodicNotification notification) { - super(notification); - date = new Date(); - } - - /** - * @return timestamp at which this notification was generated - */ - public Date getTimestamp() { - return date; - } - - @Override - public String toString() { - return super.toString() + ";date=" + date; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java deleted file mode 100644 index bb438be..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.registration; - -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; -import org.apache.rya.periodic.notification.notification.BasicNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification.Command; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; - -/** - * Implementation of {@link PeriodicNotificaitonClient} used to register new notification - * requests with the PeriodicQueryService. - * - */ -public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient { - - private KafkaProducer<String, CommandNotification> producer; - private String topic; - - public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) { - this.topic = topic; - this.producer = producer; - } - - @Override - public void addNotification(PeriodicNotification notification) { - processNotification(new CommandNotification(Command.ADD, notification)); - - } - - @Override - public void deleteNotification(BasicNotification notification) { - processNotification(new CommandNotification(Command.DELETE, notification)); - } - - @Override - public void deleteNotification(String notificationId) { - processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId))); - } - - @Override - public void addNotification(String id, long period, long delay, TimeUnit unit) { - Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build(); - processNotification(new CommandNotification(Command.ADD, notification)); - } - - - private void processNotification(CommandNotification notification) { - producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification)); - } - - @Override - public void close() { - producer.close(); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java deleted file mode 100644 index bd29d29..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.lang.reflect.Type; - -import org.apache.rya.periodic.notification.notification.BasicNotification; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; - -/** - * {@link TypeAdapter} for {@link BasicNotification}s. Used in {@link CommandNotificationTypeAdapter} to - * serialize {@link CommandNotification}s. - * - */ -public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> { - - @Override - public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) { - JsonObject result = new JsonObject(); - result.add("id", new JsonPrimitive(arg0.getId())); - return result; - } - - @Override - public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException { - JsonObject json = arg0.getAsJsonObject(); - String id = json.get("id").getAsString(); - return new BasicNotification(id); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java deleted file mode 100644 index 50180ad..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.io.UnsupportedEncodingException; -import java.util.Arrays; -import java.util.Map; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.base.Joiner; -import com.google.common.primitives.Bytes; - -/** - * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming messages - * from Kafka. - * - */ -public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> { - - private static final Logger log = Logger.getLogger(BindingSetSerDe.class); - private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer(); - private static final byte[] DELIM_BYTE = "\u0002".getBytes(); - - private byte[] toBytes(BindingSet bindingSet) { - try { - return getBytes(getVarOrder(bindingSet), bindingSet); - } catch(Exception e) { - log.trace("Unable to serialize BindingSet: " + bindingSet); - return new byte[0]; - } - } - - private BindingSet fromBytes(byte[] bsBytes) { - try{ - int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE); - byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex); - byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length); - VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";")); - return getBindingSet(varOrder, bsBytesNoVarOrder); - } catch(Exception e) { - log.trace("Unable to deserialize BindingSet: " + bsBytes); - return new QueryBindingSet(); - } - } - - private VariableOrder getVarOrder(BindingSet bs) { - return new VariableOrder(bs.getBindingNames()); - } - - private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException { - byte[] bsBytes = serializer.convert(bs, varOrder); - String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders()); - byte[] varOrderBytes = varOrderString.getBytes("UTF-8"); - return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes); - } - - private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException { - return serializer.convert(bsBytes, varOrder); - } - - @Override - public BindingSet deserialize(String topic, byte[] bytes) { - return fromBytes(bytes); - } - - @Override - public void close() { - // Do nothing. Nothing to close. - } - - @Override - public void configure(Map<String, ?> arg0, boolean arg1) { - // Do nothing. Nothing to configure. - } - - @Override - public byte[] serialize(String topic, BindingSet bs) { - return toBytes(bs); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java deleted file mode 100644 index 302e1be..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.io.UnsupportedEncodingException; -import java.util.Map; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -/** - * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s - * to and from Kafka. - * - */ -public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> { - - private static Gson gson = new GsonBuilder() - .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create(); - private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class); - - @Override - public CommandNotification deserialize(String topic, byte[] bytes) { - String json = null; - try { - json = new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - LOG.info("Unable to deserialize notification for topic: " + topic); - } - return gson.fromJson(json, CommandNotification.class); - } - - @Override - public byte[] serialize(String topic, CommandNotification command) { - try { - return gson.toJson(command).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - LOG.info("Unable to serialize notification: " + command + "for topic: " + topic); - throw new RuntimeException(e); - } - } - - @Override - public void close() { - // Do nothing. Nothing to close - } - - @Override - public void configure(Map<String, ?> arg0, boolean arg1) { - // Do nothing. Nothing to configure - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java deleted file mode 100644 index a9fb7e1..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.lang.reflect.Type; - -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.notification.BasicNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification.Command; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; - -/** - * {@link TypeAdapter} used to serialize and deserialize {@link CommandNotification}s. - * This TypeAdapter is used in {@link CommandNotificationSerializer} for producing and - * consuming messages to and from Kafka. - * - */ -public class CommandNotificationTypeAdapter - implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> { - - @Override - public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) { - JsonObject result = new JsonObject(); - result.add("command", new JsonPrimitive(arg0.getCommand().name())); - Notification notification = arg0.getNotification(); - if (notification instanceof PeriodicNotification) { - result.add("type", new JsonPrimitive(PeriodicNotification.class.getSimpleName())); - PeriodicNotificationTypeAdapter adapter = new PeriodicNotificationTypeAdapter(); - result.add("notification", - adapter.serialize((PeriodicNotification) notification, PeriodicNotification.class, arg2)); - } else if (notification instanceof BasicNotification) { - result.add("type", new JsonPrimitive(BasicNotification.class.getSimpleName())); - BasicNotificationTypeAdapter adapter = new BasicNotificationTypeAdapter(); - result.add("notification", - adapter.serialize((BasicNotification) notification, BasicNotification.class, arg2)); - } else { - throw new IllegalArgumentException("Invalid notification type."); - } - return result; - } - - @Override - public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) - throws JsonParseException { - - JsonObject json = arg0.getAsJsonObject(); - Command command = Command.valueOf(json.get("command").getAsString()); - String type = json.get("type").getAsString(); - Notification notification = null; - if (type.equals(PeriodicNotification.class.getSimpleName())) { - notification = (new PeriodicNotificationTypeAdapter()).deserialize(json.get("notification"), - PeriodicNotification.class, arg2); - } else if (type.equals(BasicNotification.class.getSimpleName())) { - notification = (new BasicNotificationTypeAdapter()).deserialize(json.get("notification"), - BasicNotification.class, arg2); - } else { - throw new JsonParseException("Cannot deserialize Json"); - } - - return new CommandNotification(command, notification); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java deleted file mode 100644 index fcc0ba2..0000000 --- a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.lang.reflect.Type; -import java.util.concurrent.TimeUnit; - -import org.apache.rya.periodic.notification.notification.PeriodicNotification; -import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; - -/** - * {@link TypeAdapter} used to serialize and deserialize {@link PeriodicNotification}s. - * This TypeAdapter is used in {@link CommandNotificationTypeAdapter} which is used in - * {@link CommandNotificationSerializer} for producing and consuming messages to and from - * Kafka. - * - */ -public class PeriodicNotificationTypeAdapter - implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> { - - @Override - public PeriodicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) - throws JsonParseException { - - JsonObject json = arg0.getAsJsonObject(); - String id = json.get("id").getAsString(); - long period = json.get("period").getAsLong(); - TimeUnit periodTimeUnit = TimeUnit.valueOf(json.get("timeUnit").getAsString()); - long initialDelay = json.get("initialDelay").getAsLong(); - Builder builder = PeriodicNotification.builder().id(id).period(period) - .initialDelay(initialDelay).timeUnit(periodTimeUnit); - - return builder.build(); - } - - @Override - public JsonElement serialize(PeriodicNotification arg0, Type arg1, JsonSerializationContext arg2) { - - JsonObject result = new JsonObject(); - result.add("id", new JsonPrimitive(arg0.getId())); - result.add("period", new JsonPrimitive(arg0.getPeriod())); - result.add("initialDelay", new JsonPrimitive(arg0.getInitialDelay())); - result.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name())); - - return result; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml deleted file mode 100644 index 402f81d..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml +++ /dev/null @@ -1,62 +0,0 @@ -<?xml version="1.0" encoding="utf-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service</artifactId> - <version>3.2.11-incubating-SNAPSHOT</version> - </parent> - - <artifactId>rya.periodic.service.integration.tests</artifactId> - - <name>Apache Rya Periodic Service Integration Tests</name> - <description>Integration Tests for Rya Periodic Service</description> - - <dependencies> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.test.base</artifactId> - <exclusions> - <exclusion> - <artifactId>log4j-1.2-api</artifactId> - <groupId>org.apache.logging.log4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-api</artifactId> - <groupId>org.apache.logging.log4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-core</artifactId> - <groupId>org.apache.logging.log4j</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service.notification</artifactId> - <exclusions> - <exclusion> - <artifactId>logback-classic</artifactId> - <groupId>ch.qos.logback</groupId> - </exclusion> - <exclusion> - <artifactId>logback-core</artifactId> - <groupId>ch.qos.logback</groupId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - -</project> \ No newline at end of file