Repository: nifi
Updated Branches:
  refs/heads/master 4f9219315 -> 706edeb01


NIFI-855 Add location bounding box filter to twitter processor.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3cca0465
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3cca0465
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3cca0465

Branch: refs/heads/master
Commit: 3cca0465508b8b821678ca77f547a3c2589667d5
Parents: 4f92193
Author: Chris Mangold <[email protected]>
Authored: Mon Aug 17 11:09:10 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Wed Aug 19 20:42:27 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/twitter/GetTwitter.java     | 93 ++++++++++++++++++--
 1 file changed, 88 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3cca0465/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index f24b086..d487492 100644
--- 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -57,6 +57,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Client;
 import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.Location.Coordinate ;
+import com.twitter.hbc.core.endpoint.Location ;
 import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
@@ -98,7 +100,7 @@ public class GetTwitter extends AbstractProcessor {
             .build();
     public static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
             .name("Access Token")
-            .description("The Acces Token provided by Twitter")
+            .description("The Access Token provided by Twitter")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -121,6 +123,13 @@ public class GetTwitter extends AbstractProcessor {
             .required(false)
             .addValidator(new FollowingValidator())
             .build();
+    // Optional bounding box for location filtering; the value's format is checked by LocationValidator.
+    public static final PropertyDescriptor LOCATIONS = new PropertyDescriptor.Builder()
+            .name("Location to Filter")
+            .description("Bounding box for filtering tweets. Enter SW and NE corners in field (longitude, latitude, longitude, latitude). Example: -77.55661, 39.25831, -77.14325, 39.5374")
+            .addValidator(new LocationValidator())
+            .required(false)
+            .build();
     public static final PropertyDescriptor TERMS = new 
PropertyDescriptor.Builder()
             .name("Terms to Filter On")
             .description("A comma-separated list of terms to filter on. 
Ignored unless Endpoint is set to 'Filter Endpoint'."
@@ -155,6 +164,7 @@ public class GetTwitter extends AbstractProcessor {
         descriptors.add(LANGUAGES);
         descriptors.add(TERMS);
         descriptors.add(FOLLOWING);
+        descriptors.add(LOCATIONS);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -189,9 +199,10 @@ public class GetTwitter extends AbstractProcessor {
         final String endpointName = 
validationContext.getProperty(ENDPOINT).getValue();
 
         if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
-            if (!validationContext.getProperty(TERMS).isSet() && 
!validationContext.getProperty(FOLLOWING).isSet()) {
+            if (!validationContext.getProperty(TERMS).isSet() && 
!validationContext.getProperty(FOLLOWING).isSet() && 
!validationContext.getProperty(LOCATIONS).isSet()) {
                 results.add(new 
ValidationResult.Builder().input("").subject(FOLLOWING.getName())
-                        .valid(false).explanation("When using the 'Filter 
Endpoint', at least one of '" + TERMS.getName() + "' or '" + 
FOLLOWING.getName() + "' must be set").build());
+                        .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "', '" + FOLLOWING.getName() + "'"
+                                + " or '" + LOCATIONS.getName() + "' must be set").build()); // each property name is wrapped in one matching pair of quotes
             }
         }
 
@@ -206,6 +217,8 @@ public class GetTwitter extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws 
MalformedURLException {
+        messageQueue = new LinkedBlockingQueue<>(100000);
+
         final String endpointName = context.getProperty(ENDPOINT).getValue();
         final Authentication oauth = new 
OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
                 context.getProperty(CONSUMER_SECRET).getValue(),
@@ -284,7 +297,34 @@ public class GetTwitter extends AbstractProcessor {
             if (languages != null) {
                 filterEndpoint.languages(languages);
             }
-            streamingEndpoint = filterEndpoint;
+
+            // 'Location to Filter' is optional, so its value may be null here. Check for that
+            // before parsing: calling split() on an unset value would throw a
+            // NullPointerException whenever only terms or user IDs are configured.
+            final String locationString = context.getProperty(LOCATIONS).getValue();
+
+            if ( locationString != null ) {
+                // LocationValidator has already verified that the value consists of exactly
+                // four parseable numbers, ordered: SW longitude, SW latitude, NE longitude,
+                // NE latitude.
+                final String[] corSplit = locationString.split(",");
+
+                final double swLon = Double.parseDouble( corSplit[0] ) ;
+                final double swLat = Double.parseDouble( corSplit[1] ) ;
+                final double neLon = Double.parseDouble( corSplit[2] ) ;
+                final double neLat = Double.parseDouble( corSplit[3] ) ;
+
+                // The bounding box is described by its south-west and north-east corners.
+                final Coordinate sw = new Coordinate( swLon, swLat ) ;
+                final Coordinate ne = new Coordinate( neLon, neLat ) ;
+
+                final List<Location> locationIds = new ArrayList<>();
+                locationIds.add( new Location( sw, ne ) );
+                filterEndpoint.locations(locationIds);
+            }
+
+            streamingEndpoint = filterEndpoint ;
+
         } else {
             throw new AssertionError("Endpoint was invalid value: " + 
endpointName);
         }
@@ -359,4 +399,47 @@ public class GetTwitter extends AbstractProcessor {
         }
 
     }
-}
+
+    /**
+     * Validates the 'Location to Filter' value: a bounding box written as four comma-separated
+     * numbers in the order SW longitude, SW latitude, NE longitude, NE latitude.
+     */
+    private static class LocationValidator implements Validator {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            // A null value is reported as invalid instead of failing with a NullPointerException.
+            final String[] splits = input == null ? new String[0] : input.split(",");
+            if ( splits.length != 4 ) {
+                return invalid(subject, input, "Must be comma-separated list of coordinates, SW and NE corners of your bounding box.");
+            }
+
+            final double swLon, swLat, neLon, neLat;
+            try {
+                swLon = Double.parseDouble(splits[0]);
+                swLat = Double.parseDouble(splits[1]);
+                neLon = Double.parseDouble(splits[2]);
+                neLat = Double.parseDouble(splits[3]);
+            } catch ( NumberFormatException e ) {
+                return invalid(subject, input, "Bounding box location parse failure.");
+            }
+
+            // Negated comparisons so that NaN (for which every comparison is false) is rejected too.
+            if ( !(swLon >= -180 && neLon <= 180) || !(swLat >= -90 && neLat <= 90) ) {
+                return invalid(subject, input, "Longitude must be within [-180, 180] and latitude within [-90, 90].");
+            }
+            if ( !(swLon < neLon) ) {
+                return invalid(subject, input, "SW Longitude must be less than NE Longitude.");
+            }
+            if ( !(swLat < neLat) ) {
+                return invalid(subject, input, "SW Latitude must be less than NE Latitude.");
+            }
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+
+        /** Builds a failed validation result carrying the given explanation. */
+        private static ValidationResult invalid(final String subject, final String input, final String explanation) {
+            return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation(explanation).build();
+        }
+    }
+
+}
\ No newline at end of file

Reply via email to