This is an automated email from the ASF dual-hosted git repository. djkevincr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push: new f2bab9c [GORA-527] Redis Datastore (#198) f2bab9c is described below commit f2bab9cebfe8514b9e67004f0b3c84158bf2b7cb Author: Kevin Ratnasekera <djkevi...@yahoo.com> AuthorDate: Tue Oct 22 15:50:11 2019 +0530 [GORA-527] Redis Datastore (#198) * add redis support * implement mentor sugggestions * remove duplicated dependency * upgrade versions * Fix proper ending xml tag * Fix test failues due to non existent mapping * Disable Hadoop tests due to failures * Improve documentation in disabled tests --- gora-redis/pom.xml | 196 ++++++++ .../java/org/apache/gora/redis/package-info.java | 20 + .../org/apache/gora/redis/query/RedisQuery.java | 46 ++ .../org/apache/gora/redis/query/RedisResult.java | 91 ++++ .../org/apache/gora/redis/query/package-info.java | 21 + .../org/apache/gora/redis/store/RedisMapping.java | 103 ++++ .../gora/redis/store/RedisMappingBuilder.java | 76 +++ .../org/apache/gora/redis/store/RedisStore.java | 540 +++++++++++++++++++++ .../org/apache/gora/redis/store/RedisType.java | 39 ++ .../org/apache/gora/redis/store/package-info.java | 20 + .../org/apache/gora/redis/util/DatumHandler.java | 399 +++++++++++++++ .../gora/redis/util/RedisStoreConstants.java | 42 ++ .../org/apache/gora/redis/util/ServerMode.java | 41 ++ .../org/apache/gora/redis/util/StorageMode.java | 34 ++ .../org/apache/gora/redis/util/package-info.java | 20 + .../org/apache/gora/redis/GoraRedisTestDriver.java | 115 +++++ .../redis/mapreduce/RedisStoreMapReduceTest.java | 70 +++ .../apache/gora/redis/mapreduce/package-info.java | 17 + .../java/org/apache/gora/redis/package-info.java | 21 + .../gora/redis/store/RedisStoreClusterTest.java | 57 +++ .../gora/redis/store/RedisStoreHashTest.java | 58 +++ .../gora/redis/store/RedisStoreStringTest.java | 58 +++ .../org/apache/gora/redis/store/package-info.java | 21 + .../redis/util/RedisStartupLogWaitStrategy.java | 54 +++ .../org/apache/gora/redis/util/package-info.java 
| 20 + .../src/test/resources/gora-redis-mapping.xml | 40 ++ gora-redis/src/test/resources/gora.properties | 34 ++ pom.xml | 43 +- 28 files changed, 2285 insertions(+), 11 deletions(-) diff --git a/gora-redis/pom.xml b/gora-redis/pom.xml new file mode 100755 index 0000000..ee33cb1 --- /dev/null +++ b/gora-redis/pom.xml @@ -0,0 +1,196 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gora</groupId> + <artifactId>gora</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + <artifactId>gora-redis</artifactId> + <packaging>bundle</packaging> + + <name>Apache Gora :: Redis</name> + <url>http://gora.apache.org</url> + <description>The Apache Gora open source framework provides an in-memory data model and + persistence for big data. 
Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + support.</description> + <inceptionYear>2010</inceptionYear> + <organization> + <name>The Apache Software Foundation</name> + <url>http://www.apache.org/</url> + </organization> + <issueManagement> + <system>JIRA</system> + <url>https://issues.apache.org/jira/browse/GORA</url> + </issueManagement> + <ciManagement> + <system>Jenkins</system> + <url>https://builds.apache.org/job/Gora-trunk/</url> + </ciManagement> + + <properties> + <osgi.import>*</osgi.import> + <osgi.export>org.apache.gora.redis*;version="${project.version}";-noimport:=true</osgi.export> + </properties> + + <profiles> + <profile> + <id>redis-with-test</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.4.2</version> + <configuration> + <skipTests>false</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <build> + <directory>target</directory> + <outputDirectory>target/classes</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testOutputDirectory>target/test-classes</testOutputDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <sourceDirectory>src/main/java</sourceDirectory> + <testResources> + <testResource> + <directory>${project.basedir}/src/test/resources</directory> + <includes> + <include>**/*</include> + </includes> + <!--targetPath>${project.basedir}/target/classes/</targetPath --> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.4.2</version> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + 
<version>${build-helper-maven-plugin.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/examples/java</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Gora Internal Dependencies --> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!--Redis Dependency --> + <dependency> + <groupId>org.redisson</groupId> + <artifactId>redisson-all</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.1.36.Final</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <!-- Logging Dependencies --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <exclusions> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Testing Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + 
</dependency> + + </dependencies> + +</project> diff --git a/gora-redis/src/main/java/org/apache/gora/redis/package-info.java b/gora-redis/src/main/java/org/apache/gora/redis/package-info.java new file mode 100755 index 0000000..d8ccc55 --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Redis datastore related all classes. + */ +package org.apache.gora.redis; diff --git a/gora-redis/src/main/java/org/apache/gora/redis/query/RedisQuery.java b/gora-redis/src/main/java/org/apache/gora/redis/query/RedisQuery.java new file mode 100755 index 0000000..5a8d2eb --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/query/RedisQuery.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis.query; + +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; + +/** + * Redis specific implementation of the {@link org.apache.gora.query.Query} + * interface. + */ +public class RedisQuery<K, T extends PersistentBase> extends QueryBase<K, T> { + + /** + * Constructor for the query + */ + public RedisQuery() { + super(null); + } + + /** + * Constructor for the query + * + * @param dataStore Data store used + * + */ + public RedisQuery(DataStore<K, T> dataStore) { + super(dataStore); + } + +} diff --git a/gora-redis/src/main/java/org/apache/gora/redis/query/RedisResult.java b/gora-redis/src/main/java/org/apache/gora/redis/query/RedisResult.java new file mode 100755 index 0000000..5c9656f --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/query/RedisResult.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis.query; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.redis.store.RedisStore; +import org.apache.gora.store.DataStore; + +/** + * Redis specific implementation of the {@link org.apache.gora.query.Result} + * interface. + */ +public class RedisResult<K, T extends PersistentBase> extends ResultBase<K, T> { + + private Iterator<K> range; + private final int size; + + /** + * Constructor of RedisResult + * + * @param dataStore Query's data store + * @param query Query + * @param idsRange Collection of found keys + */ + public RedisResult(DataStore<K, T> dataStore, Query<K, T> query, Collection<K> idsRange) { + super(dataStore, query); + this.size = idsRange.size(); + this.range = idsRange.iterator(); + } + + /** + * Gets the items reading progress + * + * @return a float value representing progress of the job + * @throws java.io.IOException if there is an error obtaining progress + */ + @Override + public float getProgress() throws IOException { + if (this.limit != -1) { + return (float) this.offset / (float) this.limit; + } else { + return 0; + } + } + + /** + * Gets the next item + * + * @return true if another result exists + * @throws java.io.IOException if for some reason we reach a result which does + * not exist + */ + @Override + protected boolean nextInner() throws IOException { + if (range == null) { + 
return false; + } + boolean next = range.hasNext(); + if (next) { + key = (K) range.next(); + persistent = ((RedisStore<K, T>) getDataStore()).get(key, query.getFields()); + } + + return next; + } + + @Override + public int size() { + return this.size; + } + +} diff --git a/gora-redis/src/main/java/org/apache/gora/redis/query/package-info.java b/gora-redis/src/main/java/org/apache/gora/redis/query/package-info.java new file mode 100755 index 0000000..9270674 --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/query/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all the Redis store query representation class as well as Result set representing class + * when query is executed over the Redis dataStore. 
+ */ +package org.apache.gora.redis.query; diff --git a/gora-redis/src/main/java/org/apache/gora/redis/store/RedisMapping.java b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisMapping.java new file mode 100755 index 0000000..09be3d8 --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisMapping.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis.store; + +import java.util.Map; + +/** + * Mapping definitions for Redis. + */ +public class RedisMapping { + + private int database; + private String prefix; + private Map<String, String> fields; + private Map<String, RedisType> types; + + /** + * Gets database number + * + * @return database number + */ + public int getDatabase() { + return database; + } + + /** + * Sets database number + * + * @param datebase database number + */ + public void setDatabase(int datebase) { + this.database = datebase; + } + + /** + * Gets key prefix + * + * @return prefix + */ + public String getPrefix() { + return prefix; + } + + /** + * Sets key prefix + * + * @param prefix String prefix for the creation of redis keys. 
+ */ + public void setPrefix(String prefix) { + this.prefix = prefix; + } + + /** + * Gets mapped fields + * + * @return mapped fields + */ + public Map<String, String> getFields() { + return fields; + } + + /** + * Sets mapped fields + * + * @param fields mapped fields + */ + public void setFields(Map<String, String> fields) { + this.fields = fields; + } + + /** + * Gets types mapping + * + * @return Types mapping + */ + public Map<String, RedisType> getTypes() { + return types; + } + + /** + * Sets types mapping + * + * @param types Types mapping + */ + public void setTypes(Map<String, RedisType> types) { + this.types = types; + } + +} diff --git a/gora-redis/src/main/java/org/apache/gora/redis/store/RedisMappingBuilder.java b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisMappingBuilder.java new file mode 100755 index 0000000..f2e10c1 --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisMappingBuilder.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.redis.store; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import org.apache.gora.persistency.impl.PersistentBase; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +/** + * Mapping builder for Redis + */ +public class RedisMappingBuilder<K, T extends PersistentBase> { + + private final RedisStore<K, T> dataStore; + + public RedisMappingBuilder(RedisStore<K, T> dataStore) { + this.dataStore = dataStore; + } + + public RedisMapping readMapping(InputStream inputStream) throws IOException { + try { + RedisMapping redisMapping = new RedisMapping(); + DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + Document dom = db.parse(inputStream); + Element root = dom.getDocumentElement(); + NodeList classesNodes = root.getElementsByTagName("class"); + for (int indexClasses = 0; indexClasses < classesNodes.getLength(); indexClasses++) { + Element classElement = (Element) classesNodes.item(indexClasses); + if (classElement.getAttribute("keyClass").equals(dataStore.getKeyClass().getCanonicalName()) + && classElement.getAttribute("name").equals(dataStore.getPersistentClass().getCanonicalName())) { + redisMapping.setDatabase(Integer.parseInt(classElement.getAttribute("database"))); + redisMapping.setPrefix(classElement.getAttribute("prefix")); + NodeList elementsByTagName = classElement.getElementsByTagName("field"); + Map<String, String> mapFields = new HashMap<>(); + Map<String, RedisType> mapTypes = new HashMap<>(); + for (int indexFields = 0; indexFields < elementsByTagName.getLength(); indexFields++) { + Element item = (Element) elementsByTagName.item(indexFields); + String name = item.getAttribute("name"); + String column = item.getAttribute("column"); + String type = item.getAttribute("type"); + mapFields.put(name, 
column); + mapTypes.put(name, RedisType.valueOf(type)); + } + redisMapping.setTypes(mapTypes); + redisMapping.setFields(mapFields); + } + } + return redisMapping; + } catch (Exception ex) { + throw new IOException(ex); + } + } + +} diff --git a/gora-redis/src/main/java/org/apache/gora/redis/store/RedisStore.java b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisStore.java new file mode 100755 index 0000000..14334e5 --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisStore.java @@ -0,0 +1,540 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.redis.store; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.commons.io.IOUtils; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.redis.query.RedisQuery; +import org.apache.gora.redis.query.RedisResult; +import org.apache.gora.redis.util.DatumHandler; +import static org.apache.gora.redis.util.RedisStoreConstants.PREFIX; +import static org.apache.gora.redis.util.RedisStoreConstants.END_TAG; +import static org.apache.gora.redis.util.RedisStoreConstants.FIELD_SEPARATOR; +import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_ADDRESS; +import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_MASTERNAME; +import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_MODE; +import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_READMODE; +import static org.apache.gora.redis.util.RedisStoreConstants.GORA_REDIS_STORAGE; +import static org.apache.gora.redis.util.RedisStoreConstants.INDEX; +import static org.apache.gora.redis.util.RedisStoreConstants.START_TAG; +import static org.apache.gora.redis.util.RedisStoreConstants.WILDCARD; +import org.apache.gora.redis.util.ServerMode; +import org.apache.gora.redis.util.StorageMode; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.GoraException; +import org.redisson.Redisson; +import org.redisson.api.RBatch; +import org.redisson.api.RBucket; +import org.redisson.api.RBucketAsync; 
+import org.redisson.api.RFuture; +import org.redisson.api.RLexSortedSet; +import org.redisson.api.RLexSortedSetAsync; +import org.redisson.api.RList; +import org.redisson.api.RListAsync; +import org.redisson.api.RMap; +import org.redisson.api.RMapAsync; +import org.redisson.api.RScoredSortedSet; +import org.redisson.api.RScoredSortedSetAsync; +import org.redisson.api.RedissonClient; +import org.redisson.client.protocol.ScoredEntry; +import org.redisson.config.Config; +import org.redisson.config.ReadMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of a Redis data store to be used by gora. + * + * @param <K> class to be used for the key + * @param <T> class to be persisted within the store + */ +public class RedisStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { + + protected static final String PARSE_MAPPING_FILE_KEY = "gora.redis.mapping.file"; + protected static final String DEFAULT_MAPPING_FILE = "gora-redis-mapping.xml"; + protected static final String XML_MAPPING_DEFINITION = "gora.mapping"; + private RedissonClient redisInstance; + private RedisMapping mapping; + public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final DatumHandler handler = new DatumHandler(); + private StorageMode mode; + + /** + * Initialize the data store by reading the credentials, setting the client's + * properties up and reading the mapping file. Initialize is called when then + * the call to {@link org.apache.gora.store.DataStoreFactory#createDataStore} + * is made. 
+ * + * @param keyClass Gora's key class + * @param persistentClass Persistent class + * @param properties Configurations for the data store + * @throws org.apache.gora.util.GoraException Unexpected exception during initialization + */ + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { + try { + super.initialize(keyClass, persistentClass, properties); + + InputStream mappingStream; + if (properties.containsKey(XML_MAPPING_DEFINITION)) { + if (LOG.isTraceEnabled()) { + LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION)); + } + mappingStream = IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null); + } else { + mappingStream = getClass().getClassLoader().getResourceAsStream(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE)); + } + RedisMappingBuilder mappingBuilder = new RedisMappingBuilder(this); + mapping = mappingBuilder.readMapping(mappingStream); + Config config = new Config(); + String storage = getConf().get(GORA_REDIS_STORAGE, properties.getProperty(GORA_REDIS_STORAGE)); + mode = StorageMode.valueOf(storage); + String modeString = getConf().get(GORA_REDIS_MODE, properties.getProperty(GORA_REDIS_MODE)); + ServerMode connectionMode = ServerMode.valueOf(modeString); + String name = getConf().get(GORA_REDIS_MASTERNAME, properties.getProperty(GORA_REDIS_MASTERNAME)); + String readm = getConf().get(GORA_REDIS_READMODE, properties.getProperty(GORA_REDIS_READMODE)); + //Override address in tests + String[] hosts = getConf().get(GORA_REDIS_ADDRESS, properties.getProperty(GORA_REDIS_ADDRESS)).split(","); + for (int indexHosts = 0; indexHosts < hosts.length; indexHosts++) { + hosts[indexHosts] = PREFIX + hosts[indexHosts]; + } + switch (connectionMode) { + case SINGLE: + config.useSingleServer() + .setAddress(hosts[0]) + .setDatabase(mapping.getDatabase()); + break; + case CLUSTER: + config.useClusterServers() + 
.addNodeAddress(hosts); + break; + case REPLICATED: + config.useReplicatedServers() + .addNodeAddress(hosts) + .setDatabase(mapping.getDatabase()); + break; + case SENTINEL: + config.useSentinelServers() + .setMasterName(name) + .setReadMode(ReadMode.valueOf(readm)) + .addSentinelAddress(hosts); + break; + default: + throw new AssertionError(connectionMode.name()); + } + redisInstance = Redisson.create(config); + if (autoCreateSchema && !schemaExists()) { + createSchema(); + } + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + /** + * Redis, being a schemaless database does not support explicit schema + * creation. When the records are added to the database, the schema is created + * on the fly. Thus, schema operations are unavailable in gora-redis module. + * + * @return null + */ + @Override + public String getSchemaName() { + return null; + } + + /** + * Redis, being a schemaless database does not support explicit schema + * creation. When the records are added to the database, the schema is created + * on the fly. Thus, schema operations are unavailable in gora-redis module. + * + * @throws org.apache.gora.util.GoraException Unexpected exception. + */ + @Override + public void createSchema() throws GoraException { + } + + @Override + public void deleteSchema() throws GoraException { + redisInstance.getKeys().deleteByPattern(mapping.getPrefix() + FIELD_SEPARATOR + WILDCARD); + } + + /** + * Redis, being a schemaless database does not support explicit schema + * creation. When the records are added to the database, the schema is created + * on the fly. Thus, schema operations are unavailable in gora-redis module. + * + * @return true + * @throws org.apache.gora.util.GoraException Unexpected exception. 
+ */ + @Override + public boolean schemaExists() throws GoraException { + return true; + } + + private String generateKeyHash(K baseKey) { + return mapping.getPrefix() + FIELD_SEPARATOR + START_TAG + baseKey + END_TAG; + } + + private String generateKeyString(String field, K baseKey) { + return generateKeyStringBase(baseKey) + field; + } + + private String generateKeyStringBase(K baseKey) { + return mapping.getPrefix() + FIELD_SEPARATOR + START_TAG + baseKey + END_TAG + FIELD_SEPARATOR; + } + + private String generateIndexKey() { + return mapping.getPrefix() + FIELD_SEPARATOR + INDEX; + } + + public T newInstanceFromString(K key, String[] fields) throws GoraException, IOException { + fields = getFieldsToQuery(fields); + T persistent = newPersistent(); + int countRetrieved = 0; + for (String f : fields) { + Schema.Field field = fieldMap.get(f); + String redisField = mapping.getFields().get(field.name()); + RedisType redisType = mapping.getTypes().get(field.name()); + Object redisVal = null; + switch (redisType) { + case STRING: + RBucket<Object> bucket = redisInstance.getBucket(generateKeyString(redisField, key)); + redisVal = bucket.isExists() ? handler.deserializeFieldValue(field, field.schema(), bucket.get(), persistent) : null; + break; + case LIST: + RList<Object> list = redisInstance.getList(generateKeyString(redisField, key)); + redisVal = list.isExists() ? handler.deserializeFieldList(field, field.schema(), list, persistent) : null; + break; + case HASH: + RMap<Object, Object> map = redisInstance.getMap(generateKeyString(redisField, key)); + redisVal = map.isExists() ? handler.deserializeFieldMap(field, field.schema(), map, persistent) : null; + break; + default: + throw new AssertionError(redisType.name()); + } + if (redisVal == null) { + continue; + } + countRetrieved++; + persistent.put(field.pos(), redisVal); + persistent.setDirty(field.pos()); + } + return countRetrieved > 0 ? 
persistent : null; + } + + public T newInstanceFromHash(RMap<String, Object> map, String[] fields) throws GoraException, IOException { + fields = getFieldsToQuery(fields); + T persistent = newPersistent(); + for (String fieldName : fields) { + Schema.Field field = fieldMap.get(fieldName); + Object fValue = map.get(mapping.getFields().get(field.name())); + if (fValue == null) { + continue; + } + Object fieldValue = handler.deserializeFieldValue(field, field.schema(), fValue, persistent); + persistent.put(field.pos(), fieldValue); + persistent.setDirty(field.pos()); + } + return persistent; + } + + @Override + public T get(K key, String[] fields) throws GoraException { + try { + if (mode == StorageMode.SINGLEKEY) { + RMap<String, Object> map = redisInstance.getMap(generateKeyHash(key)); + if (!map.isEmpty()) { + return newInstanceFromHash(map, fields); + } else { + return null; + } + } else { + return newInstanceFromString(key, fields); + } + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public void put(K key, T obj) throws GoraException { + try { + if (obj.isDirty()) { + Schema objectSchema = obj.getSchema(); + List<Schema.Field> fields = objectSchema.getFields(); + RBatch batchInstance = redisInstance.createBatch(); + //update secundary index + if (isNumericKey()) { + RScoredSortedSetAsync<Object> secundaryIndex = batchInstance.getScoredSortedSet(generateIndexKey()); + secundaryIndex.addAsync(obtainDoubleValue(key), key); + } else { + RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey()); + secundaryIndex.addAsync(key.toString()); + } + if (mode == StorageMode.SINGLEKEY) { + RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key)); + fields.forEach((field) -> { + Object fieldValue = handler.serializeFieldValue(field.schema(), obj.get(field.pos())); + if (fieldValue != null) { + map.fastPutAsync(mapping.getFields().get(field.name()), fieldValue); + } else { + 
map.fastRemoveAsync(mapping.getFields().get(field.name())); + } + }); + } else { + for (Schema.Field field : fields) { + Object fieldValue = obj.get(field.pos()); + String redisField = mapping.getFields().get(field.name()); + RedisType redisType = mapping.getTypes().get(field.name()); + switch (redisType) { + case STRING: + RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(redisField, key)); + bucket.deleteAsync(); + if (fieldValue != null) { + fieldValue = handler.serializeFieldValue(field.schema(), fieldValue); + bucket.setAsync(fieldValue); + } + break; + case LIST: + RListAsync<Object> rlist = batchInstance.getList(generateKeyString(redisField, key)); + rlist.deleteAsync(); + if (fieldValue != null) { + List<Object> list = handler.serializeFieldList(field.schema(), fieldValue); + rlist.addAllAsync(list); + } + break; + case HASH: + RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(redisField, key)); + map.deleteAsync(); + if (fieldValue != null) { + Map<Object, Object> mp = handler.serializeFieldMap(field.schema(), fieldValue); + map.putAllAsync(mp); + } + break; + default: + throw new AssertionError(redisType.name()); + } + } + } + batchInstance.execute(); + } else { + LOG.info("Ignored putting object {} in the store as it is neither " + + "new, neither dirty.", new Object[]{obj}); + } + } catch (Exception e) { + throw new GoraException(e); + } + } + + @Override + public boolean delete(K key) throws GoraException { + try { + RBatch batchInstance = redisInstance.createBatch(); + //update secundary index + if (isNumericKey()) { + RScoredSortedSetAsync<Object> secundaryIndex = batchInstance.getScoredSortedSet(generateIndexKey()); + secundaryIndex.removeAsync(key); + } else { + RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey()); + secundaryIndex.removeAsync(key.toString()); + } + if (mode == StorageMode.SINGLEKEY) { + RMapAsync<Object, Object> map = 
batchInstance.getMap(generateKeyHash(key)); + RFuture<Boolean> deleteAsync = map.deleteAsync(); + batchInstance.execute(); + return deleteAsync.get(); + } else { + batchInstance.execute(); + return redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + WILDCARD) > 0; + } + } catch (Exception ex) { + throw new GoraException(ex); + } + } + + @Override + public long deleteByQuery(Query<K, T> query) throws GoraException { + Collection<K> range = runQuery(query); + RBatch batchInstance = redisInstance.createBatch(); + RLexSortedSetAsync secundaryIndex = batchInstance.getLexSortedSet(generateIndexKey()); + if (query.getFields() != null && query.getFields().length < mapping.getFields().size()) { + List<String> dbFields = new ArrayList<>(); + List<RedisType> dbTypes = new ArrayList<>(); + for (String af : query.getFields()) { + dbFields.add(mapping.getFields().get(af)); + dbTypes.add(mapping.getTypes().get(af)); + } + for (K key : range) { + if (mode == StorageMode.SINGLEKEY) { + RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyHash(key)); + dbFields.forEach((field) -> { + map.removeAsync(field); + }); + } else { + for (int indexField = 0; indexField < dbFields.size(); indexField++) { + String field = dbFields.get(indexField); + RedisType type = dbTypes.get(indexField); + switch (type) { + case STRING: + RBucketAsync<Object> bucket = batchInstance.getBucket(generateKeyString(field, key)); + bucket.deleteAsync(); + break; + case LIST: + RListAsync<Object> rlist = batchInstance.getList(generateKeyString(field, key)); + rlist.deleteAsync(); + break; + case HASH: + RMapAsync<Object, Object> map = batchInstance.getMap(generateKeyString(field, key)); + map.deleteAsync(); + break; + default: + throw new AssertionError(type.name()); + } + } + } + } + } else { + range.stream().map((key) -> { + secundaryIndex.removeAsync(key); + return key; + }).forEachOrdered((key) -> { + if (mode == StorageMode.SINGLEKEY) { + RMapAsync<Object, Object> map = 
batchInstance.getMap(generateKeyHash(key)); + map.deleteAsync(); + } else { + redisInstance.getKeys().deleteByPattern(generateKeyStringBase(key) + WILDCARD); + } + }); + } + batchInstance.execute(); + return range.size(); + } + + /** + * Execute the query and return the result. + * + * @param query Query sent to Redis + * @return Query result + * @throws org.apache.gora.util.GoraException Unexpected exception in querying process. + */ + @Override + public Result<K, T> execute(Query<K, T> query) throws GoraException { + return new RedisResult<>(this, query, runQuery(query)); + } + + @Override + public Query<K, T> newQuery() { + RedisQuery<K, T> query = new RedisQuery<>(this); + query.setFields(getFieldsToQuery(null)); + return query; + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws GoraException { + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(getConf()); + partitions.add(partitionQuery); + return partitions; + } + + @Override + public void flush() throws GoraException { + } + + @Override + public void close() { + redisInstance.shutdown(); + } + + @Override + public boolean exists(K key) throws GoraException { + if (mode == StorageMode.SINGLEKEY) { + return redisInstance.getKeys().countExists(generateKeyHash(key)) != 0; + } else { + Iterator<String> respKeys = redisInstance.getKeys().getKeysByPattern(generateKeyStringBase(key) + WILDCARD, 1).iterator(); + return respKeys.hasNext(); + } + } + + private Collection<K> runQuery(Query<K, T> query) { + Collection<K> range; + if (isNumericKey()) { + RScoredSortedSet<Object> index = redisInstance.getScoredSortedSet(generateIndexKey()); + Collection<ScoredEntry<Object>> rangeResponse; + int limit = query.getLimit() > -1 ? 
(int) query.getLimit() : Integer.MAX_VALUE; + if (query.getStartKey() != null && query.getEndKey() != null) { + rangeResponse = index.entryRange(obtainDoubleValue(query.getStartKey()), true, obtainDoubleValue(query.getEndKey()), true, 0, limit); + } else if (query.getStartKey() != null && query.getEndKey() == null) { + rangeResponse = index.entryRange(obtainDoubleValue(query.getStartKey()), true, Double.MAX_VALUE, true, 0, limit); + } else if (query.getStartKey() == null && query.getEndKey() != null) { + rangeResponse = index.entryRange(Double.MIN_VALUE, true, obtainDoubleValue(query.getEndKey()), true, 0, limit); + } else { + rangeResponse = index.entryRange(Double.MIN_VALUE, true, Double.MAX_VALUE, true, 0, limit); + } + range = new ArrayList<>(); + for (ScoredEntry<Object> indexVal : rangeResponse) { + range.add((K) indexVal.getValue()); + } + } else { + RLexSortedSet index = redisInstance.getLexSortedSet(generateIndexKey()); + Collection<String> rangeResponse; + int limit = query.getLimit() > -1 ? 
(int) query.getLimit() : Integer.MAX_VALUE; + if (query.getStartKey() != null && query.getEndKey() != null) { + rangeResponse = index.range(query.getStartKey().toString(), true, query.getEndKey().toString(), true, 0, limit); + } else if (query.getStartKey() != null && query.getEndKey() == null) { + rangeResponse = index.rangeTail(query.getStartKey().toString(), true, 0, limit); + } else if (query.getStartKey() == null && query.getEndKey() != null) { + rangeResponse = index.rangeHead(query.getEndKey().toString(), true, 0, limit); + } else { + rangeResponse = index.stream().limit(limit).collect(Collectors.toList()); + } + range = new ArrayList<>(); + for (String indexVal : rangeResponse) { + range.add((K) indexVal); + } + } + return range; + } + + private boolean isNumericKey() { + return Number.class.isAssignableFrom(keyClass); + } + + private double obtainDoubleValue(K key) { + return Double.parseDouble(key.toString()); + } +} diff --git a/gora-redis/src/main/java/org/apache/gora/redis/store/RedisType.java b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisType.java new file mode 100755 index 0000000..7e8f832 --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/store/RedisType.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.redis.store;
+
+/**
+ * Supported data types for Redis.
+ *
+ * Refer to: https://redis.io/topics/data-types
+ *
+ * RedisStore looks up the type configured for each field in the mapping and
+ * switches on it when fields are stored under separate keys.
+ */
+public enum RedisType {
+  /**
+   * Strings are the most basic kind of Redis value. Redis Strings are binary
+   * safe, this means that a Redis string can contain any kind of data.
+   *
+   * RedisStore reads and writes fields of this type through a Redisson bucket.
+   */
+  STRING,
+  /**
+   * Redis Lists are simply lists of strings, sorted by insertion order.
+   *
+   * RedisStore reads and writes fields of this type through a Redisson list.
+   */
+  LIST,
+  /**
+   * Redis Hashes are maps between string fields and string values, so they are
+   * the perfect data type to represent objects.
+   *
+   * RedisStore reads and writes fields of this type through a Redisson map.
+   */
+  HASH
+}
diff --git a/gora-redis/src/main/java/org/apache/gora/redis/store/package-info.java b/gora-redis/src/main/java/org/apache/gora/redis/store/package-info.java
new file mode 100755
index 0000000..e3cc092
--- /dev/null
+++ b/gora-redis/src/main/java/org/apache/gora/redis/store/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all the Redis store related classes.
+ */
+package org.apache.gora.redis.store;
diff --git a/gora-redis/src/main/java/org/apache/gora/redis/util/DatumHandler.java b/gora-redis/src/main/java/org/apache/gora/redis/util/DatumHandler.java
new file mode 100755
index 0000000..6a1cfa9
--- /dev/null
+++ b/gora-redis/src/main/java/org/apache/gora/redis/util/DatumHandler.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.gora.redis.util; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.util.Utf8; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.IOUtils; +import org.redisson.api.RList; +import org.redisson.api.RMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for serialization and deserialization of values from redis. + */ +public class DatumHandler<T extends PersistentBase> { + + public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final ConcurrentHashMap<Schema, SpecificDatumReader<?>> readerMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> writerMap = new ConcurrentHashMap<>(); + + public DatumHandler() { + } + + /** + * Serialize an object + * + * @param fieldSchema The avro schema to be used. + * @param fieldValue The object to be serialized. + * @return Serialized object. 
+ */ + @SuppressWarnings("unchecked") + public Object serializeFieldValue(Schema fieldSchema, Object fieldValue) { + Object output = fieldValue; + switch (fieldSchema.getType()) { + case ARRAY: + case MAP: + case RECORD: + byte[] data = null; + try { + @SuppressWarnings("rawtypes") + SpecificDatumWriter writer = getDatumWriter(fieldSchema); + data = IOUtils.serialize(writer, fieldValue); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + output = data; + break; + case UNION: + if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) { + int schemaPos = getUnionSchema(fieldValue, fieldSchema); + Schema unionSchema = fieldSchema.getTypes().get(schemaPos); + output = serializeFieldValue(unionSchema, fieldValue); + } else { + data = null; + try { + @SuppressWarnings("rawtypes") + SpecificDatumWriter writer = getDatumWriter(fieldSchema); + data = IOUtils.serialize(writer, fieldValue); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + output = data; + } + break; + case FIXED: + break; + case ENUM: + case STRING: + output = fieldValue.toString(); + break; + case BYTES: + output = ((ByteBuffer) fieldValue).array(); + break; + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + output = fieldValue; + break; + case NULL: + break; + default: + throw new AssertionError(fieldSchema.getType().name()); + } + return output; + } + + /** + * Serialize an object as a Map + * + * @param fieldSchema The avro schema to be used. + * @param fieldValue The object to be serialized. + * @return Serialized object as a map. 
+ */ + @SuppressWarnings("unchecked") + public Map<Object, Object> serializeFieldMap(Schema fieldSchema, Object fieldValue) { + Map<Object, Object> map = new HashMap(); + switch (fieldSchema.getType()) { + case UNION: + for (Schema sc : fieldSchema.getTypes()) { + if (sc.getType() == Schema.Type.MAP) { + map = serializeFieldMap(sc, fieldValue); + } + } + break; + case MAP: + Map<CharSequence, ?> mp = (Map<CharSequence, ?>) fieldValue; + for (Entry<CharSequence, ?> e : mp.entrySet()) { + String mapKey = e.getKey().toString(); + Object mapValue = e.getValue(); + mapValue = serializeFieldValue(fieldSchema.getValueType(), mapValue); + map.put(mapKey, mapValue); + } + break; + default: + throw new AssertionError(fieldSchema.getType().name()); + } + return map; + } + + /** + * Serialize an object as a List + * + * @param fieldSchema The avro schema to be used. + * @param fieldValue The object to be serialized. + * @return Serialized object as a List. + */ + @SuppressWarnings("unchecked") + public List<Object> serializeFieldList(Schema fieldSchema, Object fieldValue) { + List<Object> serializedList = new ArrayList(); + switch (fieldSchema.getType()) { + case ARRAY: + List<?> rawdataList = (List<?>) fieldValue; + rawdataList.stream().map((lsValue) -> serializeFieldValue(fieldSchema.getElementType(), lsValue)).forEachOrdered((lsValue_) -> { + serializedList.add(lsValue_); + }); + break; + default: + throw new AssertionError(fieldSchema.getType().name()); + } + return serializedList; + } + + /** + * Deserialize an object into a gora bean using avro + * + * @param field The field schema. + * @param fieldSchema The object schema. + * @param redisValue Object from redis. 
+ * @param persistent Persistent object + * @return Deserialized object + * @throws java.io.IOException Deserialization exception + */ + @SuppressWarnings("unchecked") + public Object deserializeFieldValue(Schema.Field field, Schema fieldSchema, + Object redisValue, T persistent) throws IOException { + Object fieldValue = null; + switch (fieldSchema.getType()) { + case MAP: + case ARRAY: + case RECORD: + @SuppressWarnings("rawtypes") SpecificDatumReader reader = getDatumReader(fieldSchema); + fieldValue = IOUtils.deserialize((byte[]) redisValue, reader, + persistent.get(field.pos())); + break; + case ENUM: + fieldValue = AvroUtils.getEnumValue(fieldSchema, redisValue.toString()); + break; + case FIXED: + break; + case BYTES: + fieldValue = ByteBuffer.wrap((byte[]) redisValue); + break; + case STRING: + fieldValue = new Utf8(redisValue.toString()); + break; + case UNION: + if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) { + int schemaPos = getUnionSchema(redisValue, fieldSchema); + Schema unionSchema = fieldSchema.getTypes().get(schemaPos); + fieldValue = deserializeFieldValue(field, unionSchema, redisValue, persistent); + } else { + reader = getDatumReader(fieldSchema); + fieldValue = IOUtils.deserialize((byte[]) redisValue, reader, + persistent.get(field.pos())); + } + break; + default: + fieldValue = redisValue; + } + return fieldValue; + } + + /** + * Deserialize an Map into a gora bean using avro + * + * @param field The field schema. + * @param fieldSchema The object schema. + * @param redisMap Map from redis. 
+ * @param persistent Persistent object + * @return Deserialized object + * @throws java.io.IOException Deserialization exception + */ + @SuppressWarnings("unchecked") + public Object deserializeFieldMap(Schema.Field field, Schema fieldSchema, + RMap<Object, Object> redisMap, T persistent) throws IOException { + Map<Utf8, Object> fieldValue = new HashMap<>(); + switch (fieldSchema.getType()) { + case UNION: + for (Schema sc : fieldSchema.getTypes()) { + if (sc.getType() == Schema.Type.MAP) { + return deserializeFieldMap(field, sc, redisMap, persistent); + } + } + break; + case MAP: + for (Entry<Object, Object> aEntry : redisMap.entrySet()) { + String key = aEntry.getKey().toString(); + Object value = deserializeFieldValue(field, fieldSchema.getValueType(), aEntry.getValue(), persistent); + fieldValue.put(new Utf8(key), value); + } + break; + default: + throw new AssertionError(fieldSchema.getType().name()); + } + return new DirtyMapWrapper<>(fieldValue); + } + + /** + * Deserialize an List into a gora bean using avro + * + * @param field The field schema. + * @param fieldSchema The object schema. + * @param redisList List from redis. 
+ * @param persistent Persistent object + * @return Deserialized object + * @throws java.io.IOException Deserialization exception + */ + @SuppressWarnings("unchecked") + public Object deserializeFieldList(Schema.Field field, Schema fieldSchema, + RList<Object> redisList, T persistent) throws IOException { + List<Object> fieldValue = new ArrayList<>(); + switch (fieldSchema.getType()) { + case ARRAY: + for (Object ob : redisList) { + Object value = deserializeFieldValue(field, fieldSchema.getElementType(), ob, persistent); + fieldValue.add(value); + } + break; + default: + throw new AssertionError(fieldSchema.getType().name()); + } + return new DirtyListWrapper<>(fieldValue); + } + + /** + * Gets the Datum reader for a Schema + * + * @param fieldSchema The avro schema to be used + * @return SpecificDatumReader for the schema + */ + @SuppressWarnings("rawtypes") + private SpecificDatumReader getDatumReader(Schema fieldSchema) { + SpecificDatumReader<?> reader = readerMap.get(fieldSchema); + if (reader == null) { + reader = new SpecificDatumReader(fieldSchema); + SpecificDatumReader localReader; + if ((localReader = readerMap.putIfAbsent(fieldSchema, reader)) != null) { + reader = localReader; + } + } + return reader; + } + + /** + * Gets the Datum writer for a Schema + * + * @param fieldSchema The avro schema to be used + * @return SpecificDatumWriter for the schema + */ + @SuppressWarnings("rawtypes") + private SpecificDatumWriter getDatumWriter(Schema fieldSchema) { + SpecificDatumWriter writer = writerMap.get(fieldSchema); + if (writer == null) { + writer = new SpecificDatumWriter(fieldSchema); + writerMap.put(fieldSchema, writer); + } + return writer; + } + + /** + * Verify if a schema is Nullable + * + * @param unionSchema The schema to be verified + * @return result + */ + private boolean isNullable(Schema unionSchema) { + if (unionSchema.getTypes().stream().anyMatch((innerSchema) -> (innerSchema.getType().equals(Schema.Type.NULL)))) { + return true; + } + 
return false; + } + + /** + * Method to retrieve the corresponding schema type index of a particular + * object having UNION schema. As UNION type can have one or more types and at + * a given instance, it holds an object of only one type of the defined types, + * this method is used to figure out the corresponding instance's schema type + * index. + * + * @param instanceValue value that the object holds + * @param unionSchema union schema containing all of the data types + * @return the unionSchemaPosition corresponding schema position + */ + private int getUnionSchema(Object instanceValue, Schema unionSchema) { + int unionSchemaPos = 0; + for (Schema currentSchema : unionSchema.getTypes()) { + Schema.Type schemaType = currentSchema.getType(); + if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) { + return unionSchemaPos; + } + if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) { + return unionSchemaPos; + } + if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) { + return unionSchemaPos; + } + if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) { + return unionSchemaPos; + } + if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) { + return unionSchemaPos; + } + if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) { + return unionSchemaPos; + } + if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) { + return unionSchemaPos; + } + if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) { + return unionSchemaPos; + } + if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue 
instanceof byte[] && schemaType.equals(Schema.Type.MAP)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) { + return unionSchemaPos; + } + unionSchemaPos++; + } + return 0; + } + +} diff --git a/gora-redis/src/main/java/org/apache/gora/redis/util/RedisStoreConstants.java b/gora-redis/src/main/java/org/apache/gora/redis/util/RedisStoreConstants.java new file mode 100755 index 0000000..793b0b5 --- /dev/null +++ b/gora-redis/src/main/java/org/apache/gora/redis/util/RedisStoreConstants.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gora.redis.util;
+
+/**
+ * Redis constants
+ *
+ * Holds the key-building tokens used by RedisStore and the names of the
+ * configuration properties of the datastore. Not instantiable.
+ */
+public class RedisStoreConstants {
+
+  //Redis constants
+  // Separator placed between the mapping prefix, the record key and the field name.
+  public static final String FIELD_SEPARATOR = ".";
+  // Appended to a record's key base to match all of its per-field keys.
+  public static final String WILDCARD = "*";
+  // Suffix of the key of the sorted set that indexes record keys.
+  public static final String INDEX = "index";
+  // START_TAG and END_TAG wrap the record key inside generated Redis keys.
+  // NOTE(review): presumably a Redis Cluster hash tag so all keys of a
+  // record land in the same slot - confirm.
+  public static final String START_TAG = "{";
+  public static final String END_TAG = "}";
+  // NOTE(review): presumably the URI scheme prepended to configured
+  // addresses; its use is not visible in this part of the commit - confirm.
+  public static final String PREFIX = "redis://";
+
+  // Configuration property names. The test driver sets the address to a
+  // comma separated list of host:port entries, and mode/storage to the names
+  // of ServerMode and StorageMode constants.
+  public static final String GORA_REDIS_ADDRESS = "gora.datastore.redis.address";
+  public static final String GORA_REDIS_READMODE = "gora.datastore.redis.readMode";
+  public static final String GORA_REDIS_MASTERNAME = "gora.datastore.redis.masterName";
+  public static final String GORA_REDIS_MODE = "gora.datastore.redis.mode";
+  public static final String GORA_REDIS_STORAGE = "gora.datastore.redis.storage";
+
+  // Private constructor: this class only exposes constants.
+  private RedisStoreConstants() {
+  }
+
+}
diff --git a/gora-redis/src/main/java/org/apache/gora/redis/util/ServerMode.java b/gora-redis/src/main/java/org/apache/gora/redis/util/ServerMode.java
new file mode 100755
index 0000000..dfde4dc
--- /dev/null
+++ b/gora-redis/src/main/java/org/apache/gora/redis/util/ServerMode.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.redis.util;
+
+/**
+ * Connection mode to the Redis Server
+ *
+ * More details in : https://github.com/redisson/redisson/wiki/2.-Configuration
+ */
+public enum ServerMode {
+  /**
+   * Redis single server configuration.
+   */
+  SINGLE,
+  /**
+   * Redis server cluster configuration.
+   */
+  CLUSTER,
+  /**
+   * Redis replicated mode configuration.
+   */
+  REPLICATED,
+  /**
+   * Redis server sentinel configuration.
+   */
+  SENTINEL
+}
diff --git a/gora-redis/src/main/java/org/apache/gora/redis/util/StorageMode.java b/gora-redis/src/main/java/org/apache/gora/redis/util/StorageMode.java
new file mode 100755
index 0000000..26408aa
--- /dev/null
+++ b/gora-redis/src/main/java/org/apache/gora/redis/util/StorageMode.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.redis.util;
+
+/**
+ * Storage mode of Objects for gora-redis
+ *
+ * More details in:
+ * https://cwiki.apache.org/confluence/display/GORA/Redis+backend+documentation
+ */
+public enum StorageMode {
+  /**
+   * The records are stored in a single hash in redis.
+   */
+  SINGLEKEY,
+  /**
+   * The records are stored in multiple keys.
+   */
+  MULTIKEY
+}
diff --git a/gora-redis/src/main/java/org/apache/gora/redis/util/package-info.java b/gora-redis/src/main/java/org/apache/gora/redis/util/package-info.java
new file mode 100755
index 0000000..6afb613
--- /dev/null
+++ b/gora-redis/src/main/java/org/apache/gora/redis/util/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Redis store related util classes.
+ */
+package org.apache.gora.redis.util;
diff --git a/gora-redis/src/test/java/org/apache/gora/redis/GoraRedisTestDriver.java b/gora-redis/src/test/java/org/apache/gora/redis/GoraRedisTestDriver.java
new file mode 100755
index 0000000..9af838e
--- /dev/null
+++ b/gora-redis/src/test/java/org/apache/gora/redis/GoraRedisTestDriver.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis; + +import java.io.IOException; +import java.time.Duration; +import org.apache.gora.GoraTestDriver; +import org.apache.gora.redis.store.RedisStore; +import org.apache.gora.redis.util.RedisStartupLogWaitStrategy; +import org.apache.gora.redis.util.ServerMode; +import org.apache.gora.redis.util.StorageMode; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; + +/** + * Helper class to execute tests against a Redis instance started in a Docker container. + * The image is run with its STANDALONE and SENTINEL environment options set to "true". + */ +public class GoraRedisTestDriver extends GoraTestDriver { + + private static final String DOCKER_IMAGE = "grokzen/redis-cluster:latest"; + private final FixedHostPortGenericContainer redisContainer; + + private final StorageMode storageMode; + private final ServerMode serverMode; + + /** Configures (without starting) the container, publishing ports 7000-7007 and 5000-5002 on identical host ports. */ public GoraRedisTestDriver(StorageMode storageMode, ServerMode serverMode) { + super(RedisStore.class); + this.storageMode = storageMode; + this.serverMode = serverMode; + GenericContainer container = new FixedHostPortGenericContainer(DOCKER_IMAGE) + .withFixedExposedPort(7000, 7000) + .withFixedExposedPort(7001, 7001) + .withFixedExposedPort(7002, 7002) + .withFixedExposedPort(7003, 7003) + .withFixedExposedPort(7004, 7004) + .withFixedExposedPort(7005, 7005) + .withFixedExposedPort(7006, 7006) + .withFixedExposedPort(7007, 7007) + .withFixedExposedPort(5000, 5000) + .withFixedExposedPort(5001, 5001) + .withFixedExposedPort(5002, 5002) + .waitingFor(new RedisStartupLogWaitStrategy()) + .withStartupTimeout(Duration.ofMinutes(3)) +
.withEnv("STANDALONE", "true") + .withEnv("SENTINEL", "true"); + redisContainer = (FixedHostPortGenericContainer) container; + + } + + /** Starts the container, then records the storage mode, the server mode and the mode-specific node addresses in conf. */ @Override + public void setUpClass() throws IOException { + redisContainer.start(); + log.info("Setting up Redis test driver"); + conf.set("gora.datastore.redis.storage", storageMode.name()); + conf.set("gora.datastore.redis.mode", serverMode.name()); + String bridgeIpAddress = redisContainer.getContainerInfo() /* IP address of the first network reported for the container */ + .getNetworkSettings() + .getNetworks() + .values() + .iterator() + .next() + .getIpAddress(); + switch (serverMode) { + case SINGLE: + conf.set("gora.datastore.redis.address", bridgeIpAddress + ":" + 7006); + break; + case CLUSTER: + conf.set("gora.datastore.redis.address", + bridgeIpAddress + ":" + 7000 + "," + + bridgeIpAddress + ":" + 7001 + "," + + bridgeIpAddress + ":" + 7002 + ); + break; + case REPLICATED: + conf.set("gora.datastore.redis.address", + bridgeIpAddress + ":" + 7000 + "," + + bridgeIpAddress + ":" + 7004 + ); + break; + case SENTINEL: + conf.set("gora.datastore.redis.masterName", "sentinel7000"); + conf.set("gora.datastore.redis.readMode", "MASTER"); + conf.set("gora.datastore.redis.address", + bridgeIpAddress + ":" + 5000 + "," + + bridgeIpAddress + ":" + 5001 + "," + + bridgeIpAddress + ":" + 5002 /* third distinct 50xx port published by the constructor */ + ); + break; + default: + throw new AssertionError(serverMode.name()); + } + } + + /** Stops the container and logs the tear-down. */ @Override + public void tearDownClass() throws Exception { + redisContainer.stop(); + log.info("Tearing down Redis test driver"); + } +} diff --git a/gora-redis/src/test/java/org/apache/gora/redis/mapreduce/RedisStoreMapReduceTest.java b/gora-redis/src/test/java/org/apache/gora/redis/mapreduce/RedisStoreMapReduceTest.java new file mode 100755 index 0000000..711a3f1 --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/mapreduce/RedisStoreMapReduceTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis.mapreduce; + +import java.io.IOException; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.mapreduce.DataStoreMapReduceTestBase; +import org.apache.gora.redis.GoraRedisTestDriver; +import org.apache.gora.redis.util.ServerMode; +import org.apache.gora.redis.util.StorageMode; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; + +/** + * Executes tests for MR jobs over the Redis dataStore. + * + * The MapReduce tests are disabled due to failures which only occur in the Maven environment. + * The tests pass in a local IDE environment.
+ */ +@Ignore +public class RedisStoreMapReduceTest extends DataStoreMapReduceTestBase { + + /* Test driver created in the constructor; it is set up before and torn down after each test. */ private final GoraRedisTestDriver driver; + + /** Builds a driver configured for SINGLEKEY storage and the SINGLE server mode. */ public RedisStoreMapReduceTest() throws IOException { + super(); + driver = new GoraRedisTestDriver(StorageMode.SINGLEKEY, ServerMode.SINGLE); + } + + /** Runs the driver's setUpClass() first, then the base-class set-up. */ @Override + @Before + public void setUp() throws Exception { + driver.setUpClass(); + super.setUp(); + } + + /** Runs the base-class tear-down first, then the driver's tearDownClass(). */ @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + driver.tearDownClass(); + } + + /** Obtains the String/WebPage store from DataStoreFactory using the driver's configuration; any exception is rethrown wrapped in a RuntimeException. */ @Override + protected DataStore<String, WebPage> createWebPageDataStore() throws IOException { + try { + return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/gora-redis/src/test/java/org/apache/gora/redis/mapreduce/package-info.java b/gora-redis/src/test/java/org/apache/gora/redis/mapreduce/package-info.java new file mode 100755 index 0000000..f7c087d --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/mapreduce/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gora.redis.mapreduce; diff --git a/gora-redis/src/test/java/org/apache/gora/redis/package-info.java b/gora-redis/src/test/java/org/apache/gora/redis/package-info.java new file mode 100755 index 0000000..4363d31 --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Tests for <code>gora-redis</code> including the test driver for store tests such as + * {@link org.apache.gora.redis.store.RedisStoreHashTest} + */ +package org.apache.gora.redis; diff --git a/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreClusterTest.java b/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreClusterTest.java new file mode 100755 index 0000000..ae4c8fe --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreClusterTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis.store; + +import org.apache.gora.redis.GoraRedisTestDriver; +import org.apache.gora.redis.util.ServerMode; +import org.apache.gora.redis.util.StorageMode; +import org.apache.gora.store.DataStoreTestBase; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests extending {@link org.apache.gora.store.DataStoreTestBase} which run the + * base JUnit test suite for Gora. 
+ */ +public class RedisStoreClusterTest extends DataStoreTestBase { + + /* Registers the test driver for this class: SINGLEKEY storage against the CLUSTER server mode. */ static { + setTestDriver(new GoraRedisTestDriver(StorageMode.SINGLEKEY, ServerMode.CLUSTER)); + } + + // Unsupported functionality due to the limitations in Redis + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testTruncateSchema() throws Exception { + super.testTruncateSchema(); + } + + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testDeleteSchema() throws Exception { + super.testDeleteSchema(); + } + + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testSchemaExists() throws Exception { + super.testSchemaExists(); + } +} diff --git a/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreHashTest.java b/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreHashTest.java new file mode 100755 index 0000000..9ac9935 --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreHashTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gora.redis.store; + +import org.apache.gora.redis.GoraRedisTestDriver; +import org.apache.gora.redis.util.ServerMode; +import org.apache.gora.redis.util.StorageMode; +import org.apache.gora.store.DataStoreTestBase; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests extending {@link org.apache.gora.store.DataStoreTestBase} which run the + * base JUnit test suite for Gora with SINGLEKEY storage and the SINGLE server mode. + */ +public class RedisStoreHashTest extends DataStoreTestBase { + + /* Registers the test driver for this class: SINGLEKEY storage against the SINGLE server mode. */ static { + setTestDriver(new GoraRedisTestDriver(StorageMode.SINGLEKEY, ServerMode.SINGLE)); + } + + // Unsupported functionality due to the limitations in Redis + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testTruncateSchema() throws Exception { + super.testTruncateSchema(); + } + + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testDeleteSchema() throws Exception { + super.testDeleteSchema(); + } + + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testSchemaExists() throws Exception { + super.testSchemaExists(); + } + +} diff --git a/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreStringTest.java b/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreStringTest.java new file mode 100755 index 0000000..347b800 --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/store/RedisStoreStringTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis.store; + +import org.apache.gora.redis.GoraRedisTestDriver; +import org.apache.gora.redis.util.ServerMode; +import org.apache.gora.redis.util.StorageMode; +import org.apache.gora.store.DataStoreTestBase; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests extending {@link org.apache.gora.store.DataStoreTestBase} which run the + * base JUnit test suite for Gora with MULTIKEY storage and the SINGLE server mode. + */ +public class RedisStoreStringTest extends DataStoreTestBase { + + /* Registers the test driver for this class: MULTIKEY storage against the SINGLE server mode. */ static { + setTestDriver(new GoraRedisTestDriver(StorageMode.MULTIKEY, ServerMode.SINGLE)); + } + + // Unsupported functionality due to the limitations in Redis + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testTruncateSchema() throws Exception { + super.testTruncateSchema(); + } + + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testDeleteSchema() throws Exception { + super.testDeleteSchema(); + } + + /** Ignored in this suite; the body only delegates to the base-class test. */ @Test + @Ignore("Explicit schema creation related functionality is not supported in Redis") + @Override + public void testSchemaExists() throws Exception { + super.testSchemaExists(); + } + +} diff --git a/gora-redis/src/test/java/org/apache/gora/redis/store/package-info.java b/gora-redis/src/test/java/org/apache/gora/redis/store/package-info.java new file mode 100755 index 0000000..30d0832 --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/store/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all the unit tests for basic CRUD operations + * functionality of the Redis dataStore. + */ +package org.apache.gora.redis.store; diff --git a/gora-redis/src/test/java/org/apache/gora/redis/util/RedisStartupLogWaitStrategy.java b/gora-redis/src/test/java/org/apache/gora/redis/util/RedisStartupLogWaitStrategy.java new file mode 100755 index 0000000..b9857b1 --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/util/RedisStartupLogWaitStrategy.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.redis.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import org.testcontainers.containers.ContainerLaunchException; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.output.WaitingConsumer; + +/** + * Utility class for detecting when the docker container is ready. + */ +public class RedisStartupLogWaitStrategy extends GenericContainer.AbstractWaitStrategy { + + private static final String REGEX = ".*Background AOF rewrite finished successfully.*"; + private final int times = 3; + + @Override + protected void waitUntilReady() { + WaitingConsumer waitingConsumer = new WaitingConsumer(); + this.container.followOutput(waitingConsumer); + Predicate waitPredicate = (outputFrame) -> { + String trimmedFrameText = ((OutputFrame) outputFrame).getUtf8String().replaceFirst("\n$", ""); + return trimmedFrameText.matches(REGEX); + }; + + try { + waitingConsumer.waitUntil(waitPredicate, this.startupTimeout.getSeconds(), TimeUnit.SECONDS, + this.times); + } catch (TimeoutException var4) { + throw new ContainerLaunchException( + "Timed out waiting for log output matching Redis server startup Log \'" + REGEX + + "\'"); + } + } +} diff --git a/gora-redis/src/test/java/org/apache/gora/redis/util/package-info.java b/gora-redis/src/test/java/org/apache/gora/redis/util/package-info.java new file mode 100755 index 0000000..a771d86 --- /dev/null +++ b/gora-redis/src/test/java/org/apache/gora/redis/util/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains test utilities for the Redis dataStore, such as the container startup wait strategy. + */ +package org.apache.gora.redis.util; diff --git a/gora-redis/src/test/resources/gora-redis-mapping.xml b/gora-redis/src/test/resources/gora-redis-mapping.xml new file mode 100755 index 0000000..87f9450 --- /dev/null +++ b/gora-redis/src/test/resources/gora-redis-mapping.xml @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+--> + +<gora-otd> <!-- Test-resource mapping for the Employee and WebPage example classes, both keyed by java.lang.String. --> + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" prefix="employee" database="0" > <!-- Every Employee field below is declared with type STRING. --> + <field name="ssn" column="ssn" type="STRING"/> + <field name="name" column="name" type="STRING"/> + <field name="dateOfBirth" column="dateOfBirth" type="STRING"/> + <field name="salary" column="salary" type="STRING"/> + <field name="boss" column="boss" type="STRING"/> + <field name="webpage" column="webpage" type="STRING"/> + <field name="value" column="value" type="STRING"/> + </class> + <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" prefix="webpage" database="1" > <!-- WebPage mixes types: parsedContent is LIST; outlinks, headers, byteData and stringData are HASH; url, content and metadata are STRING. --> + <field name="url" column="url" type="STRING"/> + <field name="content" column="content" type="STRING"/> + <field name="parsedContent" column="parsedContent" type="LIST"/> + <field name="outlinks" column="outlinks" type="HASH"/> + <field name="headers" column="headers" type="HASH"/> + <field name="metadata" column="metadata" type="STRING"/> + <field name="byteData" column="byteData" type="HASH"/> + <field name="stringData" column="stringData" type="HASH"/> + </class> +</gora-otd> diff --git a/gora-redis/src/test/resources/gora.properties b/gora-redis/src/test/resources/gora.properties new file mode 100755 index 0000000..81e8151 --- /dev/null +++ b/gora-redis/src/test/resources/gora.properties @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +gora.datastore.default=org.apache.gora.redis.store.RedisStore +gora.datastore.redis.storage=SINGLEKEY +gora.datastore.redis.mode=SINGLE +gora.datastore.redis.address=localhost:7006 + +#A minimal cluster configuration must contain at least three master nodes +#gora.datastore.redis.mode=CLUSTER +#gora.datastore.redis.address=localhost:6387,localhost:6388,localhost:6389 + +#Multiple nodes can be added at once. All nodes (masters and slaves) should be provided. +#gora.datastore.redis.mode=REPLICATED +#gora.datastore.redis.address=localhost:6387,localhost:6388,localhost:6389 + +#Sentinel mode +#gora.datastore.redis.mode=SENTINEL +#gora.datastore.redis.masterName=mock +#gora.datastore.redis.readMode=MASTER +#gora.datastore.redis.address=localhost:6387,localhost:6388,localhost:6389 + diff --git a/pom.xml b/pom.xml old mode 100644 new mode 100755 index ad71914..0ad5bb7 --- a/pom.xml +++ b/pom.xml @@ -443,7 +443,7 @@ </dependency> </dependencies> </plugin> - <!--This plugin's configuration is used to store Eclipse m2e settings + <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.
--> <plugin> <groupId>org.eclipse.m2e</groupId> @@ -657,7 +657,7 @@ <id>release</id> <build> <plugins> - <!-- <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> + <!-- <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> <version>${apache-rat-plugin.version}</version> <executions> <execution> <id>rat-verify</id> <phase>test</phase> <goals> <goal>check</goal> </goals> </execution> </executions> <configuration> <licenses> <license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense"> @@ -782,6 +782,7 @@ <module>gora-ignite</module> <module>gora-kudu</module> <module>gora-hive</module> + <module>gora-redis</module> <module>gora-tutorial</module> <module>sources-dist</module> <!-- <module>gora-benchmark</module> --> @@ -807,6 +808,8 @@ <!-- Ignite Dependencies --> <ignite.version>2.6.0</ignite.version> <sqlbuilder.version>2.1.7</sqlbuilder.version> + <!-- Redis Dependencies --> + <redisson.version>3.11.0</redisson.version> <!-- Kudu Dependencies --> <kudu.version>1.9.0</kudu.version> <!-- Solr Dependencies --> @@ -841,10 +844,10 @@ <orientdb.version>2.2.22</orientdb.version> <orientqb.version>0.2.0</orientqb.version> - + <!-- CouchDB Dependencies --> <couchdb.version>1.4.2</couchdb.version> - + <!-- MongoDB Dependencies --> <mongo.embed.version>2.0.0</mongo.embed.version> @@ -857,11 +860,11 @@ <!-- Testing Dependencies --> <junit.version>4.10</junit.version> <test.container.version>1.4.2</test.container.version> - + <!-- gora-benchmark and version dependencies --> <ycsb.version>0.17.0-SNAPSHOT</ycsb.version> <datafactory.version>0.8</datafactory.version> - + <!-- Maven Plugin Dependencies --> <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version> @@ -1039,7 +1042,7 @@ <artifactId>gora-mongodb</artifactId> <version>${project.version}</version> </dependency> - + <!--Kudu DataStore dependencies --> <dependency> <groupId>org.apache.kudu</groupId> @@ -1131,6 +1134,24 @@ 
<scope>compile</scope> </dependency> + <!-- Redis DataStore dependencies --> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-redis</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-redis</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.redisson</groupId> + <artifactId>redisson-all</artifactId> + <version>${redisson.version}</version> + </dependency> + <!--Hadoop dependencies --> <dependency> <groupId>org.apache.hadoop</groupId> @@ -1729,7 +1750,7 @@ <artifactId>aerospike-client</artifactId> <version>${aerospike.version}</version> </dependency> - + <!-- CouchDB Dependency --> <dependency> <groupId>org.ektorp</groupId> @@ -1761,21 +1782,21 @@ <artifactId>testcontainers</artifactId> <version>${test.container.version}</version> </dependency> - + <!-- Gora Benchmark Dependencies --> <dependency> <groupId>com.yahoo.ycsb</groupId> <artifactId>core</artifactId> <version>${ycsb.version}</version> </dependency> - + <dependency> <groupId>de.flapdoodle.embed</groupId> <artifactId>de.flapdoodle.embed.mongo</artifactId> <version>${mongo.embed.version}</version> <scope>test</scope> </dependency> - + <dependency> <groupId>org.fluttercode.datafactory</groupId> <artifactId>datafactory</artifactId>