This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new a672294 NIFI-6621: Add support for Druid schema-less dimensions a672294 is described below commit a672294f3f3edf2f3fbd05a20d0585dc1a431c79 Author: samhjelmfelt <samhjelmf...@yahoo.com> AuthorDate: Wed Sep 4 16:20:27 2019 -0500 NIFI-6621: Add support for Druid schema-less dimensions NIFI-6621: Small change to fix failing tests NIFI-6621: Minor style changes Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3693 --- .../controller/druid/DruidTranquilityController.java | 18 +++++++++++------- .../druid/DruidTranquilityControllerTest.java | 2 -- .../druid/MockDruidTranquilityController.java | 3 ++- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java index 78aab4c..4d69699 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java @@ -231,8 +231,7 @@ public class DruidTranquilityController extends AbstractControllerService implem public static final PropertyDescriptor DIMENSIONS_LIST = new PropertyDescriptor.Builder() .name("druid-cs-dimensions-list") .displayName("Dimension Fields") - .description("A comma separated list of field names that will be stored as dimensions on ingest.") - .required(true) + .description("A comma separated list of field names that will be stored as dimensions on ingest. Set to empty string for schema-less dimensions.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); @@ -400,7 +399,7 @@ public class DruidTranquilityController extends AbstractControllerService implem transitUri = String.format(FIREHOSE_PATTERN, dataSource) + ";indexServicePath=" + indexService; - final List<String> dimensions = getDimensions(dimensionsStringList); + final DruidDimensions dimensions = getDimensions(dimensionsStringList); final List<AggregatorFactory> aggregator = getAggregatorList(aggregatorJSON); final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() { @@ -446,14 +445,14 @@ public class DruidTranquilityController extends AbstractControllerService implem } Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication, - String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions, + String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, DruidDimensions dimensions, List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) { return DruidBeams.builder(timestamper) .curator(curator) .discoveryPath(discoveryPath) .location(DruidLocation.create(DruidEnvironment.create(indexService, FIREHOSE_PATTERN), dataSource)) .timestampSpec(timestampSpec) - .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularity.fromString(queryGranularity))) + .rollup(DruidRollup.create(dimensions, aggregator, QueryGranularity.fromString(queryGranularity))) .tuning( ClusteredBeamTuning .builder() @@ -518,7 +517,7 @@ public class DruidTranquilityController extends AbstractControllerService implem } } - private List<String> getDimensions(String dimensionStringList) { + private DruidDimensions getDimensions(String dimensionStringList) { List<String> dimensionList = new ArrayList<>(); if (dimensionStringList != null) { Arrays.stream(dimensionStringList.split(",")) @@ -526,7 +525,12 @@ public class DruidTranquilityController extends AbstractControllerService implem .map(String::trim) .forEach(dimensionList::add); } - return dimensionList; + if(dimensionList.isEmpty()) { + getLogger().debug("Using schema-less dimensions"); + return DruidDimensions.schemaless(); + } else { + return DruidDimensions.specific(dimensionList); + } } private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) { diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java index 56c2616..acccb67 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java @@ -55,8 +55,6 @@ public class DruidTranquilityControllerTest { runner.setProperty(service, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); runner.assertNotValid(service); runner.setProperty(service, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]"); - runner.assertNotValid(service); - runner.setProperty(service, DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2"); runner.assertValid(service); } diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java index 62b0181..f41f164 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.druid; import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.druid.DruidDimensions; import com.metamx.tranquility.tranquilizer.MessageDroppedException; import com.metamx.tranquility.tranquilizer.Tranquilizer; import com.metamx.tranquility.typeclass.Timestamper; @@ -137,7 +138,7 @@ public class MockDruidTranquilityController extends DruidTranquilityController { @SuppressWarnings("unchecked") @Override Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication, - String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions, + String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, DruidDimensions dimensions, List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) { return mock(Beam.class); }