[GitHub] storm issue #2588: Microsoft Azure EventHubs Storm Spout and Bolt improvemen...

2018-03-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on the issue:

https://github.com/apache/storm/pull/2588
  
@srdo - when I try to merge my changes with `master` - I ended up with a 
merge conflicts due to `line endings` change. is there any simple tip - which I 
can use to ease through this? like changed my files to a specific `line ending` 
before the merge etc? 
Thanks a lot for the help so far!


---


[GitHub] storm issue #2588: Microsoft Azure EventHubs Storm Spout and Bolt improvemen...

2018-03-10 Thread SreeramGarlapati
Github user SreeramGarlapati commented on the issue:

https://github.com/apache/storm/pull/2588
  
thanks @srdo - i will take a shot at these changes by EOD monday PST.


---


[GitHub] storm pull request #2588: Microsoft Azure EventHubs Storm Spout and Bolt imp...

2018-03-06 Thread SreeramGarlapati
GitHub user SreeramGarlapati opened a pull request:

https://github.com/apache/storm/pull/2588

Microsoft Azure EventHubs Storm Spout and Bolt improvements

This is continuation of work done by @raviperi 

- update to the latest version of eventhubs java client
-Introduce config params to use latest EH client, control request prefetch 
size, batch size of events received per call.
-Refactor the code to group classes more appropriately
-Remove redundant types
-Javadoc comments where applicable
-Preftch config parameter to dictate EH prefetch count
-config parameter to introduce sleep between spout's nexttuple calls
-config parameter to retrieve a batched number of events per call to EH
(opposed to single event)
-New data scheme to group event payload and audit params into a single
type, and expose the single type as the only tuple field to downstream
bolts.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SreeramGarlapati/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2588.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2588


commit e9fc45d3d604b65ab3e99895b4a7711bf1b7ab30
Author: Ravi Peri 
Date:   2017-09-11T21:35:48Z

Introduce config params to dictate EH request prefetch size, max number
of events received per call.

-Refactor the code to group classes more appropriately
-Remove redundant types
-Javadoc comments where applicable
-Preftch config parameter to dictate EH prefetch count
-config parameter to introduce sleep between spout's nexttuple calls
-config parameter to retrieve a batched number of events per call to EH
(opposed to single event)
-New data scheme to group event payload and audit params into a single
type, and expose the single type as the only tuple field to downstream
bolts.

commit 3f8f37f65551414afdaaf48935b662e9e42836be
Author: Ravi Peri 
Date:   2017-09-13T18:07:18Z

Remove unncessary file.

commit 9a18f9a8d1629f34fac648e05a3ae61d388e0670
Author: Ravi Peri 
Date:   2017-09-14T17:09:35Z

Add license header, remove next tuple sleep time as it is redundant.

commit 5a7eb2e1f5d5f1a3f65e1530b284f364e618f99f
Author: Ravi Peri 
Date:   2017-09-14T17:14:08Z

Remove unused nextTuple sleep interval field

commit 6ba9d11a9343c47add523e55415238b6df010de5
Author: Ravi Peri 
Date:   2017-09-19T19:27:32Z

Add javadoc for EventHubConfig. Remove unused fields. Improve logging when 
returning empty/null values

commit 53569ac4ccdca5e0978ac3b8eae2423c97657197
Author: Ravi Peri 
Date:   2017-09-19T20:33:20Z

Organize usings. Remove extra constructor and fields in EventHubSpoutConfig

commit b1bde9b592024dcf1eaa70365c010cc56cd99313
Author: Ravi Peri 
Date:   2017-09-20T18:30:46Z

Remove obselete Binary and EventDataSchemes, and add unit tests for 
supported schemes. Enable scheme based serialization in spout

commit 4516d01487e4dcb2211ea713fb160f6a45bd34fd
Author: Ravi Peri 
Date:   2017-09-20T18:52:50Z

Mark all fields final in EventHUbMessage

commit 2063f4547939dfde459d8241b80e3b262e5e1816
Author: Ravi Peri 
Date:   2017-09-20T19:15:40Z

Update batch size log message to debug level

