Revision: 18325
http://sourceforge.net/p/gate/code/18325
Author: adamfunk
Date: 2014-09-12 13:45:45 +0000 (Fri, 12 Sep 2014)
Log Message:
-----------
Added stream-reading code
Modified Paths:
--------------
gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java
Modified:
gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java
===================================================================
--- gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java 2014-09-12 12:22:38 UTC (rev 18324)
+++ gate/branches/twitter-pop-dev/plugins/Twitter/src/gate/corpora/twitter/TweetStreamIterator.java 2014-09-12 13:45:45 UTC (rev 18325)
@@ -11,14 +11,11 @@
*/
package gate.corpora.twitter;
-import gate.Document;
-import gate.Factory;
-import gate.FeatureMap;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
+import java.util.zip.GZIPInputStream;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
@@ -28,36 +25,43 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
public class TweetStreamIterator implements Iterator<Tweet> {
// Borrowed from gcp IOConstants
public static final String ID_POINTER = "/id_str";
+ public static final String SEARCH_KEY = "search_metadata";
+ public static final String STATUS_KEY = "statuses";
-
private ObjectMapper objectMapper;
private JsonParser jsonParser;
private MappingIterator<JsonNode> iterator;
- private boolean gzip;
private List<String> contentKeys, featureKeys;
protected JsonPointer idPointer;
+ private boolean nested;
+ private Iterator<JsonNode> nestedStatuses;
public TweetStreamIterator(InputStream input, List<String> contentKeys,
List<String> featureKeys, boolean gzip) throws JsonParseException,
IOException {
this.contentKeys = contentKeys;
this.featureKeys = featureKeys;
- this.gzip = gzip;
+ InputStream workingInput;
+ // Following borrowed from gcp JSONStreamingInputHandler
+ idPointer = JsonPointer.compile(ID_POINTER);
+ objectMapper = new ObjectMapper();
+
if (gzip) {
- throw new IllegalArgumentException("gzip not yet supported!");
+ workingInput = new GZIPInputStream(input);
}
- // TODO support compression
+ else {
+ workingInput = input;
+ }
- // Following borrowed from gcp JSONStreamingInputHandler
- idPointer = JsonPointer.compile(ID_POINTER);
- objectMapper = new ObjectMapper();
- jsonParser = objectMapper.getFactory().createParser(input).enable(Feature.AUTO_CLOSE_SOURCE);
+ jsonParser = objectMapper.getFactory().createParser(workingInput).enable(Feature.AUTO_CLOSE_SOURCE);
// If the first token in the stream is the start of an array ("[")
// then
// assume the stream as a whole is an array of objects, one per
@@ -69,44 +73,83 @@
jsonParser.clearCurrentToken();
}
iterator = objectMapper.readValues(jsonParser, JsonNode.class);
+ this.nested = false;
+ this.nestedStatuses = null;
}
@Override
public boolean hasNext() {
- return iterator.hasNext();
- // should this be hasNextValue() ?
+ return this.iterator.hasNext() || this.nestedStatuses.hasNext();
+ // should that be iterator.hasNextValue() ?
}
@Override
public Tweet next() {
+ Tweet result = null;
try {
- // why while not if?
- while(iterator.hasNextValue()) {
+ if (this.nested && this.nestedStatuses.hasNext()) {
+ result = Tweet.readTweet(this.nestedStatuses.next(), contentKeys, featureKeys);
+ // Clear the nested flag once the last item in the statuses
+ // value's list has been used, so that the next call to next()
+ // will drop to the else if clause.
+ this.nested = this.nestedStatuses.hasNext();
+ }
+
+ else if (iterator.hasNextValue()) {
JsonNode json = iterator.nextValue();
- String id = json.at(idPointer).asText();
- // Is it worth testing IDs here?
- return Tweet.readTweet(json, contentKeys, featureKeys);
+
+ if (isSearchResultList(json)) {
+ this.nestedStatuses = getStatuses(json).iterator();
+ this.nested = this.nestedStatuses.hasNext();
+ // Set the nested flag according as there is anything in
+ // the statuses value array (it could be empty).
+ }
+
+ // Test nested now: true IFF we are in a search result thingy AND
+ // the statuses array is non-empty.
+ if (this.nested) {
+ result = Tweet.readTweet(this.nestedStatuses.next(), contentKeys, featureKeys);
+ // Set the nested flag again for the next call to next()
+ this.nested = this.nestedStatuses.hasNext();
+ }
+ else {
+ result = Tweet.readTweet(json, contentKeys, featureKeys);
+ }
}
}
catch (IOException e) {
e.printStackTrace();
}
- return null;
+ return result;
}
+
@Override
public void remove() {
- // TODO Auto-generated method stub
-
+ throw new UnsupportedOperationException("The TweetStream is read-only.");
}
- public void close() {
- // TODO
+ public void close() throws IOException {
+ iterator.close();
+ jsonParser.close();
}
+ public static boolean isSearchResultList(JsonNode json) {
+ return json.has(SEARCH_KEY) && json.has(STATUS_KEY);
+ }
+ public static ArrayNode getStatuses(JsonNode json) throws IOException {
+ JsonNode statusList = json.get(STATUS_KEY);
+ if (! (statusList instanceof ArrayNode)) {
+ throw new IOException("Bad tweet format: value of 'statuses' is not an array!");
+ }
+ return (ArrayNode) statusList;
+ }
+
+
+
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Want excitement?
Manually upgrade your production database.
When you want reliability, choose Perforce
Perforce version control. Predictably reliable.
http://pubads.g.doubleclick.net/gampad/clk?id=157508191&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs