This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a672294  NIFI-6621: Add support for Druid schema-less dimensions
a672294 is described below

commit a672294f3f3edf2f3fbd05a20d0585dc1a431c79
Author: samhjelmfelt <samhjelmf...@yahoo.com>
AuthorDate: Wed Sep 4 16:20:27 2019 -0500

    NIFI-6621: Add support for Druid schema-less dimensions
    
    NIFI-6621: Small change to fix failing tests
    
    NIFI-6621: Minor style changes
    Signed-off-by: Matthew Burgess <mattyb...@apache.org>
    
    This closes #3693
---
 .../controller/druid/DruidTranquilityController.java   | 18 +++++++++++-------
 .../druid/DruidTranquilityControllerTest.java          |  2 --
 .../druid/MockDruidTranquilityController.java          |  3 ++-
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 78aab4c..4d69699 100644
--- 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -231,8 +231,7 @@ public class DruidTranquilityController extends 
AbstractControllerService implem
     public static final PropertyDescriptor DIMENSIONS_LIST = new 
PropertyDescriptor.Builder()
             .name("druid-cs-dimensions-list")
             .displayName("Dimension Fields")
-            .description("A comma separated list of field names that will be 
stored as dimensions on ingest.")
-            .required(true)
+            .description("A comma separated list of field names that will be 
stored as dimensions on ingest. Set to empty string for schema-less 
dimensions.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
@@ -400,7 +399,7 @@ public class DruidTranquilityController extends 
AbstractControllerService implem
 
         transitUri = String.format(FIREHOSE_PATTERN, dataSource) + 
";indexServicePath=" + indexService;
 
-        final List<String> dimensions = getDimensions(dimensionsStringList);
+        final DruidDimensions dimensions = getDimensions(dimensionsStringList);
         final List<AggregatorFactory> aggregator = 
getAggregatorList(aggregatorJSON);
 
         final Timestamper<Map<String, Object>> timestamper = new 
Timestamper<Map<String, Object>>() {
@@ -446,14 +445,14 @@ public class DruidTranquilityController extends 
AbstractControllerService implem
     }
 
     Beam<Map<String, Object>> buildBeam(String dataSource, String 
indexService, String discoveryPath, int clusterPartitions, int 
clusterReplication,
-                                        String segmentGranularity, String 
queryGranularity, String windowPeriod, String firehoseGracePeriod, String 
indexRetryPeriod, List<String> dimensions,
+                                        String segmentGranularity, String 
queryGranularity, String windowPeriod, String firehoseGracePeriod, String 
indexRetryPeriod, DruidDimensions dimensions,
                                         List<AggregatorFactory> aggregator, 
Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
         return DruidBeams.builder(timestamper)
                 .curator(curator)
                 .discoveryPath(discoveryPath)
                 
.location(DruidLocation.create(DruidEnvironment.create(indexService, 
FIREHOSE_PATTERN), dataSource))
                 .timestampSpec(timestampSpec)
-                
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, 
QueryGranularity.fromString(queryGranularity)))
+                .rollup(DruidRollup.create(dimensions, aggregator, 
QueryGranularity.fromString(queryGranularity)))
                 .tuning(
                         ClusteredBeamTuning
                                 .builder()
@@ -518,7 +517,7 @@ public class DruidTranquilityController extends 
AbstractControllerService implem
         }
     }
 
-    private List<String> getDimensions(String dimensionStringList) {
+    private DruidDimensions getDimensions(String dimensionStringList) {
         List<String> dimensionList = new ArrayList<>();
         if (dimensionStringList != null) {
             Arrays.stream(dimensionStringList.split(","))
@@ -526,7 +525,12 @@ public class DruidTranquilityController extends 
AbstractControllerService implem
                     .map(String::trim)
                     .forEach(dimensionList::add);
         }
-        return dimensionList;
+        if(dimensionList.isEmpty()) {
+            getLogger().debug("Using schema-less dimensions");
+            return DruidDimensions.schemaless();
+        } else {
+            return DruidDimensions.specific(dimensionList);
+        }
     }
 
     private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) {
diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
index 56c2616..acccb67 100644
--- 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
@@ -55,8 +55,6 @@ public class DruidTranquilityControllerTest {
         runner.setProperty(service, 
DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
         runner.assertNotValid(service);
         runner.setProperty(service, 
DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": 
\"count\"}]");
-        runner.assertNotValid(service);
-        runner.setProperty(service, 
DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2");
         runner.assertValid(service);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
index 62b0181..f41f164 100644
--- 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.druid;
 
 import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.druid.DruidDimensions;
 import com.metamx.tranquility.tranquilizer.MessageDroppedException;
 import com.metamx.tranquility.tranquilizer.Tranquilizer;
 import com.metamx.tranquility.typeclass.Timestamper;
@@ -137,7 +138,7 @@ public class MockDruidTranquilityController extends 
DruidTranquilityController {
     @SuppressWarnings("unchecked")
     @Override
     Beam<Map<String, Object>> buildBeam(String dataSource, String 
indexService, String discoveryPath, int clusterPartitions, int 
clusterReplication,
-                                        String segmentGranularity, String 
queryGranularity, String windowPeriod, String firehoseGracePeriod, String 
indexRetryPeriod, List<String> dimensions,
+                                        String segmentGranularity, String 
queryGranularity, String windowPeriod, String firehoseGracePeriod, String 
indexRetryPeriod, DruidDimensions dimensions,
                                         List<AggregatorFactory> aggregator, 
Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
         return mock(Beam.class);
     }

Reply via email to