NIFI-4809 - Implement a SiteToSiteMetricsReportingTask Fixed dependency issue by providing a local JSON reader
Rebased + fixed conflict + updated versions in pom + EL scope Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #2575 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6fbe1515 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6fbe1515 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6fbe1515 Branch: refs/heads/master Commit: 6fbe1515eefd2071dc75a1de2c1fc15cc282da76 Parents: ce0855e Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Tue Jan 23 23:15:18 2018 +0100 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Wed Apr 11 14:44:30 2018 -0400 ---------------------------------------------------------------------- .../nifi-ambari-reporting-task/pom.xml | 15 +- .../reporting/ambari/AmbariReportingTask.java | 4 +- .../reporting/ambari/api/MetricBuilder.java | 84 ---- .../nifi/reporting/ambari/api/MetricFields.java | 29 -- .../reporting/ambari/api/MetricsBuilder.java | 93 ---- .../reporting/ambari/metrics/MetricNames.java | 55 --- .../ambari/metrics/MetricsService.java | 131 ------ .../ambari/api/TestMetricsBuilder.java | 2 + .../ambari/metrics/TestMetricsService.java | 2 + .../nifi-reporting-utils/pom.xml | 10 + .../reporting/util/metrics/MetricNames.java | 59 +++ .../reporting/util/metrics/MetricsService.java | 230 ++++++++++ .../util/metrics/api/MetricBuilder.java | 84 ++++ .../util/metrics/api/MetricFields.java | 29 ++ .../util/metrics/api/MetricsBuilder.java | 93 ++++ .../nifi-site-to-site-reporting-task/pom.xml | 39 +- .../AbstractSiteToSiteReportingTask.java | 420 ++++++++++++++++++- .../SiteToSiteBulletinReportingTask.java | 18 +- .../SiteToSiteMetricsReportingTask.java | 222 ++++++++++ .../SiteToSiteProvenanceReportingTask.java | 28 +- .../SiteToSiteStatusReportingTask.java | 37 +- .../org.apache.nifi.reporting.ReportingTask | 3 +- .../additionalDetails.html | 178 ++++++++ .../additionalDetails.html | 2 +- .../src/main/resources/schema-metrics.avsc | 37 ++ .../TestSiteToSiteMetricsReportingTask.java | 296 +++++++++++++ 26 files changed, 1715 insertions(+), 485 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml index dafe829..de024e2 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml @@ -30,21 +30,11 @@ <artifactId>jersey-client</artifactId> </dependency> <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - <version>1.0.4</version> - </dependency> - <dependency> <groupId>javax.json</groupId> <artifactId>javax.json-api</artifactId> <version>1.0</version> </dependency> <dependency> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>2.2.0</version> - </dependency> - <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> </dependency> @@ -53,6 +43,11 @@ <artifactId>nifi-utils</artifactId> <version>1.7.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-reporting-utils</artifactId> + <version>1.7.0-SNAPSHOT</version> + </dependency> <!-- test dependencies --> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java index 5bbdecb..0568b3e 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java @@ -29,8 +29,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.ambari.api.MetricsBuilder; -import org.apache.nifi.reporting.ambari.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; import org.apache.nifi.scheduling.SchedulingStrategy; import javax.json.Json; http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java deleted file mode 100644 index 8e234ce..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.api; - -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; - -/** - * Builds the JsonObject for an individual metric. - */ -public class MetricBuilder { - - private final JsonBuilderFactory factory; - - private String applicationId; - private String instanceId; - private String hostname; - private String timestamp; - private String metricName; - private String metricValue; - - public MetricBuilder(final JsonBuilderFactory factory) { - this.factory = factory; - } - - public MetricBuilder applicationId(final String applicationId) { - this.applicationId = applicationId; - return this; - } - - public MetricBuilder instanceId(final String instanceId) { - this.instanceId = instanceId; - return this; - } - - public MetricBuilder hostname(final String hostname) { - this.hostname = hostname; - return this; - } - - public MetricBuilder timestamp(final long timestamp) { - this.timestamp = String.valueOf(timestamp); - return this; - } - - public MetricBuilder metricName(final String metricName) { - this.metricName = metricName; - return this; - } - - public MetricBuilder metricValue(final String metricValue) { - this.metricValue = metricValue; - return this; - } - - public JsonObject build() { - return factory.createObjectBuilder() - .add(MetricFields.METRIC_NAME, metricName) - .add(MetricFields.APP_ID, applicationId) - .add(MetricFields.INSTANCE_ID, instanceId) - .add(MetricFields.HOSTNAME, hostname) - .add(MetricFields.TIMESTAMP, timestamp) - .add(MetricFields.START_TIME, timestamp) - .add(MetricFields.METRICS, - factory.createObjectBuilder() - .add(String.valueOf(timestamp), metricValue) - ).build(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java deleted file mode 100644 index 1c1629c..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.api; - -public interface MetricFields { - - String METRIC_NAME = "metricname"; - String APP_ID = "appid"; - String INSTANCE_ID = "instanceid"; - String HOSTNAME = "hostname"; - String TIMESTAMP = "timestamp"; - String START_TIME = "starttime"; - String METRICS = "metrics"; - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java deleted file mode 100644 index 11b4db5..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.api; - -import javax.json.JsonArrayBuilder; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; -import java.util.HashMap; -import java.util.Map; - -/** - * Builds the overall JsonObject for the Metrics. - */ -public class MetricsBuilder { - - static final String ROOT_JSON_ELEMENT = "metrics"; - - private final JsonBuilderFactory factory; - - private long timestamp; - private String applicationId; - private String instanceId; - private String hostname; - private Map<String,String> metrics = new HashMap<>(); - - public MetricsBuilder(final JsonBuilderFactory factory) { - this.factory = factory; - } - - public MetricsBuilder applicationId(final String applicationId) { - this.applicationId = applicationId; - return this; - } - - public MetricsBuilder instanceId(final String instanceId) { - this.instanceId = instanceId; - return this; - } - - public MetricsBuilder hostname(final String hostname) { - this.hostname = hostname; - return this; - } - - public MetricsBuilder timestamp(final long timestamp) { - this.timestamp = timestamp; - return this; - } - - public MetricsBuilder metric(final String name, String value) { - this.metrics.put(name, value); - return this; - } - - public MetricsBuilder addAllMetrics(final Map<String,String> metrics) { - this.metrics.putAll(metrics); - return this; - } - - public JsonObject build() { - // builds JsonObject for individual metrics - final MetricBuilder metricBuilder = new MetricBuilder(factory); - metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname); - - final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder(); - - for (Map.Entry<String,String> entry : metrics.entrySet()) { - metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue()); - metricArrayBuilder.add(metricBuilder.build()); - } - - // add the array of metrics to a top-level json object - final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder(); - metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder); - return metricsBuilder.build(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java deleted file mode 100644 index 20cfa4e..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.metrics; - -/** - * The Metric names to send to Ambari. - */ -public interface MetricNames { - - // Metric Name separator - String METRIC_NAME_SEPARATOR = "."; - - // NiFi Metrics - String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; - String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; - String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes"; - String BYTES_SENT = "BytesSentLast5Minutes"; - String FLOW_FILES_QUEUED = "FlowFilesQueued"; - String BYTES_QUEUED = "BytesQueued"; - String BYTES_READ = "BytesReadLast5Minutes"; - String BYTES_WRITTEN = "BytesWrittenLast5Minutes"; - String ACTIVE_THREADS = "ActiveThreads"; - String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds"; - String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds"; - - // JVM Metrics - String JVM_UPTIME = "jvm.uptime"; - String JVM_HEAP_USED = "jvm.heap_used"; - String JVM_HEAP_USAGE = "jvm.heap_usage"; - String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage"; - String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable"; - String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked"; - String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting"; - String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated"; - String JVM_THREAD_COUNT = "jvm.thread_count"; - String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count"; - String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage"; - String JVM_GC_RUNS = "jvm.gc.runs"; - String JVM_GC_TIME = "jvm.gc.time"; - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java deleted file mode 100644 index cef257d..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.metrics; - -import com.yammer.metrics.core.VirtualMachineMetrics; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * A service used to produce key/value metrics based on a given input. - */ -public class MetricsService { - - /** - * Generates a Map of metrics for a ProcessGroupStatus instance. - * - * @param status a ProcessGroupStatus to get metrics from - * @param appendPgId if true, the process group ID will be appended at the end of the metric name - * @return a map of metrics for the given status - */ - public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) { - final Map<String,String> metrics = new HashMap<>(); - metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived())); - metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived())); - metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent())); - metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent())); - metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount())); - metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize())); - metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead())); - metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten())); - metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount())); - - final long durationNanos = calculateProcessingNanos(status); - metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos)); - - final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); - metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds)); - - return metrics; - } - - /** - * Generates a Map of metrics for VirtualMachineMetrics. - * - * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from - * @return a map of metrics from the given VirtualMachineStatus - */ - public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) { - final Map<String,String> metrics = new HashMap<>(); - metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime())); - metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed())); - metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage())); - metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage())); - metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount())); - metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount())); - metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage())); - - for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) { - final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue())); - switch(entry.getKey()) { - case BLOCKED: - metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue)); - break; - case RUNNABLE: - metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue)); - break; - case TERMINATED: - metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue)); - break; - case TIMED_WAITING: - metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue)); - break; - default: - break; - } - } - - for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) { - final String gcName = entry.getKey().replace(" ", ""); - final long runs = entry.getValue().getRuns(); - final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS); - metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs)); - metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS)); - } - - return metrics; - } - - // calculates the total processing time of all processors in nanos - protected long calculateProcessingNanos(final ProcessGroupStatus status) { - long nanos = 0L; - - for (final ProcessorStatus procStats : status.getProcessorStatus()) { - nanos += procStats.getProcessingNanos(); - } - - for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { - nanos += calculateProcessingNanos(childGroupStatus); - } - - return nanos; - } - - // append the process group ID if necessary - private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) { - if(appendPgId) { - return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId(); - } else { - return name; - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java index cdaa453..9b96eb9 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.reporting.ambari.api; +import org.apache.nifi.reporting.util.metrics.api.MetricFields; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java index 93224eb..ec0cf6e 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java @@ -19,6 +19,8 @@ package org.apache.nifi.reporting.ambari.metrics; import com.yammer.metrics.core.VirtualMachineMetrics; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.reporting.util.metrics.MetricNames; +import org.apache.nifi.reporting.util.metrics.MetricsService; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml index ba10afb..3e2d158 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml @@ -40,6 +40,16 @@ <artifactId>commons-lang3</artifactId> <version>3.7</version> </dependency> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>2.2.0</version> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <version>1.0.4</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java new file mode 100644 index 0000000..19bb90d --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics; + +/** + * The Metric names to send to Ambari. + */ +public interface MetricNames { + + // Metric Name separator + String METRIC_NAME_SEPARATOR = "."; + + // NiFi Metrics + String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; + String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; + String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes"; + String BYTES_SENT = "BytesSentLast5Minutes"; + String FLOW_FILES_QUEUED = "FlowFilesQueued"; + String BYTES_QUEUED = "BytesQueued"; + String BYTES_READ = "BytesReadLast5Minutes"; + String BYTES_WRITTEN = "BytesWrittenLast5Minutes"; + String ACTIVE_THREADS = "ActiveThreads"; + String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds"; + String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds"; + + // JVM Metrics + String JVM_UPTIME = "jvm.uptime"; + String JVM_HEAP_USED = "jvm.heap_used"; + String JVM_HEAP_USAGE = "jvm.heap_usage"; + String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage"; + String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable"; + String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked"; + String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting"; + String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated"; + String JVM_THREAD_COUNT = "jvm.thread_count"; + String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count"; + String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage"; + String JVM_GC_RUNS = "jvm.gc.runs"; + String JVM_GC_TIME = "jvm.gc.time"; + + // OS Metrics + String LOAD1MN = "loadAverage1min"; + String CORES = "availableCores"; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java new file mode 100644 index 0000000..ed3922a --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; + +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.reporting.util.metrics.api.MetricFields; + +import com.yammer.metrics.core.VirtualMachineMetrics; + +/** + * A service used to produce key/value metrics based on a given input. + */ +public class MetricsService { + + /** + * Generates a Map of metrics for a ProcessGroupStatus instance. + * + * @param status a ProcessGroupStatus to get metrics from + * @param appendPgId if true, the process group ID will be appended at the end of the metric name + * @return a map of metrics for the given status + */ + public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map<String,String> metrics = new HashMap<>(); + + Map<String,Long> longMetrics = getLongMetrics(status, appendPgId); + for (String key : longMetrics.keySet()) { + metrics.put(key, String.valueOf(longMetrics.get(key))); + } + + Map<String,Integer> integerMetrics = getIntegerMetrics(status, appendPgId); + for (String key : integerMetrics.keySet()) { + metrics.put(key, String.valueOf(integerMetrics.get(key))); + } + + return metrics; + } + + private Map<String,Integer> getIntegerMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map<String,Integer> metrics = new HashMap<>(); + metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), status.getFlowFilesReceived()); + metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), status.getFlowFilesSent()); + metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), status.getQueuedCount()); + metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), status.getActiveThreadCount()); + return metrics; + } + + private Map<String,Long> getLongMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map<String,Long> metrics = new HashMap<>(); + metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), status.getBytesReceived()); + metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), status.getBytesSent()); + metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), status.getQueuedContentSize()); + metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), status.getBytesRead()); + metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), status.getBytesWritten()); + + final long durationNanos = calculateProcessingNanos(status); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), durationNanos); + + final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), durationSeconds); + + return metrics; + } + + /** + * Generates a Map of metrics for VirtualMachineMetrics. + * + * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from + * @return a map of metrics from the given VirtualMachineStatus + */ + public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,String> metrics = new HashMap<>(); + + Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics); + for (String key : integerMetrics.keySet()) { + metrics.put(key, String.valueOf(integerMetrics.get(key))); + } + + Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics); + for (String key : longMetrics.keySet()) { + metrics.put(key, String.valueOf(longMetrics.get(key))); + } + + Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics); + for (String key : doubleMetrics.keySet()) { + metrics.put(key, String.valueOf(doubleMetrics.get(key))); + } + + return metrics; + } + + // calculates the total processing time of all processors in nanos + protected long calculateProcessingNanos(final ProcessGroupStatus status) { + long nanos = 0L; + + for (final ProcessorStatus procStats : status.getProcessorStatus()) { + nanos += procStats.getProcessingNanos(); + } + + for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { + nanos += calculateProcessingNanos(childGroupStatus); + } + + return nanos; + } + + // append the process group ID if necessary + private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) { + if(appendPgId) { + return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId(); + } else { + return name; + } + } + + private Map<String,Double> getDoubleMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,Double> metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed()); + metrics.put(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage()); + metrics.put(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage()); + metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage()); + return metrics; + } + + private Map<String,Long> getLongMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,Long> metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime()); + + for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) { + final String gcName = entry.getKey().replace(" ", ""); + final long runs = entry.getValue().getRuns(); + final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS); + metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, runs); + metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, timeMS); + } + + return metrics; + } + + private Map<String,Integer> getIntegerMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,Integer> metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount()); + metrics.put(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount()); + + for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) { + final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue())); + switch(entry.getKey()) { + case BLOCKED: + metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, normalizedValue); + break; + case RUNNABLE: + metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, normalizedValue); + break; + case TERMINATED: + metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, normalizedValue); + break; + case TIMED_WAITING: + metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, normalizedValue); + break; + default: + break; + } + } + + return metrics; + } + + public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics, + String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) { + JsonObjectBuilder objectBuilder = factory.createObjectBuilder() + .add(MetricFields.APP_ID, applicationId) + .add(MetricFields.HOSTNAME, hostname) + .add(MetricFields.INSTANCE_ID, status.getId()) + .add(MetricFields.TIMESTAMP, currentTimeMillis); + + objectBuilder + .add(MetricNames.CORES, availableProcessors) + .add(MetricNames.LOAD1MN, systemLoad); + + Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics); + for (String key : integerMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key)); + } + + Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics); + for (String key : longMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key)); + } + + Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics); + for (String key : doubleMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key)); + } + + Map<String,Long> longPgMetrics = getLongMetrics(status, false); + for (String key : longPgMetrics.keySet()) { + objectBuilder.add(key, longPgMetrics.get(key)); + } + + Map<String,Integer> integerPgMetrics = getIntegerMetrics(status, false); + for (String key : integerPgMetrics.keySet()) { + objectBuilder.add(key, integerPgMetrics.get(key)); + } + + return objectBuilder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java new file mode 100644 index 0000000..81fb021 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics.api; + +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; + +/** + * Builds the JsonObject for an individual metric. + */ +public class MetricBuilder { + + private final JsonBuilderFactory factory; + + private String applicationId; + private String instanceId; + private String hostname; + private String timestamp; + private String metricName; + private String metricValue; + + public MetricBuilder(final JsonBuilderFactory factory) { + this.factory = factory; + } + + public MetricBuilder applicationId(final String applicationId) { + this.applicationId = applicationId; + return this; + } + + public MetricBuilder instanceId(final String instanceId) { + this.instanceId = instanceId; + return this; + } + + public MetricBuilder hostname(final String hostname) { + this.hostname = hostname; + return this; + } + + public MetricBuilder timestamp(final long timestamp) { + this.timestamp = String.valueOf(timestamp); + return this; + } + + public MetricBuilder metricName(final String metricName) { + this.metricName = metricName; + return this; + } + + public MetricBuilder metricValue(final String metricValue) { + this.metricValue = metricValue; + return this; + } + + public JsonObject build() { + return factory.createObjectBuilder() + .add(MetricFields.METRIC_NAME, metricName) + .add(MetricFields.APP_ID, applicationId) + .add(MetricFields.INSTANCE_ID, instanceId) + .add(MetricFields.HOSTNAME, hostname) + .add(MetricFields.TIMESTAMP, timestamp) + .add(MetricFields.START_TIME, timestamp) + .add(MetricFields.METRICS, + factory.createObjectBuilder() + .add(String.valueOf(timestamp), metricValue) + ).build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java new file mode 100644 index 0000000..4c451ea --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics.api; + +public interface MetricFields { + + String METRIC_NAME = "metricname"; + String APP_ID = "appid"; + String INSTANCE_ID = "instanceid"; + String HOSTNAME = "hostname"; + String TIMESTAMP = "timestamp"; + String START_TIME = "starttime"; + String METRICS = "metrics"; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java new file mode 100644 index 0000000..3694720 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics.api; + +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import java.util.HashMap; +import java.util.Map; + +/** + * Builds the overall JsonObject for the Metrics. + */ +public class MetricsBuilder { + + static final String ROOT_JSON_ELEMENT = "metrics"; + + private final JsonBuilderFactory factory; + + private long timestamp; + private String applicationId; + private String instanceId; + private String hostname; + private Map<String,String> metrics = new HashMap<>(); + + public MetricsBuilder(final JsonBuilderFactory factory) { + this.factory = factory; + } + + public MetricsBuilder applicationId(final String applicationId) { + this.applicationId = applicationId; + return this; + } + + public MetricsBuilder instanceId(final String instanceId) { + this.instanceId = instanceId; + return this; + } + + public MetricsBuilder hostname(final String hostname) { + this.hostname = hostname; + return this; + } + + public MetricsBuilder timestamp(final long timestamp) { + this.timestamp = timestamp; + return this; + } + + public MetricsBuilder metric(final String name, String value) { + this.metrics.put(name, value); + return this; + } + + public MetricsBuilder addAllMetrics(final Map<String,String> metrics) { + this.metrics.putAll(metrics); + return this; + } + + public JsonObject build() { + // builds JsonObject for individual metrics + final MetricBuilder metricBuilder = new MetricBuilder(factory); + metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname); + + final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder(); + + for (Map.Entry<String,String> entry : metrics.entrySet()) { + metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue()); + metricArrayBuilder.add(metricBuilder.build()); + } + + // add the array of metrics to a top-level json object + final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder(); + metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder); + return metricsBuilder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml index c320ae2..93a3196 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml @@ -55,6 +55,23 @@ <version>1.7.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-avro-record-utils</artifactId> + <version>1.7.0-SNAPSHOT</version> + </dependency> + <dependency> <groupId>org.glassfish</groupId> <artifactId>javax.json</artifactId> <version>1.0.4</version> @@ -83,10 +100,30 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <version>1.7.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>4.12</version> <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/main/resources/schema-metrics.avsc</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index 341a6d8..e755354 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -16,6 +16,23 @@ */ package org.apache.nifi.reporting; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; +import javax.net.ssl.SSLContext; + import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -25,27 +42,51 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.events.EventReporter; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SerializedForm; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.MapDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StringUtils; - -import javax.net.ssl.SSLContext; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; /** * Base class for ReportingTasks that send data over site-to-site. */ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask { + + protected static final String LAST_EVENT_ID_KEY = "last_event_id"; protected static final String DESTINATION_URL_PATH = "/nifi"; + protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() .name("Destination URL") @@ -141,8 +182,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .sensitive(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(false) + .build(); protected volatile SiteToSiteClient siteToSiteClient; + protected volatile RecordSchema recordSchema; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -188,7 +237,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue()); final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null : new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(), - context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); + context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); siteToSiteClient = new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl)) @@ -215,6 +264,33 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT return this.siteToSiteClient; } + protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) { + try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) { + + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { + writer.beginRecordSet(); + + Record record; + while ((record = reader.nextRecord()) != null) { + writer.write(record); + } + + final WriteResult writeResult = writer.finishRecordSet(); + + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + } + + return out.toByteArray(); + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e); + } + } + static class NiFiUrlValidator implements Validator { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { @@ -236,4 +312,334 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT } } } + + protected void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) { + if (value != null) { + builder.add(key, value.intValue()); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + + builder.add(key, value); + } + + protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) { + if (value == null) { + if (allowNullValues) { + builder.add(key, JsonValue.NULL); + } + } else { + builder.add(key, value); + } + } + + private class JsonRecordReader implements RecordReader { + + private RecordSchema recordSchema; + private final JsonParser jsonParser; + private final boolean array; + private final JsonNode firstJsonNode; + private boolean firstObjectConsumed = false; + + private final Supplier<DateFormat> dateFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()); + private final Supplier<DateFormat> timeFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat()); + private final Supplier<DateFormat> timestampFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()); + + public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException { + this.recordSchema = recordSchema; + try { + jsonParser = new JsonFactory().createJsonParser(in); + jsonParser.setCodec(new ObjectMapper()); + JsonToken token = jsonParser.nextToken(); + if (token == JsonToken.START_ARRAY) { + array = true; + token = jsonParser.nextToken(); + } else { + array = false; + } + if (token == JsonToken.START_OBJECT) { + firstJsonNode = jsonParser.readValueAsTree(); + } else { + firstJsonNode = null; + } + } catch (final JsonParseException e) { + throw new MalformedRecordException("Could not parse data as JSON", e); + } + } + + @Override + public void close() throws IOException { + jsonParser.close(); + } + + @Override + public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { + if (firstObjectConsumed && !array) { + return null; + } + try { + return convertJsonNodeToRecord(getNextJsonNode(), getSchema(), null, coerceTypes, dropUnknownFields); + } catch (final MalformedRecordException mre) { + throw mre; + } catch (final IOException ioe) { + throw ioe; + } catch (final Exception e) { + throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e); + } + } + + @Override + public RecordSchema getSchema() throws MalformedRecordException { + return recordSchema; + } + + private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException { + if (!firstObjectConsumed) { + firstObjectConsumed = true; + return firstJsonNode; + } + while (true) { + final JsonToken token = jsonParser.nextToken(); + if (token == null) { + return null; + } + switch (token) { + case END_OBJECT: + continue; + case START_OBJECT: + return jsonParser.readValueAsTree(); + case END_ARRAY: + case START_ARRAY: + return null; + default: + throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name()); + } + } + } + + private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix, + final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { + + final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2); + + if (dropUnknown) { + for (final RecordField recordField : schema.getFields()) { + final JsonNode childNode = getChildNode(jsonNode, recordField); + if (childNode == null) { + continue; + } + + final String fieldName = recordField.getFieldName(); + final Object value; + + if (coerceTypes) { + final DataType desiredType = recordField.getDataType(); + final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; + value = convertField(childNode, fullFieldName, desiredType, dropUnknown); + } else { + value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); + } + + values.put(fieldName, value); + } + } else { + final Iterator<String> fieldNames = jsonNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + final JsonNode childNode = jsonNode.get(fieldName); + final RecordField recordField = schema.getField(fieldName).orElse(null); + final Object value; + + if (coerceTypes && recordField != null) { + final DataType desiredType = recordField.getDataType(); + final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; + value = convertField(childNode, fullFieldName, desiredType, dropUnknown); + } else { + value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); + } + + values.put(fieldName, value); + } + } + + final Supplier<String> supplier = () -> jsonNode.toString(); + return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown); + } + + private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) { + if (jsonNode.has(field.getFieldName())) { + return jsonNode.get(field.getFieldName()); + } + for (final String alias : field.getAliases()) { + if (jsonNode.has(alias)) { + return jsonNode.get(alias); + } + } + return null; + } + + protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException { + if (fieldNode == null || fieldNode.isNull()) { + return null; + } + + switch (desiredType.getFieldType()) { + case BOOLEAN: + case BYTE: + case CHAR: + case DOUBLE: + case FLOAT: + case INT: + case BIGINT: + case LONG: + case SHORT: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: { + final Object rawValue = getRawNodeValue(fieldNode, null); + final Object converted = DataTypeUtils.convertType(rawValue, desiredType, dateFormat, timeFormat, timestampFormat, fieldName); + return converted; + } + case MAP: { + final DataType valueType = ((MapDataType) desiredType).getValueType(); + + final Map<String, Object> map = new HashMap<>(); + final Iterator<String> fieldNameItr = fieldNode.getFieldNames(); + while (fieldNameItr.hasNext()) { + final String childName = fieldNameItr.next(); + final JsonNode childNode = fieldNode.get(childName); + final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown); + map.put(childName, childValue); + } + + return map; + } + case ARRAY: { + final ArrayNode arrayNode = (ArrayNode) fieldNode; + final int numElements = arrayNode.size(); + final Object[] arrayElements = new Object[numElements]; + int count = 0; + for (final JsonNode node : arrayNode) { + final DataType elementType = ((ArrayDataType) desiredType).getElementType(); + final Object converted = convertField(node, fieldName, elementType, dropUnknown); + arrayElements[count++] = converted; + } + + return arrayElements; + } + case RECORD: { + if (fieldNode.isObject()) { + RecordSchema childSchema; + if (desiredType instanceof RecordDataType) { + childSchema = ((RecordDataType) desiredType).getChildSchema(); + } else { + return null; + } + + if (childSchema == null) { + final List<RecordField> fields = new ArrayList<>(); + final Iterator<String> fieldNameItr = fieldNode.getFieldNames(); + while (fieldNameItr.hasNext()) { + fields.add(new RecordField(fieldNameItr.next(), RecordFieldType.STRING.getDataType())); + } + + childSchema = new SimpleRecordSchema(fields); + } + + return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown); + } else { + return null; + } + } + case CHOICE: { + return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, fieldName); + } + } + + return null; + } + + protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException { + if (fieldNode == null || fieldNode.isNull()) { + return null; + } + + if (fieldNode.isNumber()) { + return fieldNode.getNumberValue(); + } + + if (fieldNode.isBinary()) { + return fieldNode.getBinaryValue(); + } + + if (fieldNode.isBoolean()) { + return fieldNode.getBooleanValue(); + } + + if (fieldNode.isTextual()) { + return fieldNode.getTextValue(); + } + + if (fieldNode.isArray()) { + final ArrayNode arrayNode = (ArrayNode) fieldNode; + final int numElements = arrayNode.size(); + final Object[] arrayElements = new Object[numElements]; + int count = 0; + + final DataType elementDataType; + if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) { + final ArrayDataType arrayDataType = (ArrayDataType) dataType; + elementDataType = arrayDataType.getElementType(); + } else { + elementDataType = null; + } + + for (final JsonNode node : arrayNode) { + final Object value = getRawNodeValue(node, elementDataType); + arrayElements[count++] = value; + } + + return arrayElements; + } + + if (fieldNode.isObject()) { + RecordSchema childSchema; + if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) { + final RecordDataType recordDataType = (RecordDataType) dataType; + childSchema = recordDataType.getChildSchema(); + } else { + childSchema = null; + } + + if (childSchema == null) { + childSchema = new SimpleRecordSchema(Collections.emptyList()); + } + + final Iterator<String> fieldNames = fieldNode.getFieldNames(); + final Map<String, Object> childValues = new HashMap<>(); + while (fieldNames.hasNext()) { + final String childFieldName = fieldNames.next(); + final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType); + childValues.put(childFieldName, childValue); + } + + final MapRecord record = new MapRecord(childSchema, childValues); + return record; + } + + return null; + } + + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index fac7696..20ed96a 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -68,9 +68,6 @@ import java.util.concurrent.TimeUnit; @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask { - static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final String LAST_EVENT_ID_KEY = "last_event_id"; - static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") .description("The value to use for the platform field in each provenance event.") @@ -195,7 +192,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting lastSentBulletinId = currMaxId; } - static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, + private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, final String platform, final String nodeIdentifier) { addField(builder, "objectId", UUID.randomUUID().toString()); @@ -216,17 +213,4 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting return builder.build(); } - private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { - if (value != null) { - builder.add(key, value.longValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - if (value == null) { - return; - } - builder.add(key, value); - } - }