This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-3988 in repository https://gitbox.apache.org/repos/asf/storm.git
commit 50bb8c49e13cac5167d1cddf562004503dd4ef2c Author: Richard Zowalla <[email protected]> AuthorDate: Thu Oct 19 08:59:17 2023 +0200 STORM-3988 - Remove "storm-opentsdb" --- examples/storm-opentsdb-examples/pom.xml | 96 ---------- .../apache/storm/opentsdb/MetricGenBatchSpout.java | 96 ---------- .../org/apache/storm/opentsdb/MetricGenSpout.java | 74 -------- .../storm/opentsdb/SampleOpenTsdbBoltTopology.java | 61 ------- .../opentsdb/SampleOpenTsdbTridentTopology.java | 77 -------- external/storm-opentsdb/README.md | 76 -------- external/storm-opentsdb/pom.xml | 114 ------------ .../storm/opentsdb/OpenTsdbMetricDatapoint.java | 134 -------------- .../bolt/ITupleOpenTsdbDatapointMapper.java | 39 ---- .../apache/storm/opentsdb/bolt/OpenTsdbBolt.java | 188 ------------------- .../bolt/TupleOpenTsdbDatapointMapper.java | 135 -------------- .../storm/opentsdb/client/ClientResponse.java | 200 --------------------- .../storm/opentsdb/client/OpenTsdbClient.java | 155 ---------------- .../storm/opentsdb/trident/OpenTsdbState.java | 92 ---------- .../opentsdb/trident/OpenTsdbStateFactory.java | 52 ------ .../opentsdb/trident/OpenTsdbStateUpdater.java | 37 ---- pom.xml | 2 - 17 files changed, 1628 deletions(-) diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml deleted file mode 100644 index 6099e04d8..000000000 --- a/examples/storm-opentsdb-examples/pom.xml +++ /dev/null @@ -1,96 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-opentsdb-examples</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-opentsdb</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.sf</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.dsa</exclude> - <exclude>META-INF/*.RSA</exclude> - <exclude>META-INF/*.rsa</exclude> - <exclude>META-INF/*.EC</exclude> - <exclude>META-INF/*.ec</exclude> - <exclude>META-INF/MSFTSIG.SF</exclude> - <exclude>META-INF/MSFTSIG.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java deleted file mode 100644 index 85b2fb218..000000000 --- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenBatchSpout.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb; - -import com.google.common.collect.Lists; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import org.apache.storm.Config; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; -import org.apache.storm.tuple.Fields; - -/** - * BatchSpout implementation for metrics generation. - */ -public class MetricGenBatchSpout implements IBatchSpout { - - private int batchSize; - private final Map<Long, List<List<Object>>> batches = new HashMap<>(); - - public MetricGenBatchSpout(int batchSize) { - this.batchSize = batchSize; - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context) { - - } - - @Override - public void emitBatch(long batchId, TridentCollector collector) { - List<List<Object>> values; - if (batches.containsKey(batchId)) { - values = batches.get(batchId); - } else { - values = new ArrayList<>(); - for (int i = 0; i < batchSize; i++) { - // tuple values are mapped with - // metric, timestamp, value, Map of tagK/tagV respectively. - values.add(Lists.newArrayList(Lists.newArrayList("device.temp", System.currentTimeMillis(), new Random().nextLong(), - Collections.singletonMap("loc.id", new Random().nextInt() % 64 + "")))); - } - batches.put(batchId, values); - } - for (List<Object> value : values) { - collector.emit(value); - } - - } - - @Override - public void ack(long batchId) { - batches.remove(batchId); - } - - @Override - public void close() { - - } - - @Override - public Map<String, Object> getComponentConfiguration() { - Config conf = new Config(); - conf.setMaxTaskParallelism(1); - return conf; - } - - @Override - public Fields getOutputFields() { - return MetricGenSpout.DEFAULT_METRIC_FIELDS; - } -} diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java deleted file mode 100644 index b9632611a..000000000 --- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/MetricGenSpout.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb; - -import com.google.common.collect.Lists; - -import java.util.Collections; -import java.util.Map; -import java.util.Random; - -import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; - -/** - * Spout to generate tuples containing metric data. - */ -public class MetricGenSpout extends BaseRichSpout { - - public static final Fields DEFAULT_METRIC_FIELDS = - new Fields(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getMetricField(), - TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTimestampField(), - TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getValueField(), - TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER.getTagsField()); - - private Map<String, Object> conf; - private TopologyContext context; - private SpoutOutputCollector collector; - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(DEFAULT_METRIC_FIELDS); - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - this.conf = conf; - this.context = context; - this.collector = collector; - } - - @Override - public void nextTuple() { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // ignore - } - // tuple values are mapped with - // metric, timestamp, value, Map of tagK/tagV respectively. - collector.emit(Lists.newArrayList("device.temp", System.currentTimeMillis(), new Random().nextLong(), - Collections.singletonMap("loc.id", new Random().nextInt() % 64 + ""))); - } -} diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java deleted file mode 100644 index 23a086cb9..000000000 --- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb; - -import java.util.Collections; - -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.opentsdb.bolt.OpenTsdbBolt; -import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper; -import org.apache.storm.opentsdb.client.OpenTsdbClient; -import org.apache.storm.topology.TopologyBuilder; - -/** - * Sample application to use OpenTSDB bolt. - */ -public class SampleOpenTsdbBoltTopology { - - public static void main(String[] args) throws Exception { - if (args.length == 0) { - throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbBoltTopology <tsdb-url>`"); - } - - TopologyBuilder topologyBuilder = new TopologyBuilder(); - - topologyBuilder.setSpout("metric-gen", new MetricGenSpout(), 5); - - String openTsdbUrl = args[0]; - OpenTsdbClient.Builder builder = OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails(); - final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER)); - openTsdbBolt.withBatchSize(10).withFlushInterval(2).failTupleForFailedMetrics(); - topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen"); - - Config conf = new Config(); - conf.setDebug(true); - String topoName = "word-count"; - if (args.length > 1) { - topoName = args[1]; - } - conf.setNumWorkers(3); - - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topologyBuilder.createTopology()); - } -} diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java deleted file mode 100644 index 1b1a2cf7b..000000000 --- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb; - -import java.util.Collections; - -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper; -import org.apache.storm.opentsdb.client.OpenTsdbClient; -import org.apache.storm.opentsdb.trident.OpenTsdbStateFactory; -import org.apache.storm.opentsdb.trident.OpenTsdbStateUpdater; -import org.apache.storm.trident.Stream; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.operation.Consumer; -import org.apache.storm.trident.tuple.TridentTuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Sample trident topology to store time series metrics in to OpenTsdb. - */ -public class SampleOpenTsdbTridentTopology { - private static final Logger LOG = LoggerFactory.getLogger(SampleOpenTsdbTridentTopology.class); - - public static void main(String[] args) throws Exception { - if (args.length == 0) { - throw new IllegalArgumentException("There should be at least one argument. Run as `SampleOpenTsdbTridentTopology <tsdb-url>`"); - } - - String tsdbUrl = args[0]; - - - final OpenTsdbClient.Builder openTsdbClientBuilder = OpenTsdbClient.newBuilder(tsdbUrl); - final OpenTsdbStateFactory openTsdbStateFactory = - new OpenTsdbStateFactory(openTsdbClientBuilder, - Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER)); - - TridentTopology tridentTopology = new TridentTopology(); - final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenBatchSpout(10)); - - stream.peek(new Consumer() { - @Override - public void accept(TridentTuple input) { - LOG.info("########### Received tuple: [{}]", input); - } - }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater()); - - - Config conf = new Config(); - conf.setDebug(true); - String topoName = "word-count"; - if (args.length > 1) { - topoName = args[1]; - } - conf.setNumWorkers(3); - - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, tridentTopology.build()); - } -} diff --git a/external/storm-opentsdb/README.md b/external/storm-opentsdb/README.md deleted file mode 100644 index 39db5257c..000000000 --- a/external/storm-opentsdb/README.md +++ /dev/null @@ -1,76 +0,0 @@ -# Storm OpenTSDB Bolt and TridentState - -OpenTSDB offers a scalable and highly available storage for time series data. It consists of a -Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the -configured HBase cluster to push/query the data. - -Time series data point consists of: - - a metric name. - - a UNIX timestamp (seconds or milliseconds since Epoch). - - a value (64 bit integer or single-precision floating point value). - - a set of tags (key-value pairs) that describe the time series the point belongs to. - -Storm bolt and trident state creates the above time series data from a tuple based on the given `TupleMetricPointMapper` - -This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB. - -Time series data points are written with at-least-once guarantee and duplicate data points should be handled as mentioned [here](http://opentsdb.net/docs/build/html/user_guide/writing.html#duplicate-data-points) in OpenTSDB. - -## Examples - -### Core Bolt -Below example describes the usage of core bolt which is `org.apache.storm.opentsdb.bolt.OpenTsdbBolt` - -```java - - OpenTsdbClient.Builder builder = OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails(); - final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER)); - openTsdbBolt.withBatchSize(10).withFlushInterval(2000); - topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen"); - -``` - - -### Trident State - -```java - - final OpenTsdbStateFactory openTsdbStateFactory = - new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl), - Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER)); - TridentTopology tridentTopology = new TridentTopology(); - - final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout()); - - stream.peek(new Consumer() { - @Override - public void accept(TridentTuple input) { - LOG.info("########### Received tuple: [{}]", input); - } - }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater()); - -``` - -## License - -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. - -## Committer Sponsors - - * Sriharha Chintalapani ([[email protected]](mailto:[email protected])) - * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR)) diff --git a/external/storm-opentsdb/pom.xml b/external/storm-opentsdb/pom.xml deleted file mode 100644 index dc38d94ee..000000000 --- a/external/storm-opentsdb/pom.xml +++ /dev/null @@ -1,114 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-opentsdb</artifactId> - - <developers> - <developer> - <id>satishd</id> - <name>Satish Duggana</name> - <email>[email protected]</email> - </developer> - </developers> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.databind.version}</version> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.core</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.media</groupId> - <artifactId>jersey-media-json-jackson</artifactId> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.connectors</groupId> - <artifactId>jersey-apache-connector</artifactId> - </dependency> - <!-- Extra Java 11 jars for Jersey. Jersey's dependency tree only includes these on Java 11, - so we need to include them manually to ensure that Java 8 builds work on Java 11. --> - <dependency> - <groupId>com.sun.activation</groupId> - <artifactId>jakarta.activation</artifactId> - </dependency> - <dependency> - <groupId>jakarta.activation</groupId> - <artifactId>jakarta.activation-api</artifactId> - </dependency> - <dependency> - <groupId>jakarta.xml.bind</groupId> - <artifactId>jakarta.xml.bind-api</artifactId> - </dependency> - <!-- End extra Jersey Java 11 jars --> - - <!--test dependencies --> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/OpenTsdbMetricDatapoint.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/OpenTsdbMetricDatapoint.java deleted file mode 100644 index 0b47e107a..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/OpenTsdbMetricDatapoint.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb; - -import java.io.Serializable; -import java.util.Collections; -import java.util.Map; - -/** - * This class represents a metric data point in OpenTSDB's format. - */ -public class OpenTsdbMetricDatapoint implements Serializable { - - // metric name - private final String metric; - - // map of tag value pairs - private final Map<String, String> tags; - - // timestamp either in milliseconds or seconds at which this metric is occurred. - private final long timestamp; - - // value of the metric - private final Number value; - - // required for jackson serialization - private OpenTsdbMetricDatapoint() { - this(null, null, 0L, null); - } - - public OpenTsdbMetricDatapoint(String metric, Map<String, String> tags, long timestamp, Number value) { - this.metric = metric; - this.tags = Collections.unmodifiableMap(tags); - this.timestamp = timestamp; - this.value = value; - - if (!(value instanceof Integer || value instanceof Long || value instanceof Float)) { - throw new RuntimeException("Received tuple contains unsupported value: " + value + " field. It must be Integer/Long/Float."); - } - - } - - /** - * Retrieve the metric name of this datapoint. - * @return metric name of this datapoint - */ - public String getMetric() { - return metric; - } - - /** - * Retrieve the map of tag/value pairs of this metric. - * @return Map of tag/value pairs of this metric - */ - public Map<String, String> getTags() { - return tags; - } - - /** - * Retrieve the timestamp at which this metric occured. - * @return timestamp either in milliseconds or seconds at which this metric occurred - */ - public long getTimestamp() { - return timestamp; - } - - /** - * Retrieve the value of this metric datapoint. - * @return value of this metric datapoint - */ - public Object getValue() { - return value; - } - - @Override - public String toString() { - return "OpenTsdbMetricDataPoint{" - + "metric='" + metric + '\'' - + ", tags=" + tags - + ", timestamp=" + timestamp - + ", value=" + value - + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof OpenTsdbMetricDatapoint)) { - return false; - } - - OpenTsdbMetricDatapoint that = (OpenTsdbMetricDatapoint) o; - - if (timestamp != that.timestamp) { - return false; - } - if (value != that.value) { - return false; - } - if (!metric.equals(that.metric)) { - return false; - } - return tags.equals(that.tags); - - } - - @Override - public int hashCode() { - int result = metric.hashCode(); - result = 31 * result + tags.hashCode(); - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - result = 31 * result + value.hashCode(); - return result; - } -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java deleted file mode 100644 index c385bba3a..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.bolt; - -import java.io.Serializable; - -import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint; -import org.apache.storm.tuple.ITuple; - -/** - * This class gives a mapping of a {@link ITuple} with {@link OpenTsdbMetricDatapoint}. - */ -public interface ITupleOpenTsdbDatapointMapper extends Serializable { - - /** - * Returns a {@link OpenTsdbMetricDatapoint} for a given {@code tuple}. - * - * @param tuple tuple instance - */ - OpenTsdbMetricDatapoint getMetricPoint(ITuple tuple); - -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/OpenTsdbBolt.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/OpenTsdbBolt.java deleted file mode 100644 index 2a3688b51..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/OpenTsdbBolt.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.bolt; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint; -import org.apache.storm.opentsdb.client.ClientResponse; -import org.apache.storm.opentsdb.client.OpenTsdbClient; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.BatchHelper; -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Basic bolt implementation for storing timeseries datapoints to OpenTSDB. - * <p/> - * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}. - * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached. - * <p/> - * - * <p>Example topology: - * <blockquote><pre> - * OpenTsdbClient.Builder builder = OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails(); - * final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER)); - * openTsdbBolt.withBatchSize(10).withFlushInterval(2).failTupleForFailedMetrics(); - * topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen"); - * </pre></blockquote> - * - */ -public class OpenTsdbBolt extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class); - - private final OpenTsdbClient.Builder openTsdbClientBuilder; - private final List<? extends ITupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers; - private int batchSize; - private int flushIntervalInSeconds; - private boolean failTupleForFailedMetrics; - - private BatchHelper batchHelper; - private OpenTsdbClient openTsdbClient; - private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>(); - private OutputCollector collector; - - public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, - ITupleOpenTsdbDatapointMapper tupleOpenTsdbDatapointMapper) { - this.openTsdbClientBuilder = openTsdbClientBuilder; - this.tupleOpenTsdbDatapointMappers = Collections.singletonList(tupleOpenTsdbDatapointMapper); - } - - public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, - List<? extends ITupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) { - this.openTsdbClientBuilder = openTsdbClientBuilder; - this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers; - } - - public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) { - this.flushIntervalInSeconds = flushIntervalInSeconds; - return this; - } - - public OpenTsdbBolt withBatchSize(int batchSize) { - this.batchSize = batchSize; - return this; - } - - /** - * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails - * the respective tuples of the failed metrics. - * - * @return same instance by setting {@code failTupleForFailedMetrics} to true - */ - public OpenTsdbBolt failTupleForFailedMetrics() { - this.failTupleForFailedMetrics = true; - return this; - } - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - batchHelper = new BatchHelper(batchSize, collector); - openTsdbClient = openTsdbClientBuilder.build(); - } - - @Override - public void execute(Tuple tuple) { - try { - if (batchHelper.shouldHandle(tuple)) { - final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple); - for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) { - metricPointsWithTuple.put(metricDataPoint, tuple); - } - batchHelper.addBatch(tuple); - } - - if (batchHelper.shouldFlush()) { - LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size()); - - ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet()); - - if (failTupleForFailedMetrics && clientResponse != null && clientResponse.getFailed() > 0) { - final List<ClientResponse.Details.Error> errors = clientResponse.getErrors(); - LOG.error("Some of the metric points failed with errors: [{}]", clientResponse); - if (errors != null && !errors.isEmpty()) { - - Set<Tuple> failedTuples = new HashSet<>(); - for (ClientResponse.Details.Error error : errors) { - final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint()); - if (failedTuple != null) { - failedTuples.add(failedTuple); - } - } - - for (Tuple batchedTuple : batchHelper.getBatchTuples()) { - if (failedTuples.contains(batchedTuple)) { - collector.fail(batchedTuple); - } else { - collector.ack(batchedTuple); - } - } - - } else { - throw new RuntimeException("Some of the metric points failed with details: " + errors); - } - } else { - LOG.debug("Acknowledging batched tuples"); - batchHelper.ack(); - } - metricPointsWithTuple.clear(); - } - } catch (Exception e) { - batchHelper.fail(e); - metricPointsWithTuple.clear(); - } - } - - private List<OpenTsdbMetricDatapoint> getMetricPoints(Tuple tuple) { - List<OpenTsdbMetricDatapoint> metricDataPoints = new ArrayList<>(); - for (ITupleOpenTsdbDatapointMapper tupleOpenTsdbDatapointMapper : tupleOpenTsdbDatapointMappers) { - metricDataPoints.add(tupleOpenTsdbDatapointMapper.getMetricPoint(tuple)); - } - - return metricDataPoints; - } - - @Override - public void cleanup() { - openTsdbClient.cleanup(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // this is a sink and no result to emit. - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalInSeconds); - } -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java deleted file mode 100644 index e2aed5c34..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.bolt; - -import java.util.Map; - -import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint; -import org.apache.storm.tuple.ITuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Converts {@link org.apache.storm.tuple.ITuple} to {@link OpenTsdbMetricDatapoint}. - */ -public final class TupleOpenTsdbDatapointMapper implements ITupleOpenTsdbDatapointMapper { - private static final Logger LOG = LoggerFactory.getLogger(TupleOpenTsdbDatapointMapper.class); - - /** - * Default mapper which can be used when the tuple already contains fields mapping metric, timestamp, tags and value. - */ - public static final TupleOpenTsdbDatapointMapper DEFAULT_MAPPER = - new TupleOpenTsdbDatapointMapper("metric", "timestamp", "tags", "value"); - - private final String metricField; - private final String timestampField; - private final String valueField; - private final String tagsField; - - public TupleOpenTsdbDatapointMapper(String metricField, String timestampField, String tagsField, String valueField) { - this.metricField = metricField; - this.timestampField = timestampField; - this.tagsField = tagsField; - this.valueField = valueField; - } - - @Override - public OpenTsdbMetricDatapoint getMetricPoint(ITuple tuple) { - return new OpenTsdbMetricDatapoint( - tuple.getStringByField(metricField), - (Map<String, String>) tuple.getValueByField(tagsField), - tuple.getLongByField(timestampField), - (Number) tuple.getValueByField(valueField)); - } - - /** - * Retrieve metric field name in the tuple. - * @return metric field name in the tuple - */ - public String getMetricField() { - return metricField; - } - - /** - * Retrieve the timestamp field name in the tuple. - * @return timestamp field name in the tuple - */ - public String getTimestampField() { - return timestampField; - } - - /** - * Retrieve the value field name in the tuple. - * @return value field name in the tuple - */ - public String getValueField() { - return valueField; - } - - /** - * Retrieve the tags field name in the tuple. - * @return tags field name in the tuple - */ - public String getTagsField() { - return tagsField; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof TupleOpenTsdbDatapointMapper)) { - return false; - } - - TupleOpenTsdbDatapointMapper that = (TupleOpenTsdbDatapointMapper) o; - - if (!metricField.equals(that.metricField)) { - return false; - } - if (!timestampField.equals(that.timestampField)) { - return false; - } - if (!valueField.equals(that.valueField)) { - return false; - } - return tagsField.equals(that.tagsField); - } - - @Override - public int hashCode() { - int result = metricField.hashCode(); - result = 31 * result + timestampField.hashCode(); - result = 31 * result + valueField.hashCode(); - result = 31 * result + tagsField.hashCode(); - return result; - } - - @Override - public String toString() { - return "TupleOpenTsdbDatapointMapper{" - + "metricField='" + metricField + '\'' - + ", timestampField='" + timestampField + '\'' - + ", valueField='" + valueField + '\'' - + ", tagsField='" + tagsField + '\'' - + '}'; - } -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/ClientResponse.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/ClientResponse.java deleted file mode 100644 index bb51b5ca3..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/ClientResponse.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.client; - -import java.io.Serializable; -import java.util.List; - -import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint; - -/** - * This class represents the response from OpenTsdb for a request sent. - */ -public interface ClientResponse extends Serializable { - - - class Summary implements ClientResponse { - private int failed; - private int success; - private int timeouts; - - public Summary() { - } - - public Summary(int success, int failed, int timeouts) { - this.failed = failed; - this.success = success; - this.timeouts = timeouts; - } - - public int getFailed() { - return failed; - } - - public int getSuccess() { - return success; - } - - public int getTimeouts() { - return timeouts; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof Summary)) { - return false; - } - - Summary summary = (Summary) o; - - if (failed != summary.failed) { - return false; - } - if (success != summary.success) { - return false; - } - return timeouts == summary.timeouts; - - } - - @Override - public int hashCode() { - int result = failed; - result = 31 * result + success; - result = 31 * result + timeouts; - return result; - } - - @Override - public String toString() { - return "Summary{" - + "failed=" + failed - + ", success=" + success - + ", timeouts=" + timeouts - + '}'; - } - } - - class Details extends Summary { - private List<Error> errors; - - public Details() { - } - - public Details(int success, int failed, int timeouts, List<Error> errors) { - super(success, failed, timeouts); - this.errors = errors; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof Details)) { - return false; - } - if (!super.equals(o)) { - return false; - } - - Details details = (Details) o; - - return errors.equals(details.errors); - - } - - public List<Error> getErrors() { - return errors; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + errors.hashCode(); - return result; - } - - @Override - public String toString() { - return "Details{" - + "errors=" + errors - + super.toString() - + '}'; - } - - public static class Error implements Serializable { - private String error; - private OpenTsdbMetricDatapoint datapoint; - - public Error() { - } - - public Error(String error, OpenTsdbMetricDatapoint datapoint) { - this.error = error; - this.datapoint = datapoint; - } - - public String getError() { - return error; - } - - public OpenTsdbMetricDatapoint getDatapoint() { - return datapoint; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof Error)) { - return false; - } - - Error error1 = (Error) o; - - if (!error.equals(error1.error)) { - return false; - } - return datapoint.equals(error1.datapoint); - - } - - @Override - public int hashCode() { - int result = error.hashCode(); - result = 31 * result + datapoint.hashCode(); - return result; - } - - @Override - public String toString() { - return "Error{" - + "error='" + error + '\'' - + ", datapoint=" + datapoint - + '}'; - } - } - } -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/OpenTsdbClient.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/OpenTsdbClient.java deleted file mode 100644 index dde938ef4..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/client/OpenTsdbClient.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.client; - -import com.google.common.base.Preconditions; - -import java.io.Serializable; -import java.util.Collection; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.WebTarget; - -import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint; -import org.glassfish.jersey.apache.connector.ApacheConnectorProvider; -import org.glassfish.jersey.client.ClientConfig; -import org.glassfish.jersey.client.ClientProperties; -import org.glassfish.jersey.client.RequestEntityProcessing; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Client to connect to OpenTsdb TSD for storing timeseries datapoints. - */ -public class OpenTsdbClient { - private static final String PUT_PATH = "/api/put"; - private static Logger LOG = LoggerFactory.getLogger(OpenTsdbClient.class); - - private final String urlString; - private final boolean sync; - private final long syncTimeout; - private final ResponseType responseType; - private final boolean enableChunkedEncoding; - - private WebTarget target; - private Client client; - - public enum ResponseType { - None(""), - Summary("summary"), - Details("details"); - - private final String value; - - ResponseType(String value) { - this.value = value; - } - } - - protected OpenTsdbClient(String urlString, boolean sync, long syncTimeOut, ResponseType responseType, boolean enableChunkedEncoding) { - this.urlString = urlString; - this.sync = sync; - this.syncTimeout = syncTimeOut; - this.responseType = responseType; - this.enableChunkedEncoding = enableChunkedEncoding; - - init(); - } - - private void init() { - - final ApacheConnectorProvider apacheConnectorProvider = new ApacheConnectorProvider(); - final ClientConfig clientConfig = new ClientConfig().connectorProvider(apacheConnectorProvider); - - // transfer encoding should be set as jersey sets it on by default. - clientConfig.property(ClientProperties.REQUEST_ENTITY_PROCESSING, - enableChunkedEncoding ? RequestEntityProcessing.CHUNKED : RequestEntityProcessing.BUFFERED); - - client = ClientBuilder.newClient(clientConfig); - - target = client.target(urlString).path(PUT_PATH); - if (sync) { - // need to add an empty string else it is nto added as query param. - target = target.queryParam("sync", "").queryParam("sync_timeout", syncTimeout); - } - if (responseType != ResponseType.None) { - // need to add an empty string else it is nto added as query param. - target = target.queryParam(responseType.value, ""); - } - - LOG.info("target uri [{}]", target.getUri()); - } - - public ClientResponse.Details writeMetricPoint(OpenTsdbMetricDatapoint metricDataPoint) { - return target.request().post(Entity.json(metricDataPoint), ClientResponse.Details.class); - } - - public ClientResponse.Details writeMetricPoints(Collection<OpenTsdbMetricDatapoint> metricDataPoints) { - LOG.debug("Writing metric points to OpenTSDB [{}]", metricDataPoints.size()); - return target.request().post(Entity.json(metricDataPoints), ClientResponse.Details.class); - } - - public void cleanup() { - client.close(); - } - - public static OpenTsdbClient.Builder newBuilder(String url) { - return new Builder(url); - } - - public static class Builder implements Serializable { - private final String url; - private boolean sync; - private long syncTimeOut; - private boolean enableChunkedEncoding; - private ResponseType responseType = ResponseType.None; - - public Builder(String url) { - this.url = url; - } - - public OpenTsdbClient.Builder sync(long timeoutInMilliSecs) { - Preconditions.checkArgument(timeoutInMilliSecs > 0, "timeout value should be more than zero."); - sync = true; - syncTimeOut = timeoutInMilliSecs; - return this; - } - - public OpenTsdbClient.Builder returnSummary() { - responseType = ResponseType.Summary; - return this; - } - - public OpenTsdbClient.Builder returnDetails() { - responseType = ResponseType.Details; - return this; - } - - public Builder enableChunkedEncoding() { - enableChunkedEncoding = true; - return this; - } - - public OpenTsdbClient build() { - return new OpenTsdbClient(url, sync, syncTimeOut, responseType, enableChunkedEncoding); - } - } -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbState.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbState.java deleted file mode 100644 index 6614af5f5..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbState.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.trident; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.storm.opentsdb.OpenTsdbMetricDatapoint; -import org.apache.storm.opentsdb.bolt.ITupleOpenTsdbDatapointMapper; -import org.apache.storm.opentsdb.client.ClientResponse; -import org.apache.storm.opentsdb.client.OpenTsdbClient; -import org.apache.storm.topology.FailedException; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.tuple.TridentTuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Trident {@link State} implementation for OpenTSDB. - */ -public class OpenTsdbState implements State { - private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbState.class); - - private final Map<String, Object> conf; - private final OpenTsdbClient.Builder openTsdbClientBuilder; - private final Iterable<? extends ITupleOpenTsdbDatapointMapper> tupleMetricPointMappers; - private OpenTsdbClient openTsdbClient; - - public OpenTsdbState(Map<String, Object> conf, - OpenTsdbClient.Builder openTsdbClientBuilder, - Iterable<? extends ITupleOpenTsdbDatapointMapper> tupleMetricPointMappers) { - this.conf = conf; - this.openTsdbClientBuilder = openTsdbClientBuilder; - this.tupleMetricPointMappers = tupleMetricPointMappers; - } - - public void prepare() { - openTsdbClient = openTsdbClientBuilder.build(); - } - - @Override - public void beginCommit(Long txid) { - - } - - @Override - public void commit(Long txid) { - - } - - public void update(List<TridentTuple> tridentTuples, TridentCollector collector) { - try { - List<OpenTsdbMetricDatapoint> metricDataPoints = new ArrayList<>(); - for (TridentTuple tridentTuple : tridentTuples) { - for (ITupleOpenTsdbDatapointMapper tupleOpenTsdbDatapointMapper : tupleMetricPointMappers) { - metricDataPoints.add(tupleOpenTsdbDatapointMapper.getMetricPoint(tridentTuple)); - } - } - final ClientResponse.Details details = openTsdbClient.writeMetricPoints(metricDataPoints); - - if (details != null && (details.getFailed() > 0)) { - final String errorMsg = "Failed in writing metrics to TSDB with details: " + details; - LOG.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - } catch (Exception e) { - collector.reportError(e); - throw new FailedException(e); - } - - } -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateFactory.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateFactory.java deleted file mode 100644 index 01cd2c99e..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.trident; - -import java.util.List; -import java.util.Map; - -import org.apache.storm.opentsdb.bolt.ITupleOpenTsdbDatapointMapper; -import org.apache.storm.opentsdb.client.OpenTsdbClient; -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.state.StateFactory; - -/** - * Trident {@link StateFactory} implementation for OpenTSDB. - */ -public class OpenTsdbStateFactory implements StateFactory { - - private OpenTsdbClient.Builder builder; - private final List<? extends ITupleOpenTsdbDatapointMapper> tridentTupleOpenTsdbDatapointMappers; - - public OpenTsdbStateFactory(OpenTsdbClient.Builder builder, - List<? extends ITupleOpenTsdbDatapointMapper> tridentTupleOpenTsdbDatapointMappers) { - this.builder = builder; - this.tridentTupleOpenTsdbDatapointMappers = tridentTupleOpenTsdbDatapointMappers; - } - - @Override - public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - final OpenTsdbState openTsdbState = new OpenTsdbState(conf, builder, tridentTupleOpenTsdbDatapointMappers); - openTsdbState.prepare(); - - return openTsdbState; - } -} diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateUpdater.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateUpdater.java deleted file mode 100644 index 7330c9452..000000000 --- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/trident/OpenTsdbStateUpdater.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.storm.opentsdb.trident; - -import java.util.List; - -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.BaseStateUpdater; -import org.apache.storm.trident.tuple.TridentTuple; - -/** - * Trident {@link org.apache.storm.trident.state.StateUpdater} implementation for OpenTSDB. - */ -public class OpenTsdbStateUpdater extends BaseStateUpdater<OpenTsdbState> { - - @Override - public void updateState(OpenTsdbState state, List<TridentTuple> tuples, TridentCollector collector) { - state.update(tuples, collector); - } -} diff --git a/pom.xml b/pom.xml index 32b49bed8..c04ac0950 100644 --- a/pom.xml +++ b/pom.xml @@ -497,7 +497,6 @@ <module>external/storm-metrics</module> <module>external/storm-kafka-client</module> <module>external/storm-kafka-migration</module> - <module>external/storm-opentsdb</module> <module>external/storm-kafka-monitor</module> <module>external/storm-jms</module> <module>external/storm-pmml</module> @@ -519,7 +518,6 @@ <module>examples/storm-starter</module> <module>examples/storm-loadgen</module> <module>examples/storm-redis-examples</module> - <module>examples/storm-opentsdb-examples</module> <module>examples/storm-solr-examples</module> <module>examples/storm-kafka-client-examples</module> <module>examples/storm-jdbc-examples</module>
