Ian Maxon has submitted this change and it was merged. Change subject: New feed fixes: Updated the ExternalFunctionProvider to improve handling "setResult" ......................................................................
New feed fixes: Updated the ExternalFunctionProvider to improve handling "setResult" Change-Id: I7c026e2f3b927bda2628835c15318d6c96f8b043 Reviewed-on: https://asterix-gerrit.ics.uci.edu/321 Reviewed-by: Ian Maxon <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java M asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java 9 files changed, 175 insertions(+), 56 deletions(-) Approvals: Ian Maxon: Looks good to me, approved Jenkins: Verified diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java index 9e13e21..76abeb4 100755 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java @@ -15,10 +15,12 @@ package edu.uci.ics.asterix.external.library; import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo; +import edu.uci.ics.asterix.om.types.ATypeTag; import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator; import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider; +import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage; import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference; public class ExternalFunctionProvider { @@ -38,6 +40,8 @@ } class ExternalScalarFunction extends ExternalFunction implements IExternalScalarFunction, ICopyEvaluator { + private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize(); + private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize(); public ExternalScalarFunction(IExternalFunctionInfo finfo, ICopyEvaluatorFactory args[], IDataOutputProvider outputProvider) throws AlgebricksException { @@ -57,12 +61,22 @@ functionHelper.reset(); } catch (Exception e) { e.printStackTrace(); - //throw new AlgebricksException(e); + throw new AlgebricksException(e); } } public void evaluate(IFunctionHelper argumentProvider) throws Exception { ((IExternalScalarFunction) externalFunction).evaluate(argumentProvider); + /* + * Make sure that if "setResult" is not called, + * or the result object is null we let Hyracks storage manager know + * we want to discard a null object + */ + byte byteOutput = ((ArrayBackedValueStorage) out).getByteArray()[0]; + if (!argumentProvider.isValidResult() || byteOutput == SER_NULL_TYPE_TAG) { + out.getDataOutput().writeByte(SER_NULL_TYPE_TAG); + } } + } diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java index b2d2061..f16ff4a 100755 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java @@ -14,11 +14,11 @@ */ package edu.uci.ics.asterix.external.library; -import java.io.IOException; - import edu.uci.ics.asterix.common.exceptions.AsterixException; import edu.uci.ics.asterix.external.library.java.IJObject; import edu.uci.ics.asterix.external.library.java.JTypeTag; + +import java.io.IOException; public interface IFunctionHelper { @@ -28,6 +28,8 @@ public void setResult(IJObject result) throws IOException, AsterixException; + public boolean isValidResult(); + public IJObject getObject(JTypeTag jtypeTag); public void reset(); diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java index e2e694a..f272008 100644 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java @@ -14,13 +14,10 @@ */ package edu.uci.ics.asterix.external.library; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - import edu.uci.ics.asterix.common.exceptions.AsterixException; import edu.uci.ics.asterix.external.library.java.IJObject; import edu.uci.ics.asterix.external.library.java.JObjectPointableVisitor; +import edu.uci.ics.asterix.external.library.java.JObjects.JNull; import edu.uci.ics.asterix.external.library.java.JTypeTag; import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo; import edu.uci.ics.asterix.om.pointables.AFlatValuePointable; @@ -33,9 +30,14 @@ import edu.uci.ics.asterix.om.util.container.IObjectPool; import edu.uci.ics.asterix.om.util.container.ListObjectPool; import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider; import edu.uci.ics.hyracks.data.std.api.IValueReference; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; public class JavaFunctionHelper implements IFunctionHelper { @@ -48,6 +50,8 @@ private final JObjectPointableVisitor pointableVisitor; private final PointableAllocator pointableAllocator; private final Map<Integer, TypeInfo> poolTypeInfo; + + private boolean isValidResult = false; public JavaFunctionHelper(IExternalFunctionInfo finfo, IDataOutputProvider outputProvider) throws AlgebricksException { @@ -72,12 +76,29 @@ @Override public void setResult(IJObject result) throws IOException, AsterixException { - try { - result.serialize(outputProvider.getDataOutput(), true); - result.reset(); - } catch (IOException | AlgebricksException e) { - throw new HyracksDataException(e); + if (result == null) { + JNull.INSTANCE.serialize(outputProvider.getDataOutput(), true); + isValidResult = false; + } else { + try { + isValidResult = true; + result.serialize(outputProvider.getDataOutput(), true); + result.reset(); + } catch (IOException | AlgebricksException e) { + throw new HyracksDataException(e); + } } + } + + /** + * Gets the value of the result flag + * + * @return + * boolean True is the setResult is called and result is not null + */ + @Override + public boolean isValidResult() { + return this.isValidResult; } public void setArgument(int index, IValueReference valueReference) throws IOException, AsterixException { @@ -134,6 +155,19 @@ case STRING: retValue = objectPool.allocate(BuiltinType.ASTRING); break; + case DOUBLE: + retValue = objectPool.allocate(BuiltinType.ADOUBLE); + break; + case NULL: + retValue = JNull.INSTANCE; + break; + default: + try { + throw new NotImplementedException("Object of type " + jtypeTag.name() + " not supported."); + } catch (IllegalStateException e) { + e.printStackTrace(); + } + break; } return retValue; } diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java index c2e4cf3..cde4495 100644 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java @@ -62,7 +62,6 @@ import edu.uci.ics.asterix.external.library.java.JObjects.JString; import edu.uci.ics.asterix.external.library.java.JObjects.JTime; import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList; -import edu.uci.ics.asterix.external.util.TweetProcessor; import edu.uci.ics.asterix.om.base.ACircle; import edu.uci.ics.asterix.om.base.ADuration; import edu.uci.ics.asterix.om.base.ALine; @@ -233,9 +232,10 @@ v = AStringSerializerDeserializer.INSTANCE.deserialize( new DataInputStream(new ByteArrayInputStream(b, s+1, l-1))).getStringValue(); //v = new String(b, s+1, l, "UTF-8"); - TweetProcessor.getNormalizedString(v); + JObjectUtil.getNormalizedString(v); + IJObject jObject = objectPool.allocate(BuiltinType.ASTRING); - ((JString) jObject).setValue(TweetProcessor.getNormalizedString(v)); + ((JString) jObject).setValue(JObjectUtil.getNormalizedString(v)); return jObject; } } @@ -555,8 +555,8 @@ throw new IllegalArgumentException("Cannot parse list item of type " + listType.getTypeTag()); default: - typeInfo.reset(((AbstractCollectionType) listType).getItemType(), - ((AbstractCollectionType) listType).getTypeTag()); + IAType itemType = ((AbstractCollectionType) listType).getItemType(); + typeInfo.reset(itemType, itemType.getTypeTag()); listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo); } diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java index f5f404a..95a9efa 100644 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java @@ -14,10 +14,6 @@ */ package edu.uci.ics.asterix.external.library.java; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - import edu.uci.ics.asterix.common.exceptions.AsterixException; import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer; import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer; @@ -56,8 +52,39 @@ import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + public class JObjectUtil { + /** + * Normalize an input string by removing linebreaks, and replace them with space + * Also remove non-readable special characters + * + * @param originalString + * The input String + * @return + * String - the normalized string + */ + public static String getNormalizedString(String originalString) { + int len = originalString.length(); + char asciiBuff[] = new char[len]; + int j = 0; + for (int i = 0; i < len; i++) { + char c = originalString.charAt(i); + if (c == '\n' || c == '\t' || c == '\r') { + asciiBuff[j] = ' '; + j++; + } else if (c > 0 && c <= 0x7f) { + asciiBuff[j] = c; + j++; + } + } + + return new String(asciiBuff).trim(); + } + public static IJObject getJType(ATypeTag typeTag, IAType type, ByteArrayAccessibleDataInputStream dis, IObjectPool<IJObject, IAType> objectPool) throws IOException, AsterixException { IJObject jObject; diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java index 61a60b6..19d7f01 100644 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java @@ -14,18 +14,6 @@ */ package edu.uci.ics.asterix.external.library.java; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import edu.uci.ics.asterix.builders.IAsterixListBuilder; import edu.uci.ics.asterix.builders.RecordBuilder; import edu.uci.ics.asterix.builders.UnorderedListBuilder; @@ -77,6 +65,7 @@ import edu.uci.ics.asterix.om.base.AMutableString; import edu.uci.ics.asterix.om.base.AMutableTime; import edu.uci.ics.asterix.om.base.AMutableUnorderedList; +import edu.uci.ics.asterix.om.base.ANull; import edu.uci.ics.asterix.om.base.APoint; import edu.uci.ics.asterix.om.base.ARectangle; import edu.uci.ics.asterix.om.base.AString; @@ -89,6 +78,18 @@ import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; public class JObjects { @@ -109,6 +110,47 @@ @Override public IAObject getIAObject() { return value; + } + + } + + /* + * This class is necessary to be able to serialize null objects + * in cases of setting "null" results + * + * + */ + public static class JNull implements IJObject { + public final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize(); + + public final static JNull INSTANCE = new JNull(); + + private JNull() { + } + + @Override + public ATypeTag getTypeTag() { + return ATypeTag.NULL; + } + + @Override + public IAObject getIAObject() { + return ANull.NULL; + } + + @Override + public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { + if (writeTypeTag) { + try { + dataOutput.writeByte(SER_NULL_TYPE_TAG); + } catch (IOException e) { + throw new HyracksDataException(e); + } + } + } + + @Override + public void reset() { } } @@ -1026,6 +1068,10 @@ return fields; } + public Map<String, IJObject> getOpenFields() { + return this.openFields; + } + public RecordBuilder getRecordBuilder() { RecordBuilder recordBuilder = new RecordBuilder(); recordBuilder.reset(recordType); diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java index 9cf45e8..2465705 100644 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java @@ -22,5 +22,6 @@ DOUBLE, FLOAT, LIST, - OBJECT + OBJECT, + NULL } diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java index b0ff3cb..a1e784b 100644 --- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java +++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java @@ -14,6 +14,7 @@ */ package edu.uci.ics.asterix.external.util; +import edu.uci.ics.asterix.external.library.java.JObjectUtil; import twitter4j.Status; import twitter4j.User; import edu.uci.ics.asterix.om.base.AMutableDouble; @@ -43,11 +44,11 @@ public AMutableRecord processNextTweet(Status tweet) { User user = tweet.getUser(); - ((AMutableString) mutableUserFields[0]).setValue(getNormalizedString(user.getScreenName())); - ((AMutableString) mutableUserFields[1]).setValue(getNormalizedString(user.getLang())); + ((AMutableString) mutableUserFields[0]).setValue(JObjectUtil.getNormalizedString(user.getScreenName())); + ((AMutableString) mutableUserFields[1]).setValue(JObjectUtil.getNormalizedString(user.getLang())); ((AMutableInt32) mutableUserFields[2]).setValue(user.getFriendsCount()); ((AMutableInt32) mutableUserFields[3]).setValue(user.getStatusesCount()); - ((AMutableString) mutableUserFields[4]).setValue(getNormalizedString(user.getName())); + ((AMutableString) mutableUserFields[4]).setValue(JObjectUtil.getNormalizedString(user.getName())); ((AMutableInt32) mutableUserFields[5]).setValue(user.getFollowersCount()); ((AMutableString) mutableTweetFields[0]).setValue(tweet.getId() + ""); @@ -62,20 +63,15 @@ ((AMutableDouble) mutableTweetFields[2]).setValue(0); ((AMutableDouble) mutableTweetFields[3]).setValue(0); } - ((AMutableString) mutableTweetFields[4]).setValue(getNormalizedString(tweet.getCreatedAt().toString())); - ((AMutableString) mutableTweetFields[5]).setValue(getNormalizedString(tweet.getText())); + ((AMutableString) mutableTweetFields[4]).setValue(JObjectUtil.getNormalizedString( + tweet.getCreatedAt().toString())); + ((AMutableString) mutableTweetFields[5]).setValue(JObjectUtil.getNormalizedString(tweet.getText())); for (int i = 0; i < 6; i++) { mutableRecord.setValueAtPos(i, mutableTweetFields[i]); } return mutableRecord; - - } - - public static String getNormalizedString(String originalString) { - String asciiText = originalString.replaceAll("[^\\x00-\\x7F]", "").replaceAll("\n", " "); - return asciiText.trim(); } diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java index 11016e0..ca6fb4d 100644 --- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java +++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java @@ -14,17 +14,6 @@ */ package edu.uci.ics.asterix.runtime.formats; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.mutable.Mutable; -import org.apache.commons.lang3.mutable.MutableObject; - import edu.uci.ics.asterix.common.config.GlobalConfig; import edu.uci.ics.asterix.common.exceptions.AsterixRuntimeException; import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl; @@ -358,6 +347,16 @@ import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory; import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; + +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class NonTaggedDataFormat implements IDataFormat { -- To view, visit https://asterix-gerrit.ics.uci.edu/321 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7c026e2f3b927bda2628835c15318d6c96f8b043 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Heri Ramampiaro <[email protected]> Gerrit-Reviewer: Heri Ramampiaro <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Raman Grover <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]>
