Fix line endings so that the diff is meaningful. Signed-off-by: Shanyu Zhao <shz...@microsoft.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1f13f15d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1f13f15d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1f13f15d Branch: refs/heads/master Commit: 1f13f15d0c0de233fd4cc4ff6d6de586c4736142 Parents: e515492 Author: Shanyu Zhao <shz...@microsoft.com> Authored: Wed May 13 14:50:02 2015 -0700 Committer: Shanyu Zhao <shz...@microsoft.com> Committed: Wed May 13 14:50:02 2015 -0700 ---------------------------------------------------------------------- external/storm-eventhubs/pom.xml | 236 +++++++++---------- .../eventhubs/bolt/DefaultEventDataFormat.java | 94 ++++---- .../storm/eventhubs/bolt/EventHubBolt.java | 202 ++++++++-------- .../eventhubs/bolt/EventHubBoltConfig.java | 214 ++++++++--------- .../storm/eventhubs/bolt/IEventDataFormat.java | 56 ++--- .../storm/eventhubs/client/EventHubClient.java | 190 +++++++-------- .../storm/eventhubs/client/EventHubSender.java | 198 ++++++++-------- .../storm/eventhubs/samples/EventHubLoop.java | 104 ++++---- 8 files changed, 647 insertions(+), 647 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 2ceed09..2dfb739 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -1,119 +1,119 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-eventhubs</artifactId> - <version>0.11.0-SNAPSHOT</version> - <packaging>jar</packaging> - <name>storm-eventhubs</name> - <description>EventHubs Storm Spout</description> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <qpid.version>0.32</qpid.version> - </properties> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.3</version> - <executions> - <execution> - <goals> - <goal>shade</goal> - </goals> - <phase>package</phase> - </execution> - </executions> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"> - </transformer> - </transformers> - <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile> - </configuration> - </plugin> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <configuration> - <tasks> - <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-client</artifactId> - <version>${qpid.version}</version> - </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-amqp-1-0-client-jms</artifactId> - <version>${qpid.version}</version> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${project.version}</version> - <!-- keep storm out of the jar-with-dependencies --> - <type>jar</type> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - <version>${curator.version}</version> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.11</version> - <scope>test</scope> - </dependency> - </dependencies> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>storm-eventhubs</artifactId> + <version>0.11.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>storm-eventhubs</name> + <description>EventHubs Storm Spout</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <qpid.version>0.32</qpid.version> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.3</version> + <executions> + <execution> + <goals> + <goal>shade</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"> + </transformer> + </transformers> + <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile> + </configuration> + </plugin> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <configuration> + <tasks> + <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-client</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-client-jms</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <!-- keep storm out of the jar-with-dependencies --> + <type>jar</type> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java index 1bd8288..6b3eba7 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java @@ -1,47 +1,47 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.bolt; - -import backtype.storm.tuple.Tuple; - -/** - * A default implementation of IEventDataFormat that converts the tuple - * into a delimited string. - */ -public class DefaultEventDataFormat implements IEventDataFormat { - private static final long serialVersionUID = 1L; - private String delimiter = ","; - - public DefaultEventDataFormat withFieldDelimiter(String delimiter) { - this.delimiter = delimiter; - return this; - } - - @Override - public byte[] serialize(Tuple tuple) { - StringBuilder sb = new StringBuilder(); - for(Object obj : tuple.getValues()) { - if(sb.length() != 0) { - sb.append(delimiter); - } - sb.append(obj.toString()); - } - return sb.toString().getBytes(); - } - -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.bolt; + +import backtype.storm.tuple.Tuple; + +/** + * A default implementation of IEventDataFormat that converts the tuple + * into a delimited string. + */ +public class DefaultEventDataFormat implements IEventDataFormat { + private static final long serialVersionUID = 1L; + private String delimiter = ","; + + public DefaultEventDataFormat withFieldDelimiter(String delimiter) { + this.delimiter = delimiter; + return this; + } + + @Override + public byte[] serialize(Tuple tuple) { + StringBuilder sb = new StringBuilder(); + for(Object obj : tuple.getValues()) { + if(sb.length() != 0) { + sb.append(delimiter); + } + sb.append(obj.toString()); + } + return sb.toString().getBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 09f90b1..a817744 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -1,101 +1,101 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.bolt; - -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.eventhubs.client.EventHubClient; -import org.apache.storm.eventhubs.client.EventHubException; -import org.apache.storm.eventhubs.client.EventHubSender; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; - -/** - * A bolt that writes event message to EventHub. - */ -public class EventHubBolt extends BaseRichBolt { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(EventHubBolt.class); - - protected OutputCollector collector; - protected EventHubSender sender; - protected EventHubBoltConfig boltConfig; - - - public EventHubBolt(String connectionString, String entityPath) { - boltConfig = new EventHubBoltConfig(connectionString, entityPath); - } - - public EventHubBolt(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - boltConfig = new EventHubBoltConfig(userName, password, namespace, - entityPath, partitionMode); - } - - public EventHubBolt(EventHubBoltConfig config) { - boltConfig = config; - } - - @Override - public void prepare(Map config, TopologyContext context, OutputCollector collector) { - this.collector = collector; - String myPartitionId = null; - if(boltConfig.getPartitionMode()) { - //We can use the task index (starting from 0) as the partition ID - myPartitionId = "" + context.getThisTaskIndex(); - } - logger.info("creating sender: " + boltConfig.getConnectionString() - + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); - try { - EventHubClient eventHubClient = EventHubClient.create( - boltConfig.getConnectionString(), boltConfig.getEntityPath()); - sender = eventHubClient.createPartitionSender(myPartitionId); - } - catch(Exception ex) { - logger.error(ex.getMessage()); - throw new RuntimeException(ex); - } - - } - - @Override - public void execute(Tuple tuple) { - try { - sender.send(boltConfig.getEventDataFormat().serialize(tuple)); - collector.ack(tuple); - } - catch(EventHubException ex) { - logger.error(ex.getMessage()); - collector.fail(tuple); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } - -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.bolt; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.eventhubs.client.EventHubClient; +import org.apache.storm.eventhubs.client.EventHubException; +import org.apache.storm.eventhubs.client.EventHubSender; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; + +/** + * A bolt that writes event message to EventHub. + */ +public class EventHubBolt extends BaseRichBolt { + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory + .getLogger(EventHubBolt.class); + + protected OutputCollector collector; + protected EventHubSender sender; + protected EventHubBoltConfig boltConfig; + + + public EventHubBolt(String connectionString, String entityPath) { + boltConfig = new EventHubBoltConfig(connectionString, entityPath); + } + + public EventHubBolt(String userName, String password, String namespace, + String entityPath, boolean partitionMode) { + boltConfig = new EventHubBoltConfig(userName, password, namespace, + entityPath, partitionMode); + } + + public EventHubBolt(EventHubBoltConfig config) { + boltConfig = config; + } + + @Override + public void prepare(Map config, TopologyContext context, OutputCollector collector) { + this.collector = collector; + String myPartitionId = null; + if(boltConfig.getPartitionMode()) { + //We can use the task index (starting from 0) as the partition ID + myPartitionId = "" + context.getThisTaskIndex(); + } + logger.info("creating sender: " + boltConfig.getConnectionString() + + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); + try { + EventHubClient eventHubClient = EventHubClient.create( + boltConfig.getConnectionString(), boltConfig.getEntityPath()); + sender = eventHubClient.createPartitionSender(myPartitionId); + } + catch(Exception ex) { + logger.error(ex.getMessage()); + throw new RuntimeException(ex); + } + + } + + @Override + public void execute(Tuple tuple) { + try { + sender.send(boltConfig.getEventDataFormat().serialize(tuple)); + collector.ack(tuple); + } + catch(EventHubException ex) { + logger.error(ex.getMessage()); + collector.fail(tuple); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java index 909e8ac..4383a72 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java @@ -1,107 +1,107 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.bolt; - -import java.io.Serializable; - -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; - -/* - * EventHubs bolt configurations - * - * Partition mode: - * With partitionMode=true you need to create the same number of tasks as the number of - * EventHubs partitions, and each bolt task will only send data to one partition. - * The partition ID is the task ID of the bolt. - * - * Event format: - * The formatter to convert tuple to bytes for EventHubs. - * if null, the default format is common delimited tuple fields. - */ -public class EventHubBoltConfig implements Serializable { - private static final long serialVersionUID = 1L; - - private String connectionString; - private final String entityPath; - protected boolean partitionMode; - protected IEventDataFormat dataFormat; - - public EventHubBoltConfig(String connectionString, String entityPath) { - this(connectionString, entityPath, false, null); - } - - public EventHubBoltConfig(String connectionString, String entityPath, - boolean partitionMode) { - this(connectionString, entityPath, partitionMode, null); - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - this(userName, password, namespace, - EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode); - } - - public EventHubBoltConfig(String connectionString, String entityPath, - boolean partitionMode, IEventDataFormat dataFormat) { - this.connectionString = connectionString; - this.entityPath = entityPath; - this.partitionMode = partitionMode; - this.dataFormat = dataFormat; - if(this.dataFormat == null) { - this.dataFormat = new DefaultEventDataFormat(); - } - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String targetFqnAddress, String entityPath) { - this(userName, password, namespace, targetFqnAddress, entityPath, false, null); - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String targetFqnAddress, String entityPath, boolean partitionMode) { - this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null); - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String targetFqnAddress, String entityPath, boolean partitionMode, - IEventDataFormat dataFormat) { - this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress); - this.entityPath = entityPath; - this.partitionMode = partitionMode; - this.dataFormat = dataFormat; - if(this.dataFormat == null) { - this.dataFormat = new DefaultEventDataFormat(); - } - } - - public String getConnectionString() { - return connectionString; - } - - public String getEntityPath() { - return entityPath; - } - - public boolean getPartitionMode() { - return partitionMode; - } - - public IEventDataFormat getEventDataFormat() { - return dataFormat; - } -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.bolt; + +import java.io.Serializable; + +import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; + +/* + * EventHubs bolt configurations + * + * Partition mode: + * With partitionMode=true you need to create the same number of tasks as the number of + * EventHubs partitions, and each bolt task will only send data to one partition. + * The partition ID is the task ID of the bolt. + * + * Event format: + * The formatter to convert tuple to bytes for EventHubs. + * if null, the default format is common delimited tuple fields. + */ +public class EventHubBoltConfig implements Serializable { + private static final long serialVersionUID = 1L; + + private String connectionString; + private final String entityPath; + protected boolean partitionMode; + protected IEventDataFormat dataFormat; + + public EventHubBoltConfig(String connectionString, String entityPath) { + this(connectionString, entityPath, false, null); + } + + public EventHubBoltConfig(String connectionString, String entityPath, + boolean partitionMode) { + this(connectionString, entityPath, partitionMode, null); + } + + public EventHubBoltConfig(String userName, String password, String namespace, + String entityPath, boolean partitionMode) { + this(userName, password, namespace, + EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode); + } + + public EventHubBoltConfig(String connectionString, String entityPath, + boolean partitionMode, IEventDataFormat dataFormat) { + this.connectionString = connectionString; + this.entityPath = entityPath; + this.partitionMode = partitionMode; + this.dataFormat = dataFormat; + if(this.dataFormat == null) { + this.dataFormat = new DefaultEventDataFormat(); + } + } + + public EventHubBoltConfig(String userName, String password, String namespace, + String targetFqnAddress, String entityPath) { + this(userName, password, namespace, targetFqnAddress, entityPath, false, null); + } + + public EventHubBoltConfig(String userName, String password, String namespace, + String targetFqnAddress, String entityPath, boolean partitionMode) { + this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null); + } + + public EventHubBoltConfig(String userName, String password, String namespace, + String targetFqnAddress, String entityPath, boolean partitionMode, + IEventDataFormat dataFormat) { + this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress); + this.entityPath = entityPath; + this.partitionMode = partitionMode; + this.dataFormat = dataFormat; + if(this.dataFormat == null) { + this.dataFormat = new DefaultEventDataFormat(); + } + } + + public String getConnectionString() { + return connectionString; + } + + public String getEntityPath() { + return entityPath; + } + + public boolean getPartitionMode() { + return partitionMode; + } + + public IEventDataFormat getEventDataFormat() { + return dataFormat; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java index cb05c0f..2003c34 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java @@ -1,28 +1,28 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.bolt; - -import java.io.Serializable; -import backtype.storm.tuple.Tuple; - -/** - * Serialize a tuple to a byte array to be sent to EventHubs - */ -public interface IEventDataFormat extends Serializable { - public byte[] serialize(Tuple tuple); -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.bolt; + +import java.io.Serializable; +import backtype.storm.tuple.Tuple; + +/** + * Serialize a tuple to a byte array to be sent to EventHubs + */ +public interface IEventDataFormat extends Serializable { + public byte[] serialize(Tuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java index 2afe5b4..564a26f 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java @@ -1,95 +1,95 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import org.apache.qpid.amqp_1_0.client.Connection; -import org.apache.qpid.amqp_1_0.client.ConnectionErrorException; -import org.apache.qpid.amqp_1_0.client.ConnectionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventHubClient { - - private static final String DefaultConsumerGroupName = "$default"; - private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class); - private static final long ConnectionSyncTimeout = 60000L; - - private final String connectionString; - private final String entityPath; - private final Connection connection; - - private EventHubClient(String connectionString, String entityPath) throws EventHubException { - this.connectionString = connectionString; - this.entityPath = entityPath; - this.connection = this.createConnection(); - } - - /** - * creates a new instance of EventHubClient using the supplied connection string and entity path. - * - * @param connectionString connection string to the namespace of event hubs. connection string format: - * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net - * @param entityPath the name of event hub entity. - * - * @return EventHubClient - * @throws org.apache.storm.eventhubs.client.EventHubException - */ - public static EventHubClient create(String connectionString, String entityPath) throws EventHubException { - return new EventHubClient(connectionString, entityPath); - } - - public EventHubSender createPartitionSender(String partitionId) throws Exception { - return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId); - } - - public EventHubConsumerGroup getConsumerGroup(String cgName) { - if(cgName == null || cgName.length() == 0) { - cgName = DefaultConsumerGroupName; - } - return new EventHubConsumerGroup(connection, entityPath, cgName); - } - - public void close() { - try { - this.connection.close(); - } catch (ConnectionErrorException e) { - logger.error(e.toString()); - } - } - - private Connection createConnection() throws EventHubException { - ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString); - Connection clientConnection; - - try { - clientConnection = new Connection( - connectionStringBuilder.getHost(), - connectionStringBuilder.getPort(), - connectionStringBuilder.getUserName(), - connectionStringBuilder.getPassword(), - connectionStringBuilder.getHost(), - connectionStringBuilder.getSsl()); - } catch (ConnectionException e) { - logger.error(e.toString()); - throw new EventHubException(e); - } - clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout); - SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry()); - return clientConnection; - } -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.client; + +import org.apache.qpid.amqp_1_0.client.Connection; +import org.apache.qpid.amqp_1_0.client.ConnectionErrorException; +import org.apache.qpid.amqp_1_0.client.ConnectionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventHubClient { + + private static final String DefaultConsumerGroupName = "$default"; + private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class); + private static final long ConnectionSyncTimeout = 60000L; + + private final String connectionString; + private final String entityPath; + private final Connection connection; + + private EventHubClient(String connectionString, String entityPath) throws EventHubException { + this.connectionString = connectionString; + this.entityPath = entityPath; + this.connection = this.createConnection(); + } + + /** + * creates a new instance of EventHubClient using the supplied connection string and entity path. + * + * @param connectionString connection string to the namespace of event hubs. connection string format: + * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net + * @param entityPath the name of event hub entity. + * + * @return EventHubClient + * @throws org.apache.storm.eventhubs.client.EventHubException + */ + public static EventHubClient create(String connectionString, String entityPath) throws EventHubException { + return new EventHubClient(connectionString, entityPath); + } + + public EventHubSender createPartitionSender(String partitionId) throws Exception { + return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId); + } + + public EventHubConsumerGroup getConsumerGroup(String cgName) { + if(cgName == null || cgName.length() == 0) { + cgName = DefaultConsumerGroupName; + } + return new EventHubConsumerGroup(connection, entityPath, cgName); + } + + public void close() { + try { + this.connection.close(); + } catch (ConnectionErrorException e) { + logger.error(e.toString()); + } + } + + private Connection createConnection() throws EventHubException { + ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString); + Connection clientConnection; + + try { + clientConnection = new Connection( + connectionStringBuilder.getHost(), + connectionStringBuilder.getPort(), + connectionStringBuilder.getUserName(), + connectionStringBuilder.getPassword(), + connectionStringBuilder.getHost(), + connectionStringBuilder.getSsl()); + } catch (ConnectionException e) { + logger.error(e.toString()); + throw new EventHubException(e); + } + clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout); + SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry()); + return clientConnection; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java index 7c45578..435893e 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java @@ -1,99 +1,99 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import java.util.concurrent.TimeoutException; -import org.apache.qpid.amqp_1_0.client.LinkDetachedException; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.client.Sender; -import org.apache.qpid.amqp_1_0.client.Session; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventHubSender { - - private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class); - - private final Session session; - private final String entityPath; - private final String partitionId; - private final String destinationAddress; - - private Sender sender; - - public EventHubSender(Session session, String entityPath, String partitionId) { - this.session = session; - this.entityPath = entityPath; - this.partitionId = partitionId; - this.destinationAddress = this.getDestinationAddress(); - } - - public void send(byte[] data) throws EventHubException { - try { - if (this.sender == null) { - this.ensureSenderCreated(); - } - - Binary bin = new Binary(data); - Message message = new Message(new Data(bin)); - this.sender.send(message); - - } catch (LinkDetachedException e) { - logger.error(e.getMessage()); - - EventHubException eventHubException = new EventHubException("Sender has been closed"); - throw eventHubException; - } catch (TimeoutException e) { - logger.error(e.getMessage()); - - EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send"); - throw eventHubException; - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - - public void send(String data) throws EventHubException { - //For interop with other language, convert string to bytes - send(data.getBytes()); - } - - public void close() { - try { - this.sender.close(); - } catch (Sender.SenderClosingException e) { - logger.error("Closing a sender encountered error: " + e.getMessage()); - } - } - - private String getDestinationAddress() { - if (this.partitionId == null || this.partitionId.equals("")) { - return this.entityPath; - } else { - return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId); - } - } - - private synchronized void ensureSenderCreated() throws Exception { - if (this.sender == null) { - this.sender = this.session.createSender(this.destinationAddress); - } - } -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.client; + +import java.util.concurrent.TimeoutException; +import org.apache.qpid.amqp_1_0.client.LinkDetachedException; +import org.apache.qpid.amqp_1_0.client.Message; +import org.apache.qpid.amqp_1_0.client.Sender; +import org.apache.qpid.amqp_1_0.client.Session; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventHubSender { + + private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class); + + private final Session session; + private final String entityPath; + private final String partitionId; + private final String destinationAddress; + + private Sender sender; + + public EventHubSender(Session session, String entityPath, String partitionId) { + this.session = session; + this.entityPath = entityPath; + this.partitionId = partitionId; + this.destinationAddress = this.getDestinationAddress(); + } + + public void send(byte[] data) throws EventHubException { + try { + if (this.sender == null) { + this.ensureSenderCreated(); + } + + Binary bin = new Binary(data); + Message message = new Message(new Data(bin)); + this.sender.send(message); + + } catch (LinkDetachedException e) { + logger.error(e.getMessage()); + + EventHubException eventHubException = new EventHubException("Sender has been closed"); + throw eventHubException; + } catch (TimeoutException e) { + logger.error(e.getMessage()); + + EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send"); + throw eventHubException; + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + public void send(String data) throws EventHubException { + //For interop with other language, convert string to bytes + send(data.getBytes()); + } + + public void close() { + try { + this.sender.close(); + } catch (Sender.SenderClosingException e) { + logger.error("Closing a sender encountered error: " + e.getMessage()); + } + } + + private String getDestinationAddress() { + if (this.partitionId == null || this.partitionId.equals("")) { + return this.entityPath; + } else { + return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId); + } + } + + private synchronized void ensureSenderCreated() throws Exception { + if (this.sender == null) { + this.sender = this.session.createSender(this.destinationAddress); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1f13f15d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java index c908f9d..2f62a23 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/EventHubLoop.java @@ -1,52 +1,52 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.samples; - -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.TopologyBuilder; - -import org.apache.storm.eventhubs.bolt.EventHubBolt; -import org.apache.storm.eventhubs.bolt.EventHubBoltConfig; -import org.apache.storm.eventhubs.spout.EventHubSpout; - -/** - * A sample topology that loops message back to EventHub - */ -public class EventHubLoop extends EventCount { - - @Override - protected StormTopology buildTopology(EventHubSpout eventHubSpout) { - TopologyBuilder topologyBuilder = new TopologyBuilder(); - - topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount()) - .setNumTasks(spoutConfig.getPartitionCount()); - EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(), - spoutConfig.getEntityPath(), true); - - EventHubBolt eventHubBolt = new EventHubBolt(boltConfig); - int boltTasks = spoutConfig.getPartitionCount(); - topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks) - .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks); - return topologyBuilder.createTopology(); - } - - public static void main(String[] args) throws Exception { - EventHubLoop scenario = new EventHubLoop(); - scenario.runScenario(args); - } -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.samples; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; + +import org.apache.storm.eventhubs.bolt.EventHubBolt; +import org.apache.storm.eventhubs.bolt.EventHubBoltConfig; +import org.apache.storm.eventhubs.spout.EventHubSpout; + +/** + * A sample topology that loops message back to EventHub + */ +public class EventHubLoop extends EventCount { + + @Override + protected StormTopology buildTopology(EventHubSpout eventHubSpout) { + TopologyBuilder topologyBuilder = new TopologyBuilder(); + + topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount()) + .setNumTasks(spoutConfig.getPartitionCount()); + EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(), + spoutConfig.getEntityPath(), true); + + EventHubBolt eventHubBolt = new EventHubBolt(boltConfig); + int boltTasks = spoutConfig.getPartitionCount(); + topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks) + .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks); + return topologyBuilder.createTopology(); + } + + public static void main(String[] args) throws Exception { + EventHubLoop scenario = new EventHubLoop(); + scenario.runScenario(args); + } +}