commit 480e2fcdaa4e14f503cde1552a11d53830a21cda
Author: Sreeram Garlapati 
Date:   2018-03-07T03:18:56Z

Merge branch 'master' of https://github.com/raviperi/storm

# Conflicts:
#   
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
#   
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/FieldConstants.java
#   
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/StringEventDataScheme.java
#   
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
#   
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
#   
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
#   
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
#   
external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
#   pom.xml

commit a61466d4f4cfda92aea3157abbeec4b3646cdd5d
Author: Sreeram Garlapati 
Date:   2018-03-07T05:32:14Z

remove files as continuation to the merge commit

commit 19bd1c9ddd13f73cc8e6ddfc2d987c7c8ea58efd
Author: Sreeram Garlapati 
Date:   2018-03-07T06:09:12Z

move the library to 1.0.0




---


[GitHub] storm pull request #2098: STORM-2499: Add Serialization plugin for EventHub ...

2017-05-04 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2098#discussion_r114826039
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
 ---
@@ -23,5 +23,6 @@
   public static final String Offset = "offset";
   public static final String Message = "message";
   public static final String META_DATA = "metadata";
+  public static final String SYSTEM_META_DATA = "systemmetadata";
--- End diff --

>SYSTEM_META_DATA [](start = 29, length = 16)

Too generic; in general being more specific is recommended - feel free to 
use something like eventdata_system_properties


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2098: STORM-2499: Add Serialization plugin for EventHub ...

2017-05-04 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2098#discussion_r114825627
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataSystemPropertiesScheme.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the raw 
bytes.
+ *
+ * The resulting tuple would contain two items, the first being the message
+ * bytes, and the second a map of properties that include System Metadata
--- End diff --

>properties [](start = 34, length = 10)

Indicate explicitly that EventData.Properties are not included
or I would actually recommend to go ahead and include 
eventdata.getProperties() also in the deserialized output.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2098: STORM-2499: Add Serialization plugin for EventHub ...

