Remove client adding reference eventhubs-client package and use ResilientEventHubReceiver
Signed-off-by: Shanyu Zhao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85aeb3d4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85aeb3d4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85aeb3d4 Branch: refs/heads/master Commit: 85aeb3d48efcaa28d6fc5dbfe6ce87af9f3e2615 Parents: 1f13f15 Author: Shanyu Zhao <[email protected]> Authored: Sun May 17 01:14:01 2015 -0700 Committer: Shanyu Zhao <[email protected]> Committed: Sun May 17 01:14:01 2015 -0700 ---------------------------------------------------------------------- external/storm-eventhubs/pom.xml | 13 +- .../storm/eventhubs/bolt/EventHubBolt.java | 6 +- .../eventhubs/bolt/EventHubBoltConfig.java | 4 +- .../client/ConnectionStringBuilder.java | 116 ---------------- .../storm/eventhubs/client/Constants.java | 32 ----- .../storm/eventhubs/client/EventHubClient.java | 95 ------------- .../eventhubs/client/EventHubConsumerGroup.java | 72 ---------- .../eventhubs/client/EventHubException.java | 37 ----- .../eventhubs/client/EventHubReceiver.java | 139 ------------------- .../eventhubs/client/EventHubSendClient.java | 70 ---------- .../storm/eventhubs/client/EventHubSender.java | 99 ------------- .../storm/eventhubs/client/SelectorFilter.java | 38 ----- .../eventhubs/client/SelectorFilterWriter.java | 64 --------- .../eventhubs/spout/EventHubReceiverFilter.java | 56 -------- .../eventhubs/spout/EventHubReceiverImpl.java | 50 +++---- .../eventhubs/spout/EventHubSpoutConfig.java | 31 +---- .../eventhubs/spout/IEventHubReceiver.java | 5 +- .../spout/IEventHubReceiverFilter.java | 35 ----- .../eventhubs/spout/SimplePartitionManager.java | 11 +- .../spout/StaticPartitionCoordinator.java | 2 +- .../TransactionalTridentEventHubEmitter.java | 2 +- .../trident/TridentPartitionManager.java | 12 +- .../eventhubs/spout/EventHubReceiverMock.java | 18 +-- 23 files changed, 56 insertions(+), 951 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 2dfb739..6d4a47b 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -33,7 +33,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <qpid.version>0.32</qpid.version> + <eventhubs.client.version>0.9</eventhubs.client.version> </properties> <build> <plugins> @@ -77,14 +77,9 @@ </build> <dependencies> <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-client</artifactId> - <version>${qpid.version}</version> - </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-amqp-1-0-client-jms</artifactId> - <version>${qpid.version}</version> + <groupId>com.microsoft.eventhubs.client</groupId> + <artifactId>eventhubs-client</artifactId> + <version>${eventhubs.client.version}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index a817744..9acf7fa 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -22,9 +22,9 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.eventhubs.client.EventHubClient; -import org.apache.storm.eventhubs.client.EventHubException; -import org.apache.storm.eventhubs.client.EventHubSender; +import com.microsoft.eventhubs.client.EventHubClient; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubSender; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java index 4383a72..10b4e39 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java @@ -20,6 +20,7 @@ package org.apache.storm.eventhubs.bolt; import java.io.Serializable; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; +import com.microsoft.eventhubs.client.ConnectionStringBuilder; /* * EventHubs bolt configurations @@ -80,7 +81,8 @@ public class EventHubBoltConfig implements Serializable { public EventHubBoltConfig(String userName, String password, String namespace, String targetFqnAddress, String entityPath, boolean partitionMode, IEventDataFormat dataFormat) { - this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress); + this.connectionString = new ConnectionStringBuilder(userName, password, + namespace, targetFqnAddress).getConnectionString(); this.entityPath = entityPath; this.partitionMode = partitionMode; this.dataFormat = dataFormat; http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java deleted file mode 100755 index 518c88d..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java +++ /dev/null @@ -1,116 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLConnection; -import java.net.URLDecoder; -import java.net.URLStreamHandler; - -public class ConnectionStringBuilder { - - private final String connectionString; - - private String host; - private int port; - private String userName; - private String password; - private boolean ssl; - - // amqps://[username]:[password]@[namespace].servicebus.windows.net/ - public ConnectionStringBuilder(String connectionString) throws EventHubException { - this.connectionString = connectionString; - this.initialize(); - } - - public String getHost() { - return this.host; - } - - public void setHost(String value) { - this.host = value; - } - - public int getPort() { - return this.port; - } - - public void setPort(int value) { - this.port = value; - } - - public String getUserName() { - return this.userName; - } - - public void setUserName(String value) { - this.userName = value; - } - - public String getPassword() { - return this.password; - } - - public void setPassword(String value) { - this.password = value; - } - - public boolean getSsl() { - return this.ssl; - } - - public void setSsl(boolean value) { - this.ssl = value; - } - - private void initialize() throws EventHubException { - - URL url; - try { - url = new URL(null, this.connectionString, new NullURLStreamHandler()); - } catch (MalformedURLException e) { - throw new EventHubException("connectionString is not valid.", e); - } - - String protocol = url.getProtocol(); - this.ssl = protocol.equalsIgnoreCase(Constants.SslScheme); - this.host = url.getHost(); - this.port = url.getPort(); - - if (this.port == -1) { - this.port = this.ssl ? Constants.DefaultSslPort : Constants.DefaultPort; - } - - String userInfo = url.getUserInfo(); - if (userInfo != null) { - String[] credentials = userInfo.split(":", 2); - this.userName = URLDecoder.decode(credentials[0]); - this.password = URLDecoder.decode(credentials[1]); - } - } - - class NullURLStreamHandler extends URLStreamHandler { - - @Override - protected URLConnection openConnection(URL u) throws IOException { - throw new UnsupportedOperationException("Not supported yet."); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java deleted file mode 100755 index d87ad53..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java +++ /dev/null @@ -1,32 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -public class Constants { - - public static final String DefaultStartingOffset = "-1"; - public static final String SelectorFilterName = "apache.org:selector-filter:string"; - public static final String OffsetFilterFormatString = "amqp.annotation.x-opt-offset > '%s'"; - public static final String EnqueueTimeFilterFormatString = "amqp.annotation.x-opt-enqueuedtimeutc > %d"; - public static final String ConsumerAddressFormatString = "%s/ConsumerGroups/%s/Partitions/%s"; - public static final String DestinationAddressFormatString = "%s/Partitions/%s"; - - public static final String SslScheme = "amqps"; - public static final int DefaultPort = 5672; - public static final int DefaultSslPort = 5671; -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java deleted file mode 100755 index 564a26f..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java +++ /dev/null @@ -1,95 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import org.apache.qpid.amqp_1_0.client.Connection; -import org.apache.qpid.amqp_1_0.client.ConnectionErrorException; -import org.apache.qpid.amqp_1_0.client.ConnectionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventHubClient { - - private static final String DefaultConsumerGroupName = "$default"; - private static final Logger logger = LoggerFactory.getLogger(EventHubClient.class); - private static final long ConnectionSyncTimeout = 60000L; - - private final String connectionString; - private final String entityPath; - private final Connection connection; - - private EventHubClient(String connectionString, String entityPath) throws EventHubException { - this.connectionString = connectionString; - this.entityPath = entityPath; - this.connection = this.createConnection(); - } - - /** - * creates a new instance of EventHubClient using the supplied connection string and entity path. - * - * @param connectionString connection string to the namespace of event hubs. connection string format: - * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net - * @param entityPath the name of event hub entity. - * - * @return EventHubClient - * @throws org.apache.storm.eventhubs.client.EventHubException - */ - public static EventHubClient create(String connectionString, String entityPath) throws EventHubException { - return new EventHubClient(connectionString, entityPath); - } - - public EventHubSender createPartitionSender(String partitionId) throws Exception { - return new EventHubSender(this.connection.createSession(), this.entityPath, partitionId); - } - - public EventHubConsumerGroup getConsumerGroup(String cgName) { - if(cgName == null || cgName.length() == 0) { - cgName = DefaultConsumerGroupName; - } - return new EventHubConsumerGroup(connection, entityPath, cgName); - } - - public void close() { - try { - this.connection.close(); - } catch (ConnectionErrorException e) { - logger.error(e.toString()); - } - } - - private Connection createConnection() throws EventHubException { - ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(this.connectionString); - Connection clientConnection; - - try { - clientConnection = new Connection( - connectionStringBuilder.getHost(), - connectionStringBuilder.getPort(), - connectionStringBuilder.getUserName(), - connectionStringBuilder.getPassword(), - connectionStringBuilder.getHost(), - connectionStringBuilder.getSsl()); - } catch (ConnectionException e) { - logger.error(e.toString()); - throw new EventHubException(e); - } - clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout); - SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry()); - return clientConnection; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java deleted file mode 100755 index 892ff9c..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java +++ /dev/null @@ -1,72 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import org.apache.qpid.amqp_1_0.client.Connection; -import org.apache.qpid.amqp_1_0.client.ConnectionException; -import org.apache.qpid.amqp_1_0.client.Session; - -public class EventHubConsumerGroup { - - private final Connection connection; - private final String entityPath; - private final String consumerGroupName; - - private Session session; - - public EventHubConsumerGroup(Connection connection, String entityPath, String consumerGroupName) { - this.connection = connection; - this.entityPath = entityPath; - this.consumerGroupName = consumerGroupName; - } - - public EventHubReceiver createReceiver(String partitionId, String startingOffset, int defaultCredits) throws EventHubException { - this.ensureSessionCreated(); - - if (startingOffset == null || startingOffset.equals("")) { - startingOffset = Constants.DefaultStartingOffset; - } - - String filterStr = String.format(Constants.OffsetFilterFormatString, startingOffset); - return new EventHubReceiver(this.session, this.entityPath, this.consumerGroupName, partitionId, filterStr, defaultCredits); - } - - public EventHubReceiver createReceiver(String partitionId, long timeAfter, int defaultCredits) throws EventHubException { - this.ensureSessionCreated(); - - String filterStr = String.format(Constants.EnqueueTimeFilterFormatString, timeAfter); - return new EventHubReceiver(this.session, this.entityPath, this.consumerGroupName, partitionId, filterStr, defaultCredits); - } - - public void close() { - if (this.session != null) { - this.session.close(); - } - } - - synchronized void ensureSessionCreated() throws EventHubException { - - try { - if (this.session == null) { - this.session = this.connection.createSession(); - } - } catch (ConnectionException e) { - throw new EventHubException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java deleted file mode 100755 index 3e94573..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java +++ /dev/null @@ -1,37 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -public class EventHubException extends Exception { - - public EventHubException() { - super(); - } - - public EventHubException(String message) { - super(message); - } - - public EventHubException(Throwable cause) { - super(cause); - } - - public EventHubException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java deleted file mode 100755 index c8900a8..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java +++ /dev/null @@ -1,139 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import java.util.Collections; -import java.util.Map; -import org.apache.qpid.amqp_1_0.client.AcknowledgeMode; -import org.apache.qpid.amqp_1_0.client.ConnectionErrorException; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.client.Receiver; -import org.apache.qpid.amqp_1_0.client.Session; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class EventHubReceiver { - - private static final Logger logger = LoggerFactory - .getLogger(EventHubReceiver.class); - private static final String linkName = "eventhubs-receiver-link"; - - private final Session session; - private final String entityPath; - private final String consumerGroupName; - private final String partitionId; - private final String consumerAddress; - private final Map<Symbol, Filter> filters; - private final int defaultCredits; - - private Receiver receiver; - private boolean isClosed; - - public EventHubReceiver(Session session, String entityPath, - String consumerGroupName, String partitionId, String filterStr, int defaultCredits) - throws EventHubException { - - this.session = session; - this.entityPath = entityPath; - this.consumerGroupName = consumerGroupName; - this.partitionId = partitionId; - this.consumerAddress = this.getConsumerAddress(); - this.filters = Collections.singletonMap( - Symbol.valueOf(Constants.SelectorFilterName), - (Filter) new SelectorFilter(filterStr)); - logger.info("receiver filter string: " + filterStr); - this.defaultCredits = defaultCredits; - - this.ensureReceiverCreated(); - } - - // receive without timeout means wait until a message is delivered. - public Message receive() { - return this.receive(-1L); - } - - public Message receive(long waitTimeInMilliseconds) { - - this.checkIfClosed(); - - Message message = this.receiver.receive(waitTimeInMilliseconds); - - if (message != null) { - // Let's acknowledge a message although EH service doesn't need it - // to avoid AMQP flow issue. - receiver.acknowledge(message); - - return message; - } else { - this.checkError(); - } - - return null; - } - - public void close() { - if (!isClosed) { - receiver.close(); - isClosed = true; - } - } - - private String getConsumerAddress() { - return String.format(Constants.ConsumerAddressFormatString, - entityPath, consumerGroupName, partitionId); - } - - private void ensureReceiverCreated() throws EventHubException { - try { - logger.info("defaultCredits: " + defaultCredits); - receiver = session.createReceiver(consumerAddress, - AcknowledgeMode.ALO, linkName, false, filters, null); - receiver.setCredit(UnsignedInteger.valueOf(defaultCredits), true); - } catch (ConnectionErrorException e) { - // caller (EventHubSpout) will log the error - throw new EventHubException(e); - } - } - - private void checkError() { - org.apache.qpid.amqp_1_0.type.transport.Error error = this.receiver.getError(); - if (error != null) { - String errorMessage = error.toString(); - logger.error(errorMessage); - this.close(); - - throw new RuntimeException(errorMessage); - } else { - // adding a sleep here to avoid any potential tight-loop issue. - try { - Thread.sleep(10); - } catch (InterruptedException e) { - logger.error(e.toString()); - } - } - } - - private void checkIfClosed() { - if (this.isClosed) { - throw new RuntimeException("receiver was closed."); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java deleted file mode 100755 index ad31cc1..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java +++ /dev/null @@ -1,70 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; - -public class EventHubSendClient { - - public static void main(String[] args) throws Exception { - - if (args == null || args.length < 7) { - throw new IllegalArgumentException( - "arguments are missing. [username] [password] [namespace] [entityPath] [partitionId] [messageSize] [messageCount] are required."); - } - - String username = args[0]; - String password = args[1]; - String namespace = args[2]; - String entityPath = args[3]; - String partitionId = args[4]; - int messageSize = Integer.parseInt(args[5]); - int messageCount = Integer.parseInt(args[6]); - assert(messageSize > 0); - assert(messageCount > 0); - - if (partitionId.equals("-1")) { - // -1 means we want to send data to partitions in round-robin fashion. - partitionId = null; - } - - try { - String connectionString = EventHubSpoutConfig.buildConnectionString(username, password, namespace); - EventHubClient client = EventHubClient.create(connectionString, entityPath); - EventHubSender sender = client.createPartitionSender(partitionId); - - StringBuilder sb = new StringBuilder(messageSize); - for(int i=1; i<messageCount+1; ++i) { - while(sb.length() < messageSize) { - sb.append(" current message: " + i); - } - sb.setLength(messageSize); - sender.send(sb.toString()); - sb.setLength(0); - if(i % 1000 == 0) { - System.out.println("Number of messages sent: " + i); - } - } - System.out.println("Total Number of messages sent: " + messageCount); - } catch (Exception e) { - System.out.println("Exception: " + e.getMessage()); - } - - System.out.println("done"); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java deleted file mode 100755 index 435893e..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java +++ /dev/null @@ -1,99 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import java.util.concurrent.TimeoutException; -import org.apache.qpid.amqp_1_0.client.LinkDetachedException; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.client.Sender; -import org.apache.qpid.amqp_1_0.client.Session; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventHubSender { - - private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class); - - private final Session session; - private final String entityPath; - private final String partitionId; - private final String destinationAddress; - - private Sender sender; - - public EventHubSender(Session session, String entityPath, String partitionId) { - this.session = session; - this.entityPath = entityPath; - this.partitionId = partitionId; - this.destinationAddress = this.getDestinationAddress(); - } - - public void send(byte[] data) throws EventHubException { - try { - if (this.sender == null) { - this.ensureSenderCreated(); - } - - Binary bin = new Binary(data); - Message message = new Message(new Data(bin)); - this.sender.send(message); - - } catch (LinkDetachedException e) { - logger.error(e.getMessage()); - - EventHubException eventHubException = new EventHubException("Sender has been closed"); - throw eventHubException; - } catch (TimeoutException e) { - logger.error(e.getMessage()); - - EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send"); - throw eventHubException; - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - - public void send(String data) throws EventHubException { - //For interop with other language, convert string to bytes - send(data.getBytes()); - } - - public void close() { - try { - this.sender.close(); - } catch (Sender.SenderClosingException e) { - logger.error("Closing a sender encountered error: " + e.getMessage()); - } - } - - private String getDestinationAddress() { - if (this.partitionId == null || this.partitionId.equals("")) { - return this.entityPath; - } else { - return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId); - } - } - - private synchronized void ensureSenderCreated() throws Exception { - if (this.sender == null) { - this.sender = this.session.createSender(this.destinationAddress); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java deleted file mode 100755 index 7869cce..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java +++ /dev/null @@ -1,38 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import org.apache.qpid.amqp_1_0.type.messaging.Filter; - -public class SelectorFilter implements Filter { - - private final String value; - - public SelectorFilter(String value) { - this.value = value; - } - - public String getValue() { - return value; - } - - @Override - public String toString() { - return value; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java deleted file mode 100755 index 102b6b6..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java +++ /dev/null @@ -1,64 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.client; - -import org.apache.qpid.amqp_1_0.codec.AbstractDescribedTypeWriter; -import org.apache.qpid.amqp_1_0.codec.ValueWriter; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; - -public class SelectorFilterWriter extends - AbstractDescribedTypeWriter<SelectorFilter> { - - private static final ValueWriter.Factory<SelectorFilter> FACTORY = new ValueWriter.Factory<SelectorFilter>() { - - @Override - public ValueWriter<SelectorFilter> newInstance(ValueWriter.Registry registry) { - return new SelectorFilterWriter(registry); - } - }; - - private SelectorFilter value; - - public SelectorFilterWriter(final ValueWriter.Registry registry) { - super(registry); - } - - public static void register(ValueWriter.Registry registry) { - registry.register(SelectorFilter.class, FACTORY); - } - - @Override - protected void onSetValue(final SelectorFilter value) { - this.value = value; - } - - @Override - protected void clear() { - value = null; - } - - @Override - protected Object getDescriptor() { - return UnsignedLong.valueOf(0x00000137000000AL); - } - - @Override - protected ValueWriter<String> createDescribedWriter() { - return getRegistry().getValueWriter(value.getValue()); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java deleted file mode 100755 index e80cd25..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java +++ /dev/null @@ -1,56 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.spout; - - -public class EventHubReceiverFilter implements IEventHubReceiverFilter { - String offset = null; - long enqueueTime = 0; - public EventHubReceiverFilter() { - - } - - public EventHubReceiverFilter(String offset) { - //Creates offset only filter - this.offset = offset; - } - - public EventHubReceiverFilter(long enqueueTime) { - //Creates enqueue time only filter - this.enqueueTime = enqueueTime; - } - - public void setOffset(String offset) { - this.offset = offset; - } - - public void setEnqueueTime(long enqueueTime) { - this.enqueueTime = enqueueTime; - } - - @Override - public String getOffset() { - return offset; - } - - @Override - public long getEnqueueTime() { - return enqueueTime; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java index 68302af..7454af4 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java @@ -25,10 +25,10 @@ import backtype.storm.metric.api.CountMetric; import backtype.storm.metric.api.MeanReducer; import backtype.storm.metric.api.ReducedMetric; -import org.apache.storm.eventhubs.client.Constants; -import org.apache.storm.eventhubs.client.EventHubClient; -import org.apache.storm.eventhubs.client.EventHubException; -import org.apache.storm.eventhubs.client.EventHubReceiver; +import com.microsoft.eventhubs.client.Constants; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.IEventHubFilter; +import com.microsoft.eventhubs.client.ResilientEventHubReceiver; import java.util.HashMap; import java.util.Map; @@ -39,8 +39,8 @@ import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; public class EventHubReceiverImpl implements IEventHubReceiver { private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class); - private static final Symbol OffsetKey = Symbol.valueOf("x-opt-offset"); - private static final Symbol SequenceNumberKey = Symbol.valueOf("x-opt-sequence-number"); + private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey); + private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey); private final String connectionString; private final String entityName; @@ -48,8 +48,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver { private final int defaultCredits; private final String consumerGroupName; - private EventHubReceiver receiver; - private String lastOffset = null; + private ResilientEventHubReceiver receiver; private ReducedMetric receiveApiLatencyMean; private CountMetric receiveApiCallCount; private CountMetric receiveMessageCount; @@ -66,27 +65,13 @@ public class EventHubReceiverImpl implements IEventHubReceiver { } @Override - public void open(IEventHubReceiverFilter filter) throws EventHubException { - logger.info("creating eventhub receiver: partitionId=" + partitionId + ", offset=" + filter.getOffset() - + ", enqueueTime=" + filter.getEnqueueTime()); + public void open(IEventHubFilter filter) throws EventHubException { + logger.info("creating eventhub receiver: partitionId=" + partitionId + + ", filterString=" + filter.getFilterString()); long start = System.currentTimeMillis(); - EventHubClient eventHubClient = EventHubClient.create(connectionString, entityName); - if(filter.getOffset() != null) { - receiver = eventHubClient - .getConsumerGroup(consumerGroupName) - .createReceiver(partitionId, filter.getOffset(), defaultCredits); - } - else if(filter.getEnqueueTime() != 0) { - receiver = eventHubClient - .getConsumerGroup(consumerGroupName) - .createReceiver(partitionId, filter.getEnqueueTime(), defaultCredits); - } - else { - logger.error("Invalid IEventHubReceiverFilter, use default offset as filter"); - receiver = eventHubClient - .getConsumerGroup(consumerGroupName) - .createReceiver(partitionId, Constants.DefaultStartingOffset, defaultCredits); - } + receiver = new ResilientEventHubReceiver(connectionString, entityName, + partitionId, consumerGroupName, defaultCredits, filter); + long end = System.currentTimeMillis(); logger.info("created eventhub receiver, time taken(ms): " + (end-start)); } @@ -113,21 +98,20 @@ public class EventHubReceiverImpl implements IEventHubReceiver { long millis = (end - start); receiveApiLatencyMean.update(millis); receiveApiCallCount.incr(); - + if (message == null) { //Temporary workaround for AMQP/EH bug of failing to receive messages - if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) { + /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) { throw new RuntimeException( "Restart EventHubSpout due to failure of receiving messages in " + millis + " millisecond"); - } + }*/ return null; } + receiveMessageCount.incr(); - //logger.info(String.format("received a message. PartitionId: %s, Offset: %s", partitionId, this.lastOffset)); MessageId messageId = createMessageId(message); - return EventData.create(message, messageId); } http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index 0238e40..77cd998 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -18,10 +18,9 @@ package org.apache.storm.eventhubs.spout; import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; +import com.microsoft.eventhubs.client.ConnectionStringBuilder; public class EventHubSpoutConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -48,7 +47,8 @@ public class EventHubSpoutConfig implements Serializable { String entityPath, int partitionCount) { this.userName = username; this.password = password; - this.connectionString = buildConnectionString(username, password, namespace); + this.connectionString = new ConnectionStringBuilder(username, password, + namespace).getConnectionString(); this.namespace = namespace; this.entityPath = entityPath; this.partitionCount = partitionCount; @@ -173,28 +173,7 @@ public class EventHubSpoutConfig implements Serializable { } public void setTargetAddress(String targetFqnAddress) { - this.connectionString = buildConnectionString( - userName, password, namespace, targetFqnAddress); + this.connectionString = new ConnectionStringBuilder(userName, password, + namespace, targetFqnAddress).getConnectionString(); } - - public static String buildConnectionString(String username, String password, String namespace) { - return buildConnectionString(username, password, namespace, EH_SERVICE_FQDN_SUFFIX); - } - - public static String buildConnectionString(String username, String password, - String namespace, String targetFqnAddress) { - return "amqps://" + username + ":" + encodeString(password) - + "@" + namespace + "." + targetFqnAddress; - } - - private static String encodeString(String input) { - try { - return URLEncoder.encode(input, "UTF-8"); - } catch (UnsupportedEncodingException e) { - //We don't need to throw this exception because the exception won't - //happen because of user input. Our unit tests will catch this error. - return ""; - } - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java index 45e9e57..bc2db14 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java @@ -19,11 +19,12 @@ package org.apache.storm.eventhubs.spout; import java.util.Map; -import org.apache.storm.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.IEventHubFilter; public interface IEventHubReceiver { - void open(IEventHubReceiverFilter filter) throws EventHubException; + void open(IEventHubFilter filter) throws EventHubException; void close(); http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java deleted file mode 100755 index e5b93cf..0000000 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java +++ /dev/null @@ -1,35 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.spout; - -/** - * The filter to create an EventHubs receiver - */ -public interface IEventHubReceiverFilter { - /** - * Get offset to filter events based on offset - * @return null if offset not set - */ - String getOffset(); - - /** - * Get timestamp to filter events based on enqueue time. - * @return 0 if enqueue time is not set - */ - long getEnqueueTime(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java index bcbcbac..b66a785 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java @@ -22,7 +22,10 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.eventhubs.client.Constants; +import com.microsoft.eventhubs.client.Constants; +import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; +import com.microsoft.eventhubs.client.EventHubOffsetFilter; +import com.microsoft.eventhubs.client.IEventHubFilter; /** * A simple partition manager that does not re-send failed messages @@ -62,13 +65,13 @@ public class SimplePartitionManager implements IPartitionManager { offset = Constants.DefaultStartingOffset; } - EventHubReceiverFilter filter = new EventHubReceiverFilter(); + IEventHubFilter filter; if (offset.equals(Constants.DefaultStartingOffset) && config.getEnqueueTimeFilter() != 0) { - filter.setEnqueueTime(config.getEnqueueTimeFilter()); + filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter()); } else { - filter.setOffset(offset); + filter = new EventHubOffsetFilter(offset); } receiver.open(filter); http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java index 3f5f156..8d2c485 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java @@ -25,7 +25,7 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.eventhubs.client.Constants; +import com.microsoft.eventhubs.client.Constants; public class StaticPartitionCoordinator implements IPartitionCoordinator { http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java index 2b92c3c..bf7f339 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java @@ -29,7 +29,7 @@ import org.apache.storm.eventhubs.spout.EventHubReceiverImpl; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory; -import org.apache.storm.eventhubs.client.Constants; +import com.microsoft.eventhubs.client.Constants; import storm.trident.operation.TridentCollector; import storm.trident.spout.IOpaquePartitionedTridentSpout; http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java index 60391c3..159fe41 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java @@ -23,10 +23,12 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.eventhubs.client.Constants; -import org.apache.storm.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.Constants; +import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubOffsetFilter; + import org.apache.storm.eventhubs.spout.EventData; -import org.apache.storm.eventhubs.spout.EventHubReceiverFilter; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; @@ -47,10 +49,10 @@ public class TridentPartitionManager implements ITridentPartitionManager { try { if((offset == null || offset.equals(Constants.DefaultStartingOffset)) && spoutConfig.getEnqueueTimeFilter() != 0) { - receiver.open(new EventHubReceiverFilter(spoutConfig.getEnqueueTimeFilter())); + receiver.open(new EventHubEnqueueTimeFilter(spoutConfig.getEnqueueTimeFilter())); } else { - receiver.open(new EventHubReceiverFilter(offset)); + receiver.open(new EventHubOffsetFilter(offset)); } lastOffset = offset; return true; http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java index 740ef63..b176598 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java @@ -24,14 +24,15 @@ import java.util.Map; import org.apache.storm.eventhubs.spout.MessageId; import org.apache.storm.eventhubs.spout.EventData; import org.apache.storm.eventhubs.spout.IEventHubReceiver; - import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.storm.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubOffsetFilter; +import com.microsoft.eventhubs.client.IEventHubFilter; /** * A mock receiver that emits fake data with offset starting from given offset @@ -58,17 +59,8 @@ public class EventHubReceiverMock implements IEventHubReceiver { } @Override - public void open(IEventHubReceiverFilter filter) throws EventHubException { - if(filter.getOffset() != null) { - currentOffset = Long.parseLong(filter.getOffset()); - } - else if(filter.getEnqueueTime() != 0) { - //assume if it's time based filter the offset matches the enqueue time. - currentOffset = filter.getEnqueueTime(); - } - else { - throw new EventHubException("Invalid IEventHubReceiverFilter"); - } + public void open(IEventHubFilter filter) throws EventHubException { + currentOffset = Long.parseLong(filter.getFilterValue()); isOpen = true; }
