This is an automated email from the ASF dual-hosted git repository. jgresock pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 551625f7bf NIFI-12149 Create nifi-redis-utils and minor improvements to util methods 551625f7bf is described below commit 551625f7bf4662cac9ba6fb5d5f90355ee3bd705 Author: Bryan Bende <bbe...@apache.org> AuthorDate: Fri Sep 29 10:15:22 2023 -0400 NIFI-12149 Create nifi-redis-utils and minor improvements to util methods Signed-off-by: Joe Gresock <jgres...@gmail.com> This closes #7812. --- .../nifi-redis-extensions/pom.xml | 5 + .../redis/service/RedisConnectionPoolService.java | 2 +- .../nifi/redis/state/RedisStateProvider.java | 2 +- .../service/TestRedisConnectionPoolService.java | 2 +- .../pom.xml | 62 +------ .../org/apache/nifi/redis/util/RedisAction.java | 0 .../org/apache/nifi/redis/util/RedisConfig.java | 206 +++++++++++++++++++++ .../org/apache/nifi/redis/util/RedisUtils.java | 152 +++++++++++---- nifi-nar-bundles/nifi-redis-bundle/pom.xml | 1 + 9 files changed, 335 insertions(+), 97 deletions(-) diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml index 82a3b96607..bf1a5f2793 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml @@ -38,6 +38,11 @@ <version>2.0.0-SNAPSHOT</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-redis-utils</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java index 70e5ca7f5f..abaa47c19f 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java @@ -90,7 +90,7 @@ RedisConnectionPoolService extends AbstractControllerService implements RedisCon if (connectionFactory == null) { synchronized (this) { if (connectionFactory == null) { - connectionFactory = RedisUtils.createConnectionFactory(context, getLogger(), sslContext); + connectionFactory = RedisUtils.createConnectionFactory(context, sslContext); } } } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java index b684a77d5d..4792523eef 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java @@ -344,7 +344,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements // visible for testing synchronized RedisConnection getRedis() { if (connectionFactory == null) { - connectionFactory = RedisUtils.createConnectionFactory(context, logger, sslContext); + connectionFactory = RedisUtils.createConnectionFactory(context, sslContext); } return connectionFactory.getConnection(); diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java index 5ec27d1996..c77e12e4fa 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java @@ -126,7 +126,7 @@ public class TestRedisConnectionPoolService { final SSLContextService sslContextService = configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); providedSslContext = sslContextService.createContext(); } - JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger(), providedSslContext); + JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, providedSslContext); return connectionFactory; } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml similarity index 58% copy from nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml copy to nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml index 82a3b96607..cd3e1ee2f8 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml @@ -22,15 +22,13 @@ <version>2.0.0-SNAPSHOT</version> </parent> - <artifactId>nifi-redis-extensions</artifactId> + <artifactId>nifi-redis-utils</artifactId> <packaging>jar</packaging> <dependencies> - <!-- Provided deps from nifi-redis-service-api --> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-distributed-cache-client-service-api</artifactId> - <scope>provided</scope> + <artifactId>nifi-api</artifactId> </dependency> <dependency> <groupId>org.apache.nifi</groupId> @@ -38,74 +36,28 @@ <version>2.0.0-SNAPSHOT</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>${spring.data.redis.version}</version> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>${jedis.version}</version> <scope>provided</scope> </dependency> - <!-- End Provided deps from nifi-redis-service-api --> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> <version>2.0.0-SNAPSHOT</version> </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-record-serialization-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-record</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-record-path</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service-api</artifactId> - <scope>provided</scope> - </dependency> - <!-- Test dependencies --> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - <version>2.0.0-SNAPSHOT</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock-record-utils</artifactId> - <version>2.0.0-SNAPSHOT</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>testcontainers</artifactId> - <version>${testcontainers.version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>junit-jupiter</artifactId> diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisAction.java similarity index 100% rename from nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java rename to nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisAction.java diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java new file mode 100644 index 0000000000..c5da310093 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.util; + +import org.apache.nifi.redis.RedisType; + +import java.time.Duration; +import java.util.Objects; + +public class RedisConfig { + + private final RedisType redisMode; + private final String connectionString; + + private String sentinelMaster; + private String sentinelPassword; + private String password; + + private int dbIndex = 0; + private int timeout = 10000; + private int clusterMaxRedirects = 5; + + private int poolMaxTotal = 8; + private int poolMaxIdle = 8; + private int poolMinIdle = 0; + private boolean blockWhenExhausted = true; + private Duration maxWaitTime = Duration.ofSeconds(10); + private Duration minEvictableIdleTime = Duration.ofSeconds(60); + private Duration timeBetweenEvictionRuns = Duration.ofSeconds(30); + private int numTestsPerEvictionRun = -1; + private boolean testOnCreate = true; + private boolean testOnBorrow = true; + private boolean testOnReturn = false; + private boolean testWhenIdle = true; + + public RedisConfig(final RedisType redisMode, final String connectionString) { + this.redisMode = Objects.requireNonNull(redisMode); + this.connectionString = Objects.requireNonNull(connectionString); + } + + public RedisType getRedisMode() { + return redisMode; + } + + public String getConnectionString() { + return connectionString; + } + + public String getSentinelMaster() { + return sentinelMaster; + } + + public void setSentinelMaster(String sentinelMaster) { + this.sentinelMaster = sentinelMaster; + } + + public String getSentinelPassword() { + return sentinelPassword; + } + + public void setSentinelPassword(String sentinelPassword) { + this.sentinelPassword = sentinelPassword; + } + + public int getDbIndex() { + return dbIndex; + } + + public void setDbIndex(int dbIndex) { + this.dbIndex = dbIndex; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public int getClusterMaxRedirects() { + return clusterMaxRedirects; + } + + public void setClusterMaxRedirects(int clusterMaxRedirects) { + this.clusterMaxRedirects = clusterMaxRedirects; + } + + public int getPoolMaxTotal() { + return poolMaxTotal; + } + + public void setPoolMaxTotal(int poolMaxTotal) { + this.poolMaxTotal = poolMaxTotal; + } + + public int getPoolMaxIdle() { + return poolMaxIdle; + } + + public void setPoolMaxIdle(int poolMaxIdle) { + this.poolMaxIdle = poolMaxIdle; + } + + public int getPoolMinIdle() { + return poolMinIdle; + } + + public void setPoolMinIdle(int poolMinIdle) { + this.poolMinIdle = poolMinIdle; + } + + public boolean getBlockWhenExhausted() { + return blockWhenExhausted; + } + + public void setBlockWhenExhausted(boolean blockWhenExhausted) { + this.blockWhenExhausted = blockWhenExhausted; + } + + public Duration getMaxWaitTime() { + return maxWaitTime; + } + + public void setMaxWaitTime(Duration maxWaitTime) { + this.maxWaitTime = maxWaitTime; + } + + public Duration getMinEvictableIdleTime() { + return minEvictableIdleTime; + } + + public void setMinEvictableIdleTime(Duration minEvictableIdleTime) { + this.minEvictableIdleTime = minEvictableIdleTime; + } + + public Duration getTimeBetweenEvictionRuns() { + return timeBetweenEvictionRuns; + } + + public void setTimeBetweenEvictionRuns(Duration timeBetweenEvictionRuns) { + this.timeBetweenEvictionRuns = timeBetweenEvictionRuns; + } + + public int getNumTestsPerEvictionRun() { + return numTestsPerEvictionRun; + } + + public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) { + this.numTestsPerEvictionRun = numTestsPerEvictionRun; + } + + public boolean getTestOnCreate() { + return testOnCreate; + } + + public void setTestOnCreate(boolean testOnCreate) { + this.testOnCreate = testOnCreate; + } + + public boolean getTestOnBorrow() { + return testOnBorrow; + } + + public void setTestOnBorrow(boolean testOnBorrow) { + this.testOnBorrow = testOnBorrow; + } + + public boolean getTestOnReturn() { + return testOnReturn; + } + + public void setTestOnReturn(boolean testOnReturn) { + this.testOnReturn = testOnReturn; + } + + public boolean getTestWhenIdle() { + return testWhenIdle; + } + + public void setTestWhenIdle(boolean testWhenIdle) { + this.testWhenIdle = testWhenIdle; + } +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java similarity index 76% rename from nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java rename to nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java index f72ba136f5..51b17e3cd0 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java @@ -16,37 +16,47 @@ */ package org.apache.nifi.redis.util; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.RedisType; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisConfiguration; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisSentinelConfiguration; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.lang.Nullable; import redis.clients.jedis.JedisPoolConfig; +import javax.net.ssl.SSLContext; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.TimeUnit; + public class RedisUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(RedisUtils.class); + // These properties are shared among the controller service(s) and processor(s) that use a RedisConnectionPool public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder() @@ -301,15 +311,44 @@ public class RedisUtils { REDIS_CONNECTION_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); } + public static RedisConfig createRedisConfig(final PropertyContext context) { + final RedisType redisType = RedisType.fromDisplayName(context.getProperty(RedisUtils.REDIS_MODE).getValue()); + final String connectString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue(); + + final RedisConfig redisConfig = new RedisConfig(redisType, connectString); + redisConfig.setSentinelMaster(context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue()); + redisConfig.setDbIndex(context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger()); + redisConfig.setPassword(context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue()); + redisConfig.setSentinelPassword(context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue()); + redisConfig.setTimeout(context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + redisConfig.setClusterMaxRedirects(context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger()); + redisConfig.setPoolMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger()); + redisConfig.setPoolMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger()); + redisConfig.setPoolMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger()); + redisConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean()); + redisConfig.setMaxWaitTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS))); + redisConfig.setMinEvictableIdleTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS))); + redisConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS))); + redisConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger()); + redisConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean()); + redisConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean()); + redisConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean()); + redisConfig.setTestWhenIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean()); + return redisConfig; + } - public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger, final SSLContext sslContext) { - final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue(); - final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue(); - final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger(); - final String password = context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue(); - final String sentinelPassword = context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue(); - final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final JedisPoolConfig poolConfig = createJedisPoolConfig(context); + public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final SSLContext sslContext) { + return createConnectionFactory(createRedisConfig(context), sslContext); + } + + public static JedisConnectionFactory createConnectionFactory(final RedisConfig redisConfig, final SSLContext sslContext) { + final RedisType redisMode = redisConfig.getRedisMode(); + final String connectionString = redisConfig.getConnectionString(); + final Integer dbIndex = redisConfig.getDbIndex(); + final String password = redisConfig.getPassword(); + final String sentinelPassword = redisConfig.getSentinelPassword(); + final Integer timeout = redisConfig.getTimeout(); + final JedisPoolConfig poolConfig = createJedisPoolConfig(redisConfig); JedisClientConfiguration.JedisClientConfigurationBuilder builder = JedisClientConfiguration.builder() .connectTimeout(Duration.ofMillis(timeout)) @@ -328,8 +367,8 @@ public class RedisUtils { final JedisClientConfiguration jedisClientConfiguration = builder.build(); JedisConnectionFactory connectionFactory; - if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) { - logger.info("Connecting to Redis in standalone mode at " + connectionString); + if (RedisType.STANDALONE == redisMode) { + LOGGER.info("Connecting to Redis in standalone mode at " + connectionString); final String[] hostAndPortSplit = connectionString.split("[:]"); final String host = hostAndPortSplit[0].trim(); final Integer port = Integer.parseInt(hostAndPortSplit[1].trim()); @@ -338,32 +377,32 @@ public class RedisUtils { connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration); - } else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) { + } else if (RedisType.SENTINEL == redisMode) { final String[] sentinels = connectionString.split("[,]"); - final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue(); + final String sentinelMaster = redisConfig.getSentinelMaster(); final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels))); enrichRedisConfiguration(sentinelConfiguration, dbIndex, password, sentinelPassword); - logger.info("Connecting to Redis in sentinel mode..."); - logger.info("Redis master = " + sentinelMaster); + LOGGER.info("Connecting to Redis in sentinel mode..."); + LOGGER.info("Redis master = " + sentinelMaster); for (final String sentinel : sentinels) { - logger.info("Redis sentinel at " + sentinel); + LOGGER.info("Redis sentinel at " + sentinel); } connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration); } else { final String[] clusterNodes = connectionString.split("[,]"); - final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger(); + final Integer maxRedirects = redisConfig.getClusterMaxRedirects(); final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes)); enrichRedisConfiguration(clusterConfiguration, dbIndex, password, sentinelPassword); clusterConfiguration.setMaxRedirects(maxRedirects); - logger.info("Connecting to Redis in clustered mode..."); + LOGGER.info("Connecting to Redis in clustered mode..."); for (final String clusterNode : clusterNodes) { - logger.info("Redis cluster node at " + clusterNode); + LOGGER.info("Redis cluster node at " + clusterNode); } connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration); @@ -397,20 +436,20 @@ public class RedisUtils { } } - private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) { + private static JedisPoolConfig createJedisPoolConfig(final RedisConfig redisConfig) { final JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger()); - poolConfig.setMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger()); - poolConfig.setMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger()); - poolConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean()); - poolConfig.setMaxWait(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asDuration()); - poolConfig.setMinEvictableIdleTime(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asDuration()); - poolConfig.setTimeBetweenEvictionRuns(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asDuration()); - poolConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger()); - poolConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean()); - poolConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean()); - poolConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean()); - poolConfig.setTestWhileIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean()); + poolConfig.setMaxTotal(redisConfig.getPoolMaxTotal()); + poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle()); + poolConfig.setMinIdle(redisConfig.getPoolMinIdle()); + poolConfig.setBlockWhenExhausted(redisConfig.getBlockWhenExhausted()); + poolConfig.setMaxWait(redisConfig.getMaxWaitTime()); + poolConfig.setMinEvictableIdleTime(redisConfig.getMinEvictableIdleTime()); + poolConfig.setTimeBetweenEvictionRuns(redisConfig.getTimeBetweenEvictionRuns()); + poolConfig.setNumTestsPerEvictionRun(redisConfig.getNumTestsPerEvictionRun()); + poolConfig.setTestOnCreate(redisConfig.getTestOnCreate()); + poolConfig.setTestOnBorrow(redisConfig.getTestOnBorrow()); + poolConfig.setTestOnReturn(redisConfig.getTestOnReturn()); + poolConfig.setTestWhileIdle(redisConfig.getTestWhenIdle()); return poolConfig; } @@ -482,4 +521,39 @@ public class RedisUtils { } } + public static RedisTemplate<String, String> createRedisTemplateForStreams(final RedisConnectionFactory connectionFactory) { + final StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<String, MapRecord<String, String, byte[]>> builder = + StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() + .hashValueSerializer(RedisSerializer.byteArray()); + + final StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, byte[]>> containerOptions = builder.build(); + return createRedisTemplateForStreams(connectionFactory, containerOptions); + } + + public static RedisTemplate<String, String> createRedisTemplateForStreams( + final RedisConnectionFactory connectionFactory, + final StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, byte[]>> containerOptions) { + final RedisTemplate<String,String> redisTemplate = new RedisTemplate<>(); + redisTemplate.setKeySerializer(containerOptions.getKeySerializer()); + redisTemplate.setValueSerializer(containerOptions.getKeySerializer()); + redisTemplate.setHashKeySerializer(containerOptions.getHashKeySerializer()); + redisTemplate.setHashValueSerializer(containerOptions.getHashValueSerializer()); + redisTemplate.setConnectionFactory(connectionFactory); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } + + public static RedisTemplate<String, String> createRedisTemplateForKeyValues(final RedisConnectionFactory connectionFactory) { + final RedisTemplate<String, String> redisTemplate = new StringRedisTemplate(connectionFactory); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } + + public static RedisTemplate<String, byte[]> createRedisTemplateForPubSub(final RedisConnectionFactory connectionFactory) { + final RedisTemplate<String, byte[]> redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(connectionFactory); + redisTemplate.setValueSerializer(RedisSerializer.byteArray()); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } } diff --git a/nifi-nar-bundles/nifi-redis-bundle/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/pom.xml index 06e9c45e47..62d3c833f1 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-redis-bundle/pom.xml @@ -31,6 +31,7 @@ </properties> <modules> + <module>nifi-redis-utils</module> <module>nifi-redis-service-api</module> <module>nifi-redis-service-api-nar</module> <module>nifi-redis-extensions</module>