2017-05-04 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2098#discussion_r114821502
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataSystemPropertiesScheme.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.storm.eventhubs.spout;
+
+import com.microsoft.azure.eventhubs.EventData;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the raw 
bytes.
+ *
+ * The resulting tuple would contain two items, the first being the message
+ * bytes, and the second a map of properties that include System Metadata
+ * properties. System Properties in eventhub exposes offset, sequencenumber
+ * and EnqueueTime
+ */
+public class BinaryEventDataSystemPropertiesScheme implements 
IEventDataScheme{
--- End diff --

>BinaryEventDataSystemPropertiesScheme [](start = 13, length = 37)

add a with in the name.. EventDataWithSys


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2026: Eventhub3

2017-03-30 Thread SreeramGarlapati
Github user SreeramGarlapati commented on the issue:

https://github.com/apache/storm/pull/2026
  
:shipit:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-30 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r109074402
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 ---
@@ -44,21 +38,32 @@
 public class StringEventDataScheme implements IEventDataScheme {
 
   private static final long serialVersionUID = 1L;
+  private static final Logger logger = 
LoggerFactory.getLogger(StringEventDataScheme.class);
 
   @Override
-  public List deserialize(Message message) {
+  public List deserialize(EventData eventData) {
 final List fieldContents = new ArrayList();
-
-for (Section section : message.getPayload()) {
-  if (section instanceof Data) {
-Data data = (Data) section;
-fieldContents.add(new String(data.getValue().getArray()));
-  } else if (section instanceof AmqpValue) {
-AmqpValue amqpValue = (AmqpValue) section;
-fieldContents.add(amqpValue.getValue().toString());
+String messageData = "";
+if (eventData.getBytes()!=null) {
+  messageData = new String(eventData.getBytes());
+}
+/*Will only serialize AMQPValue type*/
+else if (eventData.getObject()!=null) {
+  try {
+if (!(eventData.getObject() instanceof List)) {
+  messageData = eventData.getObject().toString();
+} else {
+  throw new RuntimeException("Cannot serialize the given AMQP 
type.");
--- End diff --

>serialize [](start = 45, length = 9)

cannot deserialize...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-30 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r109074150
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SerializeDeserializeUtil.java
 ---
@@ -0,0 +1,35 @@

+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
***/
+
+package org.apache.storm.eventhubs.spout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+public class SerializeDeserializeUtil {
+public static byte[] serialize(Object obj) throws IOException { 
+try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
+try (ObjectOutputStream o = new ObjectOutputStream(b)) {
+o.writeObject(obj);
+o.close();
--- End diff --

>o.close(); [](start = 16, length = 10)

not needed...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-30 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r109073792
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
 ---
@@ -41,29 +41,29 @@ public PartitionManager(
 
 super(spoutConfig, partitionId, stateStore, receiver);
 
-this.pending = new LinkedHashMap();
-this.toResend = new TreeSet();
+this.pending = new LinkedHashMap();
+this.toResend = new TreeSet();
--- End diff --

>() [](start = 46, length = 2)

order by sequence number


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-30 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r109072842
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 ---
@@ -44,27 +41,31 @@
 public class EventDataScheme implements IEventDataScheme {
 
private static final long serialVersionUID = 1L;
-
+   private static final Logger logger = 
LoggerFactory.getLogger(EventDataScheme.class);
@Override
-   public List deserialize(Message message) {
+   public List deserialize(EventData eventData) {
final List fieldContents = new ArrayList();
-
-   Map metaDataMap = new HashMap();
String messageData = "";
-
-   for (Section section : message.getPayload()) {
-   if (section instanceof Data) {
-   Data data = (Data) section;
-   messageData = new 
String(data.getValue().getArray());
-   } else if (section instanceof AmqpValue) {
-   AmqpValue amqpValue = (AmqpValue) section;
-   messageData = amqpValue.getValue().toString();
-   } else if (section instanceof ApplicationProperties) {
-   final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
-   metaDataMap = applicationProperties.getValue();
+   if (eventData.getBytes()!=null) {
+   messageData = new String(eventData.getBytes());
+   }
+   /*Will only serialize AMQPValue type*/
+   else if (eventData.getObject()!=null) {
+   try {
+   if (!(eventData.getObject() instanceof List)) {
+   messageData = 
eventData.getObject().toString();
+   } else {
+   throw new RuntimeException("Cannot 
serialize the given AMQP type");
+   }
+   } catch (RuntimeException e) {
+   logger.error("Failed to serialize EventData 
payload class"
+   + 
eventData.getObject().getClass());
+   logger.error("Exception encountered while 
serializing EventData payload is"
+   + e.toString());
+   throw e;
}
}
-
+   Map metaDataMap = eventData.getProperties().size() > 0 ? 
eventData.getProperties() : null;
fieldContents.add(messageData);
fieldContents.add(metaDataMap);
--- End diff --

>fieldContents.add(metaDataMap); [](start = 2, length = 31)

Do you still have to add this if the properties are null ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-24 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r108012858
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 ---
@@ -44,27 +43,21 @@
 public class EventDataScheme implements IEventDataScheme {
 
private static final long serialVersionUID = 1L;
-
+   private static final Logger logger = 
LoggerFactory.getLogger(EventDataScheme.class);
@Override
-   public List deserialize(Message message) {
+   public List deserialize(EventData eventData) {
final List fieldContents = new ArrayList();
-
-   Map metaDataMap = new HashMap();
String messageData = "";
-
-   for (Section section : message.getPayload()) {
-   if (section instanceof Data) {
-   Data data = (Data) section;
-   messageData = new 
String(data.getValue().getArray());
-   } else if (section instanceof AmqpValue) {
-   AmqpValue amqpValue = (AmqpValue) section;
-   messageData = amqpValue.getValue().toString();
-   } else if (section instanceof ApplicationProperties) {
-   final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
-   metaDataMap = applicationProperties.getValue();
-   }
+   if(eventData.getBytes()!=null)
+   messageData = new String (eventData.getBytes());
+   else if(eventData.getObject()!=null){
+   try{
+   messageData = new 
String(Serializedeserializeutil.serialize(eventData.getObject()),Charset.defaultCharset());
+   }catch (IOException e){
+   logger.error("Failed to serialize object"+e.toString());
}
-
+   }
+   Map metaDataMap = eventData.getProperties();
--- End diff --

>Map metaDataMap = eventData.getProperties(); [](start = 2, length = 44)

the old solution is not optimal - will result into too many unnecessary map 
objects initialized; pls evaluate if you want to change it
if map.count()==0 (ie., empty map) - dont even return them...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r107514839
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
 ---
@@ -65,16 +60,13 @@ public void open() throws Exception {
   offset = Constants.DefaultStartingOffset;
 }
 
-IEventHubFilter filter;
 if (offset.equals(Constants.DefaultStartingOffset)
 && config.getEnqueueTimeFilter() != 0) {
-  filter = new 
EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter());
-}
-else {
-  filter = new EventHubOffsetFilter(offset);
+  offset = Long.toString(config.getEnqueueTimeFilter());
--- End diff --

>getEnqueueTimeFilter [](start = 36, length = 20)

why is TimeFilter translated to Offset filter ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r107512537
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -65,77 +57,81 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, 
String partitionId) {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-logger.info("creating eventhub receiver: partitionId=" + partitionId + 
-   ", filterString=" + filter.getFilterString());
+  public void open(String offset) throws EventHubException {
+logger.info("creating eventhub receiver: partitionId=" + partitionId +
+", offset=" + offset);
 long start = System.currentTimeMillis();
-receiver = new ResilientEventHubReceiver(connectionString, entityName,
-   partitionId, consumerGroupName, defaultCredits, filter);
-receiver.initialize();
-
+try {
+  ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
+  receiver = ehClient.createEpochReceiverSync(
+  consumerGroupName,
+  partitionId,
+  offset,
+  false,
+  1);
+}catch (Exception e){
+  logger.info("Exception in creating EventhubClient"+e.toString());
+}
 long end = System.currentTimeMillis();
 logger.info("created eventhub receiver, time taken(ms): " + 
(end-start));
   }
 
   @Override
-  public void close() {
+  public void close(){
 if(receiver != null) {
-  receiver.close();
+  try {
+receiver.close().whenComplete((voidargs,error)->{
+  try{
+if(error!=null){
+  logger.error("Exception during receiver close 
phase"+error.toString());
+}
+ehClient.closeSync();
+  }catch (Exception e){
+logger.error("Exception during ehclient close 
phase"+e.toString());
+  }
+}).get();
+  }catch (InterruptedException e){
+logger.error("Exception occured during close phase"+e.toString());
+  }catch (ExecutionException e){
+logger.error("Exception occured during close phase"+e.toString());
+  }
   logger.info("closed eventhub receiver: partitionId=" + partitionId );
   receiver = null;
+  ehClient =  null;
 }
   }
-  
+
+
   @Override
   public boolean isOpen() {
 return (receiver != null);
   }
 
   @Override
-  public EventData receive(long timeoutInMilliseconds) {
+  public EventDataWrap receive() {
 long start = System.currentTimeMillis();
-Message message = receiver.receive(timeoutInMilliseconds);
+Iterable receivedEvents=null;
+/*Get one message at a time for backward compatibility behaviour*/
+try {
+  receivedEvents = receiver.receiveSync(1);
--- End diff --

>receiver [](start = 23, length = 8)

this will result in NPE if in the Open() - createReceiver call threw 
exception (and the try catch swallowed it)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r107511737
  
--- Diff: pom.xml ---
@@ -297,6 +297,7 @@
 3.1.0
 1.0
 0.32
+0.13.0
 1.0.1
--- End diff --

>1.0.1 [](start = 8, 
length = 58)

which client is this ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r107511467
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/Serializedeserializeutil.java
 ---
@@ -0,0 +1,21 @@
+package org.apache.storm.eventhubs.spout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+/**
+ * Created by rabaner on 3/22/2017.
+ */
--- End diff --

nit: license header and fix name of class to camel casing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r107485639
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
 ---
@@ -37,23 +37,21 @@
  */
 public class BinaryEventDataScheme implements IEventDataScheme {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(BinaryEventDataScheme.class);
@Override
-   public List deserialize(Message message) {
+   public List deserialize(EventData eventData){
final List fieldContents = new ArrayList();
-
-   Map metaDataMap = new HashMap();
-   byte[] messageData = new byte[0];
-
-   for (Section section : message.getPayload()) {
-   if (section instanceof Data) {
-   Data data = (Data) section;
-   messageData = data.getValue().getArray();
-   } else if (section instanceof ApplicationProperties) {
-   final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
-   metaDataMap = applicationProperties.getValue();
+   byte [] messageData = null;
+   if(eventData.getBytes() != null)
+   messageData =  eventData.getBytes();
+   else if(eventData.getObject()!=null) {
+   try {
+   messageData = 
Serializedeserializeutil.serialize(eventData.getObject());
+   }catch (IOException e){
+   logger.error("Failed to serialize 
object"+e.toString());
--- End diff --

>logger.error("Failed to serialize object"+e.toString()); [](start = 4, 
length = 56)

Will this not result into message loss..? if so don't Swallow the 
exception...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r107484015
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 ---
@@ -44,21 +40,23 @@
 public class StringEventDataScheme implements IEventDataScheme {
 
   private static final long serialVersionUID = 1L;
+  private static final Logger logger = 
LoggerFactory.getLogger(StringEventDataScheme.class);
 
   @Override
-  public List deserialize(Message message) {
+  public List deserialize(EventData eventData) {
 final List fieldContents = new ArrayList();
-
-for (Section section : message.getPayload()) {
-  if (section instanceof Data) {
-Data data = (Data) section;
-fieldContents.add(new String(data.getValue().getArray()));
-  } else if (section instanceof AmqpValue) {
-AmqpValue amqpValue = (AmqpValue) section;
-fieldContents.add(amqpValue.getValue().toString());
+String messageData = "";
+if(eventData.getBytes()!=null)
+  messageData = new String (eventData.getBytes());
+else if(eventData.getObject()!=null){
+  try{
+messageData = new 
String(Serializedeserializeutil.serialize(eventData.getObject()),Charset.defaultCharset());
+  }catch (IOException e){
+logger.error("Failed to serialize object"+e.toString());
--- End diff --

>object [](start = 42, length = 6)

nit: i would pitch the msg a bit more specific, like - failed to Serialize 
EventData payload


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2026: Eventhub3

2017-03-22 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/2026#discussion_r107483782
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 ---
@@ -44,21 +40,23 @@
 public class StringEventDataScheme implements IEventDataScheme {
 
   private static final long serialVersionUID = 1L;
+  private static final Logger logger = 
LoggerFactory.getLogger(StringEventDataScheme.class);
 
   @Override
-  public List deserialize(Message message) {
+  public List deserialize(EventData eventData) {
 final List fieldContents = new ArrayList();
-
-for (Section section : message.getPayload()) {
-  if (section instanceof Data) {
-Data data = (Data) section;
-fieldContents.add(new String(data.getValue().getArray()));
-  } else if (section instanceof AmqpValue) {
-AmqpValue amqpValue = (AmqpValue) section;
-fieldContents.add(amqpValue.getValue().toString());
+String messageData = "";
+if(eventData.getBytes()!=null)
+  messageData = new String (eventData.getBytes());
+else if(eventData.getObject()!=null){
+  try{
+messageData = new 
String(Serializedeserializeutil.serialize(eventData.getObject()),Charset.defaultCharset());
--- End diff --

>new 
String(Serializedeserializeutil.serialize(eventData.getObject()),Charset.defaultCharset());
 [](start = 22, length = 95)

eventData.getObject() would return java type of the corresponding Amqp 
object (if there is no direct java type mapping available - you would see a 
proton-j type here).
If the type is a primitive data type - simply return .tostring().
else - follow the serialization approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-03 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104280323
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 ---
@@ -46,25 +42,11 @@
private static final long serialVersionUID = 1L;
 
@Override
-   public List deserialize(Message message) {
+   public List deserialize(EventData eventData) {
final List fieldContents = new ArrayList();
-
-   Map metaDataMap = new HashMap();
String messageData = "";
-
-   for (Section section : message.getPayload()) {
-   if (section instanceof Data) {
-   Data data = (Data) section;
-   messageData = new 
String(data.getValue().getArray());
-   } else if (section instanceof AmqpValue) {
-   AmqpValue amqpValue = (AmqpValue) section;
-   messageData = amqpValue.getValue().toString();
-   } else if (section instanceof ApplicationProperties) {
-   final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
-   metaDataMap = applicationProperties.getValue();
-   }
-   }
-
+   messageData = new String 
(eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset());
--- End diff --

Null check getBody


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-03 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104280244
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 ---
@@ -46,19 +39,11 @@
   private static final long serialVersionUID = 1L;
 
   @Override
-  public List deserialize(Message message) {
+  public List deserialize(EventData eventData) {
 final List fieldContents = new ArrayList();
-
-for (Section section : message.getPayload()) {
-  if (section instanceof Data) {
-Data data = (Data) section;
-fieldContents.add(new String(data.getValue().getArray()));
-  } else if (section instanceof AmqpValue) {
-AmqpValue amqpValue = (AmqpValue) section;
-fieldContents.add(amqpValue.getValue().toString());
-  }
-}
-
+String messageData = "";
+messageData = new String 
(eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset());
--- End diff --

Add null check for getbody()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-03 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104280523
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -65,77 +60,80 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, 
String partitionId) {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-logger.info("creating eventhub receiver: partitionId=" + partitionId + 
-   ", filterString=" + filter.getFilterString());
+  public void open(String offset) throws EventHubException {
+logger.info("creating eventhub receiver: partitionId=" + partitionId +
+", offset=" + offset);
 long start = System.currentTimeMillis();
-receiver = new ResilientEventHubReceiver(connectionString, entityName,
-   partitionId, consumerGroupName, defaultCredits, filter);
-receiver.initialize();
-
+try {
+  ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
+  receiver = ehClient.createEpochReceiverSync(
+  consumerGroupName,
+  partitionId,
+  offset,
+  false,
+  1);
--- End diff --

Please refrain from using catch all exceptions and move to catching more 
specific exceptions...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1985: STORM-2371: New eventhub implementation

2017-03-03 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1985#discussion_r104280360
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 ---
@@ -46,25 +42,11 @@
private static final long serialVersionUID = 1L;
 
@Override
-   public List deserialize(Message message) {
+   public List deserialize(EventData eventData) {
final List fieldContents = new ArrayList();
-
-   Map metaDataMap = new HashMap();
String messageData = "";
-
-   for (Section section : message.getPayload()) {
-   if (section instanceof Data) {
-   Data data = (Data) section;
-   messageData = new 
String(data.getValue().getArray());
-   } else if (section instanceof AmqpValue) {
-   AmqpValue amqpValue = (AmqpValue) section;
-   messageData = amqpValue.getValue().toString();
-   } else if (section instanceof ApplicationProperties) {
-   final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
-   metaDataMap = applicationProperties.getValue();
-   }
-   }
-
+   messageData = new String 
(eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset());
--- End diff --

The old code handles the case when AmqpValue is species as payload - if you 
need that support here - you will need to wait until 0.12.0
Refer to this pr for details : 
https://github.com/Azure/azure-event-hubs-java/pull/66


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1951: STORM-2371 Implementing new eventhub driver

2017-02-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on the issue:

https://github.com/apache/storm/pull/1951
  
:clock1:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

2017-02-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1951#discussion_r102774770
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -92,50 +110,34 @@ public boolean isOpen() {
   }
 
   @Override
-  public EventData receive(long timeoutInMilliseconds) {
+  public EventData receive() {
 long start = System.currentTimeMillis();
-Message message = receiver.receive(timeoutInMilliseconds);
+Iterable receivedEvents=null;
+/*Get one message at a time for backward compatibility behaviour*/
+try {
+  receivedEvents = receiver.receive(1).get();
+}catch (Exception e){
+  logger.error("Exception occured during receive"+e.toString());
+}
 long end = System.currentTimeMillis();
 long millis = (end - start);
 receiveApiLatencyMean.update(millis);
 receiveApiCallCount.incr();
-
-if (message == null) {
-  //Temporary workaround for AMQP/EH bug of failing to receive messages
-  /*if(timeoutInMilliseconds > 100 && millis < 
timeoutInMilliseconds/2) {
-throw new RuntimeException(
-"Restart EventHubSpout due to failure of receiving messages in 
"
-+ millis + " millisecond");
-  }*/
+if (receivedEvents == null) {
   return null;
 }
-
 receiveMessageCount.incr();
-
-MessageId messageId = createMessageId(message);
-return EventData.create(message, messageId);
-  }
-  
-  private MessageId createMessageId(Message message) {
-String offset = null;
-long sequenceNumber = 0;
-
-for (Section section : message.getPayload()) {
-  if (section instanceof MessageAnnotations) {
-MessageAnnotations annotations = (MessageAnnotations) section;
-HashMap annonationMap = (HashMap) annotations.getValue();
-
-if (annonationMap.containsKey(OffsetKey)) {
-  offset = (String) annonationMap.get(OffsetKey);
-}
-
-if (annonationMap.containsKey(SequenceNumberKey)) {
-  sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
-}
-  }
+MessageId messageId=null;
+Message message=null;
+for (com.microsoft.azure.eventhubs.EventData receivedEvent : 
receivedEvents) {
+  messageId = new MessageId(partitionId,
+  receivedEvent.getSystemProperties().getOffset(),
+  receivedEvent.getSystemProperties().getSequenceNumber());
+  List body = new ArrayList();
+  body.add(new Data(new Binary((new String(receivedEvent.getBody(), 
Charset.defaultCharset())).getBytes(;
+  message = new Message(body);
--- End diff --

This is not equivalent to the existing behavior. Message could have other 
amqp sections - ApplicationProperties and SystemProperties.
Ideally, you should return com.microsoft.azure.eventhubs.EventData. Please 
remove the EventData type created in the spout library - to eliminate confusion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

2017-02-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1951#discussion_r102773626
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -92,50 +110,34 @@ public boolean isOpen() {
   }
 
   @Override
-  public EventData receive(long timeoutInMilliseconds) {
+  public EventData receive() {
 long start = System.currentTimeMillis();
-Message message = receiver.receive(timeoutInMilliseconds);
+Iterable receivedEvents=null;
+/*Get one message at a time for backward compatibility behaviour*/
+try {
+  receivedEvents = receiver.receive(1).get();
+}catch (Exception e){
+  logger.error("Exception occured during receive"+e.toString());
+}
 long end = System.currentTimeMillis();
 long millis = (end - start);
 receiveApiLatencyMean.update(millis);
 receiveApiCallCount.incr();
-
-if (message == null) {
-  //Temporary workaround for AMQP/EH bug of failing to receive messages
-  /*if(timeoutInMilliseconds > 100 && millis < 
timeoutInMilliseconds/2) {
-throw new RuntimeException(
-"Restart EventHubSpout due to failure of receiving messages in 
"
-+ millis + " millisecond");
-  }*/
+if (receivedEvents == null) {
   return null;
 }
-
 receiveMessageCount.incr();
-
-MessageId messageId = createMessageId(message);
-return EventData.create(message, messageId);
-  }
-  
-  private MessageId createMessageId(Message message) {
-String offset = null;
-long sequenceNumber = 0;
-
-for (Section section : message.getPayload()) {
-  if (section instanceof MessageAnnotations) {
-MessageAnnotations annotations = (MessageAnnotations) section;
-HashMap annonationMap = (HashMap) annotations.getValue();
-
-if (annonationMap.containsKey(OffsetKey)) {
-  offset = (String) annonationMap.get(OffsetKey);
-}
-
-if (annonationMap.containsKey(SequenceNumberKey)) {
-  sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
-}
-  }
+MessageId messageId=null;
+Message message=null;
+for (com.microsoft.azure.eventhubs.EventData receivedEvent : 
receivedEvents) {
--- End diff --

>for (com.microsoft.azure.eventhubs.EventData receivedEvent : 
receivedEvents) { [](start = 4, length = 78)

Why ? In the receive(1) call above code - you explicitly specified receive 
1 message - so remove this..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

2017-02-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1951#discussion_r102770531
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
@@ -65,24 +69,38 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, 
String partitionId) {
   }
 
   @Override
-  public void open(IEventHubFilter filter) throws EventHubException {
-logger.info("creating eventhub receiver: partitionId=" + partitionId + 
-   ", filterString=" + filter.getFilterString());
+  public void open(String offset) throws EventHubException {
+logger.info("creating eventhub receiver: partitionId=" + partitionId +
+", offset=" + offset);
 long start = System.currentTimeMillis();
-receiver = new ResilientEventHubReceiver(connectionString, entityName,
-   partitionId, consumerGroupName, defaultCredits, filter);
-receiver.initialize();
-
+try {
+  ehClient = 
EventHubClient.createFromConnectionString(connectionString).get();
+  receiver = ehClient.createEpochReceiver(
+  consumerGroupName,
+  partitionId,
+  offset,
+  false,
+  1).get();
+}catch (Exception e){
+  logger.info("Exception in creating EventhubClient"+e.toString());
+}
 long end = System.currentTimeMillis();
 logger.info("created eventhub receiver, time taken(ms): " + 
(end-start));
   }
 
   @Override
-  public void close() {
+  public void close(){
 if(receiver != null) {
-  receiver.close();
+  try {
+receiver.close().get();
--- End diff --

>.get() [](start = 24, length = 6)

if receiver.close() fails - this results into connection leak; handle this 
case. something like:
receiver.whenComplete(() -> { ehClient.closeSync() }).get();


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

2017-02-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1951#discussion_r102769617
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
 ---
@@ -84,12 +82,39 @@ public void prepare(Map config, TopologyContext context,
@Override
public void execute(Tuple tuple) {
try {
-   
sender.send(boltConfig.getEventDataFormat().serialize(tuple));
+   EventData sendEvent = new 
EventData(boltConfig.getEventDataFormat().serialize(tuple));
+   if(boltConfig.getPartitionMode() && sender!=null)
+   sender.send(sendEvent).get();
+   else if(boltConfig.getPartitionMode() && sender==null)
+   throw new EventHubException("Sender is null");
+   else if(!boltConfig.getPartitionMode() && 
ehClient!=null)
+   ehClient.send(sendEvent).get();
+   else if(!boltConfig.getPartitionMode() && 
ehClient==null)
+   throw new EventHubException("ehclient is null");
collector.ack(tuple);
} catch (EventHubException ex) {
collector.reportError(ex);
collector.fail(tuple);
}
+   catch (Exception e){
+
+   }
--- End diff --

Why catch-all ?
This could potentially make this bolt a blackhole :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

2017-02-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1951#discussion_r102768443
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
 ---
@@ -70,10 +69,9 @@ public void prepare(Map config, TopologyContext context,
logger.info("creating sender: " + 
boltConfig.getConnectionString()
+ ", " + boltConfig.getEntityPath() + ", " + 
myPartitionId);
try {
-   EventHubClient eventHubClient = EventHubClient.create(
-   boltConfig.getConnectionString(),
-   boltConfig.getEntityPath());
-   sender = 
eventHubClient.createPartitionSender(myPartitionId);
+   ehClient = 
EventHubClient.createFromConnectionString(boltConfig.getConnectionString()).get();
--- End diff --

>get [](start = 90, length = 3)

use methods ending with sync(...) - which could return cleaner exception 
stacks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1951: STORM-2371 Implementing new eventhub driver

2017-02-23 Thread SreeramGarlapati
Github user SreeramGarlapati commented on a diff in the pull request:

https://github.com/apache/storm/pull/1951#discussion_r102767172
  
--- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
 ---
@@ -22,10 +22,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.microsoft.eventhubs.client.EventHubClient;
+import com.microsoft.azure.eventhubs.*;
--- End diff --

>.*; [](start = 36, length = 3)

nit: keep them expanded - usual pattern in OS is to specifically import


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---