Xikui Wang has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1002
Change subject: This patch includes following changes: ...................................................................... This patch includes following changes: 1. Extended TweetParser to parse more than a fixed set of attributes. 2. Changed the Twitter feed message unit from Status to String. 3. Fixed bug ASTERIXDB-1471. 4. Fixed bug ASTERIXDB-1352. Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0 --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java M asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java 12 files changed, 408 insertions(+), 149 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/02/1002/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java index e31325a..80e716d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java @@ -28,13 +28,9 @@ import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.exceptions.HyracksDataException; -import twitter4j.Query; -import twitter4j.QueryResult; -import twitter4j.Status; -import twitter4j.Twitter; -import twitter4j.TwitterException; +import twitter4j.*; -public class TwitterPullRecordReader implements IRecordReader<Status> { +public class TwitterPullRecordReader implements IRecordReader<String> { private Query query; private Twitter twitter; @@ -42,14 +38,14 @@ private QueryResult result; private int nextTweetIndex = 0; private long lastTweetIdReceived = 0; - private GenericRecord<Status> record; + private GenericRecord<String> record; public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) { this.twitter = twitter; this.requestInterval = requestInterval; this.query = new Query(keywords); this.query.setCount(100); - this.record = new GenericRecord<Status>(); + this.record = new GenericRecord<>(); } @Override @@ -62,7 +58,7 @@ } @Override - public IRawRecord<Status> next() throws IOException, InterruptedException { + public IRawRecord<String> next() throws IOException, InterruptedException { if (result == null || nextTweetIndex >= result.getTweets().size()) { Thread.sleep(1000 * requestInterval); query.setSinceId(lastTweetIdReceived); @@ -79,7 +75,8 @@ if (lastTweetIdReceived < tweet.getId()) { lastTweetIdReceived = tweet.getId(); } - record.set(tweet); + String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); // transform tweet obj to json + 
record.set(jsonTweet); return record; } else { return null; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java index f04cdb9..02c3963 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java @@ -27,30 +27,25 @@ import org.apache.asterix.external.input.record.GenericRecord; import org.apache.asterix.external.util.FeedLogManager; -import twitter4j.FilterQuery; -import twitter4j.StallWarning; -import twitter4j.Status; -import twitter4j.StatusDeletionNotice; -import twitter4j.StatusListener; -import twitter4j.TwitterStream; +import twitter4j.*; -public class TwitterPushRecordReader implements IRecordReader<Status> { - private LinkedBlockingQueue<Status> inputQ; +public class TwitterPushRecordReader implements IRecordReader<String> { + private LinkedBlockingQueue<String> inputQ; private TwitterStream twitterStream; - private GenericRecord<Status> record; + private GenericRecord<String> record; private boolean closed = false; public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) { - record = new GenericRecord<Status>(); - inputQ = new LinkedBlockingQueue<Status>(); + record = new GenericRecord<>(); + inputQ = new LinkedBlockingQueue<>(); this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration); this.twitterStream.addListener(new TweetListener(inputQ)); this.twitterStream.filter(query); } public TwitterPushRecordReader(TwitterStream twitterStream) { - record = new GenericRecord<Status>(); - inputQ = new LinkedBlockingQueue<Status>(); + record = new GenericRecord<>(); + inputQ = new 
LinkedBlockingQueue<>(); this.twitterStream = twitterStream;// this.twitterStream.addListener(new TweetListener(inputQ)); twitterStream.sample(); @@ -72,8 +67,8 @@ } @Override - public IRawRecord<Status> next() throws IOException, InterruptedException { - Status tweet = inputQ.poll(); + public IRawRecord<String> next() throws IOException, InterruptedException { + String tweet = inputQ.poll(); if (tweet == null) { return null; } @@ -93,15 +88,16 @@ private class TweetListener implements StatusListener { - private LinkedBlockingQueue<Status> inputQ; + private LinkedBlockingQueue<String> inputQ; - public TweetListener(LinkedBlockingQueue<Status> inputQ) { + public TweetListener(LinkedBlockingQueue<String> inputQ) { this.inputQ = inputQ; } @Override public void onStatus(Status tweet) { - inputQ.add(tweet); + String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); + inputQ.add(jsonTweet); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 541737a..172b22b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -37,7 +37,7 @@ import twitter4j.FilterQuery; import twitter4j.Status; -public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> { +public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> { private static final long serialVersionUID = 1L; private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName()); @@ -114,7 +114,7 @@ } @Override - public IRecordReader<? 
extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) + public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { if (pull) { return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration), @@ -133,8 +133,8 @@ } @Override - public Class<? extends Status> getRecordClass() { - return Status.class; + public Class<? extends String> getRecordClass() { + return String.class; } private boolean validateConfiguration(Map<String, String> configuration) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java index 5923354..e686b97 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java @@ -70,23 +70,12 @@ import org.apache.asterix.external.library.java.JObjects.JString; import org.apache.asterix.external.library.java.JObjects.JTime; import org.apache.asterix.external.library.java.JObjects.JUnorderedList; -import org.apache.asterix.om.base.ACircle; -import org.apache.asterix.om.base.ADuration; -import org.apache.asterix.om.base.ALine; -import org.apache.asterix.om.base.APoint; -import org.apache.asterix.om.base.APoint3D; -import org.apache.asterix.om.base.APolygon; -import org.apache.asterix.om.base.ARectangle; +import org.apache.asterix.om.base.*; import org.apache.asterix.om.pointables.AFlatValuePointable; import org.apache.asterix.om.pointables.AListVisitablePointable; import org.apache.asterix.om.pointables.ARecordVisitablePointable; import org.apache.asterix.om.pointables.base.IVisitablePointable; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import 
org.apache.asterix.om.types.AbstractCollectionType; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.om.types.EnumDeserializer; -import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.*; import org.apache.asterix.om.util.container.IObjectPool; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.util.string.UTF8StringReader; @@ -471,9 +460,14 @@ for (IVisitablePointable fieldPointable : fieldPointables) { closedPart = index < recordType.getFieldTypes().length; IVisitablePointable tt = fieldTypeTags.get(index); - IAType fieldType = closedPart ? recordType.getFieldTypes()[index] : null; ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER .deserialize(tt.getByteArray()[tt.getStartOffset()]); + IAType fieldType = null; + if(closedPart){ + fieldType = recordType.getFieldTypes()[index]; + } + else + fieldType = ATypeMachine(typeTag); IVisitablePointable fieldName = fieldNames.get(index); typeInfo.reset(fieldType, typeTag); switch (typeTag) { @@ -539,12 +533,13 @@ IJObject listItem = null; int index = 0; try { - for (IVisitablePointable itemPointable : items) { IVisitablePointable itemTagPointable = itemTags.get(index); ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER .deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]); - typeInfo.reset(listType.getType(), listType.getTypeTag()); + IAType fieldType = null; + fieldType = ATypeMachine(itemTypeTag); + typeInfo.reset(fieldType, itemTypeTag); switch (itemTypeTag) { case RECORD: listItem = pointableVisitor.visit((ARecordVisitablePointable) itemPointable, typeInfo); @@ -557,10 +552,7 @@ throw new IllegalArgumentException( "Cannot parse list item of type " + listType.getTypeTag()); default: - IAType itemType = ((AbstractCollectionType) listType).getItemType(); - typeInfo.reset(itemType, itemType.getTypeTag()); listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo); - } 
list.add(listItem); } @@ -580,4 +572,65 @@ } } + + public static IAType ATypeMachine(ATypeTag typeTag){ + IAType aType = null; + switch (typeTag){ + case BOOLEAN: + aType = BuiltinType.ABOOLEAN; + break; + case INT8: + aType = BuiltinType.AINT8; + break; + case INT16: + aType = BuiltinType.AINT16; + break; + case INT32: + aType = BuiltinType.AINT32; + break; + case INT64: + aType = BuiltinType.AINT64; + break; + case FLOAT: + aType = BuiltinType.AFLOAT; + break; + case DOUBLE: + aType = BuiltinType.ADOUBLE; + break; + case STRING: + aType = BuiltinType.ASTRING; + break; + case POINT: + aType = BuiltinType.APOINT; + break; + case POINT3D: + aType = BuiltinType.APOINT3D; + break; + case LINE: + aType = BuiltinType.ALINE; + break; + case DATE: + aType = BuiltinType.ADATE; + break; + case DATETIME: + aType = BuiltinType.ADATETIME; + break; + case DURATION: + aType = BuiltinType.ADURATION; + break; + case RECORD: + aType = ARecordType.FULLY_OPEN_RECORD_TYPE; + break; + case UNORDEREDLIST: + aType = AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE; + break; + case ORDEREDLIST: + aType = AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE; + break; + default: + aType = BuiltinType.ANY; + break; + } + return aType; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java index 522da06..fe3ec65 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java @@ -18,111 +18,234 @@ */ package org.apache.asterix.external.parser; -import java.io.DataOutput; -import java.util.HashMap; -import java.util.Map; - -import org.apache.asterix.builders.RecordBuilder; -import org.apache.asterix.external.api.IDataParser; +import org.apache.asterix.builders.*; import 
org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; -import org.apache.asterix.external.library.java.JObjectUtil; -import org.apache.asterix.external.util.Datatypes.Tweet; -import org.apache.asterix.om.base.AMutableDouble; -import org.apache.asterix.om.base.AMutableInt32; -import org.apache.asterix.om.base.AMutableRecord; +import org.apache.asterix.om.base.AMutablePoint; import org.apache.asterix.om.base.AMutableString; -import org.apache.asterix.om.base.IAObject; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.base.ANull; +import org.apache.asterix.om.types.*; +import org.apache.asterix.om.util.container.IObjectPool; +import org.apache.asterix.om.util.container.ListObjectPool; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IMutableValueStorage; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.storage.am.common.ophelpers.LongArrayList; +import org.apache.hyracks.util.string.UTF8StringWriter; +import org.eclipse.jetty.util.ajax.JSON; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; -import twitter4j.Status; -import twitter4j.User; +import java.io.DataOutput; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; -public class TweetParser implements IRecordDataParser<Status> { +import static org.apache.asterix.om.types.ATypeTag.*; - private IAObject[] mutableTweetFields; - private IAObject[] mutableUserFields; - private AMutableRecord mutableRecord; - private AMutableRecord mutableUser; - private final Map<String, Integer> userFieldNameMap = new HashMap<>(); - private final Map<String, Integer> tweetFieldNameMap = new HashMap<>(); - private RecordBuilder recordBuilder = new RecordBuilder(); +public class TweetParser extends 
AbstractDataParser implements IRecordDataParser<String> { + private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<IARecordBuilder, ATypeTag>( + new RecordBuilderFactory()); + private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new ListObjectPool<IAsterixListBuilder, ATypeTag>( + new ListBuilderFactory()); + private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new ListObjectPool<IMutableValueStorage, ATypeTag>( + new AbvsBuilderFactory()); + private ARecordType recordType; + private UTF8StringWriter utf8Writer = new UTF8StringWriter(); public TweetParser(ARecordType recordType) { - initFieldNames(recordType); - mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0), - new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) }; - mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)], - mutableUserFields); - - mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0), - new AMutableDouble(0), new AMutableString(null), new AMutableString(null) }; - mutableRecord = new AMutableRecord(recordType, mutableTweetFields); + this.recordType = recordType; } - // Initialize the hashmap values for the field names and positions - private void initFieldNames(ARecordType recordType) { - String tweetFields[] = recordType.getFieldNames(); - for (int i = 0; i < tweetFields.length; i++) { - tweetFieldNameMap.put(tweetFields[i], i); - if (tweetFields[i].equals(Tweet.USER)) { - IAType fieldType = recordType.getFieldTypes()[i]; - if (fieldType.getTypeTag() == ATypeTag.RECORD) { - String userFields[] = ((ARecordType) fieldType).getFieldNames(); - for (int j = 0; j < userFields.length; j++) { - userFieldNameMap.put(userFields[j], j); - } - } + private void parseUnorderedList(JSONArray jArray, DataOutput output) throws IOException, JSONException { + 
ArrayBackedValueStorage itemBuffer = getTempBuffer(); + UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder(); + unorderedListBuilder.reset(null); + for (int iter1 = 0; iter1 < jArray.length(); iter1++) { + itemBuffer.reset(); + if(writeField(jArray.get(iter1),null,itemBuffer.getDataOutput())) + unorderedListBuilder.addItem(itemBuffer); + } + unorderedListBuilder.write(output, true); + } + + private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out) + throws IOException, JSONException { + // save fieldType for closed type check + String nstt; + boolean writeResult = true; + if(fieldType!=null){ + if(fieldType.getTypeTag() == BuiltinType.ASTRING.getTypeTag()){ + out.write(BuiltinType.ASTRING.getTypeTag().serialize()); + utf8Writer.writeUTF8(fieldObj.toString(), out); + } + else if (fieldType.getTypeTag() == INT64){ + aInt64.setValue((long) fieldObj); + int64Serde.serialize(aInt64, out); + } + else if(fieldType.getTypeTag() == INT32){ + out.write(BuiltinType.AINT32.getTypeTag().serialize()); + out.writeInt((Integer) fieldObj); + } + else if(fieldType.getTypeTag() == DOUBLE){ + out.write(BuiltinType.ADOUBLE.getTypeTag().serialize()); + out.writeDouble((Double) fieldObj); + } + else if(fieldType.getTypeTag() == BOOLEAN){ + out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize()); + out.writeBoolean((Boolean) fieldObj); + } + else if(fieldType.getTypeTag() == RECORD){ + writeRecord((JSONObject)fieldObj, out,(ARecordType) fieldType); + } + else + writeResult = false; + } + else { + try { + if (fieldObj == JSONObject.NULL) { + nullSerde.serialize(ANull.NULL, out); + } else if (fieldObj instanceof Integer) { + out.write(BuiltinType.AINT32.getTypeTag().serialize()); + out.writeInt((Integer) fieldObj); + } else if (fieldObj instanceof Boolean) { + out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize()); + out.writeBoolean((Boolean) fieldObj); + } else if (fieldObj instanceof Double) { + 
out.write(BuiltinType.ADOUBLE.getTypeTag().serialize()); + out.writeDouble((Double) fieldObj); + } else if (fieldObj instanceof Long) { + out.write(BuiltinType.AINT64.getTypeTag().serialize()); + out.writeLong((Long) fieldObj); + } else if (fieldObj instanceof String) { + out.write(BuiltinType.ASTRING.getTypeTag().serialize()); + utf8Writer.writeUTF8((String) fieldObj, out); + } else if (fieldObj instanceof JSONArray) { + if (((JSONArray) fieldObj).length() != 0) + parseUnorderedList((JSONArray) fieldObj, out); + else + writeResult = false; + } else if (fieldObj instanceof JSONObject) { + if (((JSONObject) fieldObj).length() != 0) + writeRecord((JSONObject) fieldObj, out, null); + else + writeResult = false; + } + } catch (JSONException e) { + writeResult = false; } } + return writeResult; } + private int checkAttrNameIdx(String[] nameList, String name){ + int idx = 0; + if(nameList!=null) + for(String nln :nameList){ + if(name.equals(nln)) + return idx; + idx++; + } + return -1; + } + + public void writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType) throws IOException, JSONException { + IAType[] curTypes = null; + String[] curFNames = null; + + ArrayBackedValueStorage fieldValueBuffer = getTempBuffer(); + ArrayBackedValueStorage fieldNameBuffer = getTempBuffer(); + IARecordBuilder recBuilder = getRecordBuilder(); + + int fieldN; + int attrIdx; + + if (curRecType != null) { + curTypes = curRecType.getFieldTypes(); + curFNames = curRecType.getFieldNames(); + } + + recBuilder.reset(curRecType); + recBuilder.init(); + + if(curRecType!=null && !curRecType.isOpen()){ + // closed record type + fieldN = curFNames.length; + for (int iter1 = 0; iter1 < fieldN; iter1++) { + fieldValueBuffer.reset(); + DataOutput fieldOutput = fieldValueBuffer.getDataOutput(); + if(obj.isNull(curFNames[iter1])){ + if(curRecType.isClosedField(curFNames[iter1])) + throw new HyracksDataException("Closed field "+curFNames[iter1]+" has null value."); + else + continue; + } + else { 
+ if (writeField(obj.get(curFNames[iter1]), curTypes[iter1], fieldOutput)) + recBuilder.addField(iter1, fieldValueBuffer); + } + } + } else{ + //open record type + int closedFieldCount = 0; + IAType curFieldType = null; + for (String attrName : JSONObject.getNames(obj)){ + if(obj.isNull(attrName)||obj.length()==0) continue; + attrIdx = checkAttrNameIdx(curFNames, attrName); + if(curRecType!=null) + curFieldType = curRecType.getFieldType(attrName); + fieldValueBuffer.reset(); + fieldNameBuffer.reset(); + DataOutput fieldOutput = fieldValueBuffer.getDataOutput(); + if (writeField(obj.get(attrName), curFieldType, fieldOutput)) { + if(attrIdx == -1){ + aString.setValue(attrName); + stringSerde.serialize(aString, fieldNameBuffer.getDataOutput()); + recBuilder.addField(fieldNameBuffer, fieldValueBuffer); + } + else { + recBuilder.addField(attrIdx, fieldValueBuffer); + closedFieldCount++; + } + } + } + if(curRecType!=null && closedFieldCount<curFNames.length) + throw new HyracksDataException("Non-null field is null"); + } + recBuilder.write(out, true); + } + + + private IARecordBuilder getRecordBuilder() { + return recordBuilderPool.allocate(ATypeTag.RECORD); + } + + private IAsterixListBuilder getUnorderedListBuilder() { + return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST); + } + + private ArrayBackedValueStorage getTempBuffer() { + return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY); + } + + @Override - public void parse(IRawRecord<? 
extends Status> record, DataOutput out) throws HyracksDataException { - Status tweet = record.get(); - User user = tweet.getUser(); - // Tweet user data - ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)]) - .setValue(JObjectUtil.getNormalizedString(user.getScreenName())); - ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)]) - .setValue(JObjectUtil.getNormalizedString(user.getLang())); - ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount()); - ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount()); - ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)]) - .setValue(JObjectUtil.getNormalizedString(user.getName())); - ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)]) - .setValue(user.getFollowersCount()); - - // Tweet data - ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId())); - - int userPos = tweetFieldNameMap.get(Tweet.USER); - for (int i = 0; i < mutableUserFields.length; i++) { - ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]); + public void parse(IRawRecord<? 
extends String> record, DataOutput out) throws HyracksDataException { + try { + //TODO get rid of this temporary json + resetPools(); + JSONObject jsObj = new JSONObject(record.get()); + writeRecord(jsObj, out, recordType); + } catch (Exception e) { + throw new HyracksDataException(e); } - if (tweet.getGeoLocation() != null) { - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]) - .setValue(tweet.getGeoLocation().getLatitude()); - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]) - .setValue(tweet.getGeoLocation().getLongitude()); - } else { - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0); - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0); - } - ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)]) - .setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString())); - ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)]) - .setValue(JObjectUtil.getNormalizedString(tweet.getText())); + } - for (int i = 0; i < mutableTweetFields.length; i++) { - mutableRecord.setValueAtPos(i, mutableTweetFields[i]); - } - recordBuilder.reset(mutableRecord.getType()); - recordBuilder.init(); - IDataParser.writeRecord(mutableRecord, out, recordBuilder); + private void resetPools() { + listBuilderPool.reset(); + recordBuilderPool.reset(); + abvsBuilderPool.reset(); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java index d6e536d..7e0bee5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java @@ -28,7 +28,7 @@ import 
twitter4j.Status; -public class TweetParserFactory implements IRecordDataParserFactory<Status> { +public class TweetParserFactory implements IRecordDataParserFactory<String> { private static final long serialVersionUID = 1L; private ARecordType recordType; @@ -44,14 +44,14 @@ } @Override - public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext ctx) { + public IRecordDataParser<String> createRecordParser(IHyracksTaskContext ctx) { TweetParser dataParser = new TweetParser(recordType); return dataParser; } @Override - public Class<? extends Status> getRecordClass() { - return Status.class; + public Class<? extends String> getRecordClass() { + return String.class; } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java index e1a7911..42214c8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java @@ -42,21 +42,103 @@ public static class Tweet { public static final String ID = "id"; public static final String USER = "user"; - public static final String LATITUDE = "latitude"; - public static final String LONGITUDE = "longitude"; + public static final String GEOLOCATION = "geo"; public static final String CREATED_AT = "created_at"; - public static final String MESSAGE = "message_text"; - + public static final String TEXT = "text"; public static final String COUNTRY = "country"; + public static final String PLACE = "place"; + public static final String SOURCE = "source"; + public static final String TRUNCATED = "truncated"; + public static final String IN_REPLY_TO_STATUS_ID = "in_reply_to_status_id"; + public static final String IN_REPLY_TO_USER_ID = "in_reply_to_user_id"; + public static final String IN_REPLY_TO_SCREENNAME = "in_reply_to_screen_name"; 
+ public static final String FAVORITED = "favorited"; + public static final String RETWEETED = "retweeted"; + public static final String FAVORITE_COUNT = "favorite_count"; + public static final String RETWEET_COUNT = "retweet_count"; + public static final String CONTRIBUTORS = "contributors"; + public static final String LANGUAGE = "lang"; + public static final String FILTER_LEVEL = "filter_level"; + public static final String TIMESTAMP_MS = "timestamp_ms"; + public static final String IS_QUOTE_STATUS = "is_quote_status"; + // in API but not int JSON +// public static final String SENSITIVE = "sensitive"; +// public static final String RETWEETED_BY_ME = "retweeted_by_me"; +// public static final String CURRENT_USER_RETWEET_ID = "current_user_retweet_id"; + // consistency consider + public static final String MESSAGE = "text_message"; + public static final String LATITUDE = "latitude"; + public static final String LONGITUDE = "longititude"; // User fields (for the sub record "user") public static final String SCREEN_NAME = "screen_name"; - public static final String LANGUAGE = "language"; + public static final String USER_PREFERRED_LANGUAGE = "user_preferred_language"; public static final String FRIENDS_COUNT = "friends_count"; public static final String STATUS_COUNT = "status_count"; public static final String NAME = "name"; public static final String FOLLOWERS_COUNT = "followers_count"; + } + + public static final class Tweet_Place{ + public static final String ID = "id"; + public static final String URL = "url"; + public static final String PLACE_TYPE = "place_type"; + public static final String NAME = "name"; + public static final String FULL_NAME = "full_name"; + public static final String COUNTRY_CODE = "country_code"; + public static final String COUNTRY = "country"; + public static final String BOUNDING_BOX = "bounding_box"; + public static final String ATTRIBUTES = "attributes"; + } + + public static final class Tweet_User{ + public static final String ID = 
"id"; + public static final String NAME = "name"; + public static final String SCREEN_NAME = "screen_name"; + public static final String LOCATION = "location"; + public static final String DESCRIPTION = "description"; + public static final String CONTRIBUTORS_ENABLED = "contributors_enabled"; + public static final String PROFILE_IMAGE_URL = "profile_image_url"; + public static final String PROFILE_IMAGE_URL_HTTPS = "profile_image_url_https"; + public static final String URL = "url"; + public static final String PROTECTED = "protected"; + public static final String FOLLOWERS_COUNT = "followers_count"; + public static final String PROFILE_BACKGROUND_COLOR = "profile_background_color"; + public static final String PROFILE_TEXT_COLOR = "profile_text_color"; + public static final String PROFILE_LINK_COLOR = "profile_link_color"; + public static final String PROFILE_SIDEBAR_FILL_COLOR = "profile_sidebar_fill_color"; + public static final String PROFILE_SIDEBAR_BORDER_COLOR = "profile_sidebar_border_color"; + public static final String PROFILE_USE_BACKGROUND_IMAGE = "profile_use_background_image"; + public static final String DEFAULT_PROFILE = "default_profile"; + public static final String DEFAULT_PROFILE_IMAGE = "default_profile_image"; + public static final String FRIENDS_COUNT = "friends_count"; + public static final String CREATED_AT = "CREATED_AT"; + public static final String FAVOURITES_COUNT = "favourites_count"; + public static final String UTC_OFFSET = "utc_offset"; + public static final String TIME_ZONE = "time_zone"; + public static final String PROFILE_BACKGROUND_IMAGE_URL = "profile_background_image_url"; + public static final String PROFILE_BACKGROUND_IMAGE_URL_HTTPS = "profile_background_image_url_https"; + public static final String PROFILE_BANNER_URL = "profile_banner_url"; + public static final String LANG = "lang"; + public static final String STATUSES_COUNT = "statuses_count"; + public static final String GEO_ENABLED = "geo_enabled"; + public static 
final String VERIFIED = "verified"; + public static final String IS_TRANSLATOR = "is_translator"; + public static final String LISTED_COUNT = "listed_count"; + public static final String FOLLOW_REQUEST_SENT = "follow_request_sent"; + // skip Entities; attrs in API but not in JSON are listed below +// public static final String WITHHELD_IN_COUNTRIES = "withheld_in_countries"; +// public static final String BIGGER_PROFILE_IMAGE_URL = "bigger_profile_image_url"; +// public static final String MINI_PROFILE_IMAGE_URL = "mini_profile_image_url"; +// public static final String ORIGINAL_PROFILE_IMAGE_URL = "original_profile_image_url"; +// public static final String SHOW_ALL_INLINE_MEDIA = "show_all_inline_media"; +// public static final String PROFILE_BANNER_RETINA_URL = "profile_banner_retina_url"; +// public static final String PROFILE_BANNER_IPAD_URL = "profile_banner_ipad_url"; +// public static final String PROFILE_BANNER_IPAD_RETINA_URL = "profile_banner_ipad_retina_url"; +// public static final String PROFILE_BANNER_MOBILE_URL = "profile_banner_mobile_url"; +// public static final String PROFILE_BANNER_MOBILE_RETINA_URL = "profile_banner_mobile_retina_url"; +// public static final String PROFILE_BACKGROUND_TILED = "profile_background_tiled"; } /* @@ -77,4 +159,5 @@ public static final String TOPICS = "topics"; } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 55dee04..0bc38df 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -221,7 +221,7 @@ public static final String KEY_STREAM_SOURCE = "stream-source"; public static final String EXTERNAL = "external"; public static final String KEY_READER_FACTORY = 
"reader-factory"; - public static final String READER_RSS = "rss"; + public static final String READER_RSS = "rss_feed"; public static final String FORMAT_CSV = "csv"; public static final String ERROR_PARSE_RECORD = "Parser failed to parse record"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java index 4fb602b..6aac9f1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java @@ -205,6 +205,7 @@ private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) { ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true); + cb.setJSONStoreEnabled(true); String oAuthConsumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY); String oAuthConsumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET); String oAuthAccessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN); diff --git a/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java b/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java index 60e6e89..8dcac13 100644 --- a/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java +++ b/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java @@ -85,7 +85,7 @@ this.buffer = buffer; tokenBegin = bufpos = 0; containsEscapes = false; - line++; +// line++; tokenBegin = -1; } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java index e16633e..a5af127 100644 --- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.om.types; +import org.apache.asterix.om.base.AOrderedList; import org.apache.asterix.om.base.IAObject; import org.json.JSONException; import org.json.JSONObject; @@ -26,6 +27,8 @@ private static final long serialVersionUID = 1L; + public static final AOrderedListType FULL_OPEN_ORDEREDLIST_TYPE = new AOrderedListType(null,""); + /** * @param itemType * if null, the list is untyped diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java index 80b13b5..febc6ad 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.om.types; +import org.apache.asterix.om.base.AUnorderedList; import org.apache.asterix.om.base.IAObject; import org.json.JSONException; import org.json.JSONObject; @@ -26,6 +27,8 @@ private static final long serialVersionUID = 1L; + public static final AUnorderedListType FULLY_OPEN_UNORDEREDLIST_TYPE = new AUnorderedListType(null,""); + /** * @param itemType * if null, the collection is untyped -- To view, visit https://asterix-gerrit.ics.uci.edu/1002 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <xkk...@gmail.com>