The checks are enabled in Travis CI now. On Thu, Oct 15, 2015 at 3:07 AM, Tushar Gosavi <tus...@datatorrent.com> wrote:
> My bad > I pulled the changes on my laptop, and checked that compilation and new test > cases passed before merge. > I did not pay attention to rat check, sorry about that. We have fixed the rat > check failure through PR #62. > > Thanks, > - Tushar. > > > On Thu, Oct 15, 2015 at 9:15 AM, Thomas Weise <tho...@datatorrent.com> > wrote: > >> This does not pass apache-rat:check >> >> https://api.travis-ci.org/jobs/85458770/log.txt?deansi=true >> >> >> ******************************* >> >> Unapproved licenses: >> >> >> src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java >> >> src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java >> >> src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java >> src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java >> src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java >> src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java >> >> >> On Wed, Oct 14, 2015 at 4:48 AM, <tus...@apache.org> wrote: >> >>> Repository: incubator-apex-malhar >>> Updated Branches: >>> refs/heads/devel-3 e1a45507b -> 3f4fe1866 >>> >>> >>> MLHR-1838 Added pojo parsers and formatters(csv,json,xml) >>> >>> >>> Project: >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo >>> Commit: >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3f4fe186 >>> Tree: >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3f4fe186 >>> Diff: >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3f4fe186 >>> >>> Branch: refs/heads/devel-3 >>> Commit: 3f4fe18665c59dadb8ad289f696df983bdc451ce >>> Parents: e1a4550 >>> Author: shubham <shubham-patha...@github.com> >>> Authored: Fri Sep 11 16:26:03 2015 +0530 >>> Committer: shubham <shubham-patha...@github.com> >>> Committed: Wed Oct 14 10:53:12 2015 +0530 >>> >>> ---------------------------------------------------------------------- >>> contrib/pom.xml | 12 + 
>>> .../contrib/converter/Converter.java | 43 +++ >>> .../contrib/schema/formatter/CsvFormatter.java | 285 +++++++++++++++++ >>> .../contrib/schema/formatter/Formatter.java | 101 ++++++ >>> .../contrib/schema/formatter/JsonFormatter.java | 109 +++++++ >>> .../contrib/schema/formatter/XmlFormatter.java | 172 ++++++++++ >>> .../contrib/schema/parser/CsvParser.java | 314 >>> +++++++++++++++++++ >>> .../contrib/schema/parser/JsonParser.java | 106 +++++++ >>> .../contrib/schema/parser/Parser.java | 102 ++++++ >>> .../contrib/schema/parser/XmlParser.java | 141 +++++++++ >>> .../schema/formatter/CsvFormatterTest.java | 147 +++++++++ >>> .../schema/formatter/JsonFormatterTest.java | 186 +++++++++++ >>> .../schema/formatter/XmlFormatterTest.java | 226 +++++++++++++ >>> .../contrib/schema/parser/CsvParserTest.java | 172 ++++++++++ >>> .../contrib/schema/parser/JsonParserTest.java | 212 +++++++++++++ >>> .../contrib/schema/parser/XmlParserTest.java | 254 +++++++++++++++ >>> 16 files changed, 2582 insertions(+) >>> ---------------------------------------------------------------------- >>> >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/pom.xml >>> ---------------------------------------------------------------------- >>> diff --git a/contrib/pom.xml b/contrib/pom.xml >>> index abed040..91ef5c7 100755 >>> --- a/contrib/pom.xml >>> +++ b/contrib/pom.xml >>> @@ -606,5 +606,17 @@ >>> <version>${dt.framework.version}</version> >>> <type>jar</type> >>> </dependency> >>> + <dependency> >>> + <!-- required by Xml parser and formatter --> >>> + <groupId>com.thoughtworks.xstream</groupId> >>> + <artifactId>xstream</artifactId> >>> + <version>1.4.8</version> >>> + </dependency> >>> + <dependency> >>> + <!-- required by Csv parser and formatter --> >>> + <groupId>net.sf.supercsv</groupId> >>> + <artifactId>super-csv-joda</artifactId> >>> + <version>2.3.1</version> >>> + </dependency> >>> </dependencies> >>> </project> >>> >>> >>> 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java >>> b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java >>> new file mode 100644 >>> index 0000000..ebf2925 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java >>> @@ -0,0 +1,43 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. >>> + */ >>> +package com.datatorrent.contrib.converter; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +/** >>> + * Operators that are converting tuples from one format to another must >>> + * implement this interface. Eg. Parsers or formatters , that parse >>> data of >>> + * certain format and convert them to another format. 
>>> + * >>> + * @param <INPUT> >>> + * @param <OUTPUT> >>> + */ >>> +@InterfaceStability.Evolving >>> +public interface Converter<INPUT, OUTPUT> >>> +{ >>> + /** >>> + * Provide the implementation for converting tuples from one format >>> to the >>> + * other >>> + * >>> + * @param INPUT >>> + * tuple of certain format >>> + * @return OUTPUT tuple of converted format >>> + */ >>> + public OUTPUT convert(INPUT tuple); >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java >>> new file mode 100644 >>> index 0000000..924acc6 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java >>> @@ -0,0 +1,285 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. 
>>> + */ >>> +package com.datatorrent.contrib.schema.formatter; >>> + >>> +import java.io.IOException; >>> +import java.io.StringWriter; >>> +import java.util.ArrayList; >>> + >>> +import javax.validation.constraints.NotNull; >>> + >>> +import org.slf4j.Logger; >>> +import org.slf4j.LoggerFactory; >>> +import org.supercsv.cellprocessor.FmtDate; >>> +import org.supercsv.cellprocessor.Optional; >>> +import org.supercsv.cellprocessor.ift.CellProcessor; >>> +import org.supercsv.exception.SuperCsvException; >>> +import org.supercsv.io.CsvBeanWriter; >>> +import org.supercsv.io.ICsvBeanWriter; >>> +import org.supercsv.prefs.CsvPreference; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.datatorrent.api.Context; >>> +import com.datatorrent.netlet.util.DTThrowable; >>> + >>> +/** >>> + * Operator that converts POJO to CSV string <br> >>> + * Assumption is that each field in the delimited data should map to a >>> simple >>> + * java type.<br> >>> + * <br> >>> + * <b>Properties</b> <br> >>> + * <b>fieldInfo</b>:User need to specify fields and their types as a >>> comma >>> + * separated string having format >>> <NAME>:<TYPE>|<FORMAT> in >>> + * the same order as incoming data. 
FORMAT refers to dates with >>> dd/mm/yyyy as >>> + * default e.g >>> name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy >>> + * >>> + * @displayName CsvFormatter >>> + * @category Formatter >>> + * @tags pojo csv formatter >>> + */ >>> +@InterfaceStability.Evolving >>> +public class CsvFormatter extends Formatter<String> >>> +{ >>> + >>> + private ArrayList<Field> fields; >>> + @NotNull >>> + protected String classname; >>> + @NotNull >>> + protected int fieldDelimiter; >>> + protected String lineDelimiter; >>> + >>> + @NotNull >>> + protected String fieldInfo; >>> + >>> + public enum FIELD_TYPE >>> + { >>> + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, >>> DATE >>> + }; >>> + >>> + protected transient String[] nameMapping; >>> + protected transient CellProcessor[] processors; >>> + protected transient CsvPreference preference; >>> + >>> + public CsvFormatter() >>> + { >>> + fields = new ArrayList<Field>(); >>> + fieldDelimiter = ','; >>> + lineDelimiter = "\r\n"; >>> + >>> + } >>> + >>> + @Override >>> + public void setup(Context.OperatorContext context) >>> + { >>> + super.setup(context); >>> + >>> + //fieldInfo information >>> + fields = new ArrayList<Field>(); >>> + String[] fieldInfoTuple = fieldInfo.split(","); >>> + for (int i = 0; i < fieldInfoTuple.length; i++) { >>> + String[] fieldTuple = fieldInfoTuple[i].split(":"); >>> + Field field = new Field(); >>> + field.setName(fieldTuple[0]); >>> + String[] typeFormat = fieldTuple[1].split("\\|"); >>> + field.setType(typeFormat[0].toUpperCase()); >>> + if (typeFormat.length > 1) { >>> + field.setFormat(typeFormat[1]); >>> + } >>> + getFields().add(field); >>> + } >>> + preference = new CsvPreference.Builder('"', fieldDelimiter, >>> lineDelimiter).build(); >>> + int countKeyValue = getFields().size(); >>> + nameMapping = new String[countKeyValue]; >>> + processors = new CellProcessor[countKeyValue]; >>> + initialise(nameMapping, processors); >>> + >>> + } >>> + >>> + private 
void initialise(String[] nameMapping, CellProcessor[] >>> processors) >>> + { >>> + for (int i = 0; i < getFields().size(); i++) { >>> + FIELD_TYPE type = getFields().get(i).type; >>> + nameMapping[i] = getFields().get(i).name; >>> + if (type == FIELD_TYPE.DATE) { >>> + String dateFormat = getFields().get(i).format; >>> + processors[i] = new Optional(new FmtDate(dateFormat == null ? >>> "dd/MM/yyyy" : dateFormat)); >>> + } else { >>> + processors[i] = new Optional(); >>> + } >>> + } >>> + >>> + } >>> + >>> + @Override >>> + public void activate(Context context) >>> + { >>> + >>> + } >>> + >>> + @Override >>> + public void deactivate() >>> + { >>> + >>> + } >>> + >>> + @Override >>> + public String convert(Object tuple) >>> + { >>> + try { >>> + StringWriter stringWriter = new StringWriter(); >>> + ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, >>> preference); >>> + beanWriter.write(tuple, nameMapping, processors); >>> + beanWriter.flush(); >>> + beanWriter.close(); >>> + return stringWriter.toString(); >>> + } catch (SuperCsvException e) { >>> + logger.debug("Error while converting tuple {} >>> {}",tuple,e.getMessage()); >>> + } catch (IOException e) { >>> + DTThrowable.rethrow(e); >>> + } >>> + return null; >>> + } >>> + >>> + public static class Field >>> + { >>> + String name; >>> + String format; >>> + FIELD_TYPE type; >>> + >>> + public String getName() >>> + { >>> + return name; >>> + } >>> + >>> + public void setName(String name) >>> + { >>> + this.name = name; >>> + } >>> + >>> + public FIELD_TYPE getType() >>> + { >>> + return type; >>> + } >>> + >>> + public void setType(String type) >>> + { >>> + this.type = FIELD_TYPE.valueOf(type); >>> + } >>> + >>> + public String getFormat() >>> + { >>> + return format; >>> + } >>> + >>> + public void setFormat(String format) >>> + { >>> + this.format = format; >>> + } >>> + } >>> + >>> + /** >>> + * Gets the array list of the fields, a field being a POJO containing >>> the name >>> + * of the field and 
type of field. >>> + * >>> + * @return An array list of Fields. >>> + */ >>> + public ArrayList<Field> getFields() >>> + { >>> + return fields; >>> + } >>> + >>> + /** >>> + * Sets the array list of the fields, a field being a POJO containing >>> the name >>> + * of the field and type of field. >>> + * >>> + * @param fields >>> + * An array list of Fields. >>> + */ >>> + public void setFields(ArrayList<Field> fields) >>> + { >>> + this.fields = fields; >>> + } >>> + >>> + /** >>> + * Gets the delimiter which separates fields in incoming data. >>> + * >>> + * @return fieldDelimiter >>> + */ >>> + public int getFieldDelimiter() >>> + { >>> + return fieldDelimiter; >>> + } >>> + >>> + /** >>> + * Sets the delimiter which separates fields in incoming data. >>> + * >>> + * @param fieldDelimiter >>> + */ >>> + public void setFieldDelimiter(int fieldDelimiter) >>> + { >>> + this.fieldDelimiter = fieldDelimiter; >>> + } >>> + >>> + /** >>> + * Gets the delimiter which separates lines in incoming data. >>> + * >>> + * @return lineDelimiter >>> + */ >>> + public String getLineDelimiter() >>> + { >>> + return lineDelimiter; >>> + } >>> + >>> + /** >>> + * Sets the delimiter which separates line in incoming data. >>> + * >>> + * @param lineDelimiter >>> + */ >>> + public void setLineDelimiter(String lineDelimiter) >>> + { >>> + this.lineDelimiter = lineDelimiter; >>> + } >>> + >>> + /** >>> + * Gets the name of the fields with type and format in data as comma >>> separated >>> + * string in same order as incoming data. e.g >>> + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy >>> + * >>> + * @return fieldInfo >>> + */ >>> + public String getFieldInfo() >>> + { >>> + return fieldInfo; >>> + } >>> + >>> + /** >>> + * Sets the name of the fields with type and format in data as comma >>> separated >>> + * string in same order as incoming data. 
e.g >>> + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy >>> + * >>> + * @param fieldInfo >>> + */ >>> + public void setFieldInfo(String fieldInfo) >>> + { >>> + this.fieldInfo = fieldInfo; >>> + } >>> + >>> + private static final Logger logger = >>> LoggerFactory.getLogger(CsvFormatter.class); >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java >>> new file mode 100644 >>> index 0000000..19a78e0 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java >>> @@ -0,0 +1,101 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. 
>>> + */ >>> +package com.datatorrent.contrib.schema.formatter; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.datatorrent.api.Context; >>> +import com.datatorrent.api.Context.PortContext; >>> +import com.datatorrent.api.DefaultInputPort; >>> +import com.datatorrent.api.DefaultOutputPort; >>> +import com.datatorrent.api.Operator.ActivationListener; >>> +import com.datatorrent.api.annotation.InputPortFieldAnnotation; >>> +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; >>> +import com.datatorrent.common.util.BaseOperator; >>> +import com.datatorrent.contrib.converter.Converter; >>> + >>> +/** >>> + * Abstract class that implements Converter interface. This is a schema >>> enabled >>> + * Formatter <br> >>> + * Sub classes need to implement the convert method <br> >>> + * <b>Port Interface</b><br> >>> + * <b>in</b>: expects <Object> this is a schema enabled port<br> >>> + * <b>out</b>: emits <OUTPUT> <br> >>> + * <b>err</b>: emits <Object> error port that emits input tuple >>> that could >>> + * not be converted<br> >>> + * <br> >>> + * >>> + * @displayName Parser >>> + * @tags parser converter >>> + * @param <INPUT> >>> + */ >>> +@InterfaceStability.Evolving >>> +public abstract class Formatter<OUTPUT> extends BaseOperator implements >>> Converter<Object, OUTPUT>, >>> + ActivationListener<Context> >>> +{ >>> + protected transient Class<?> clazz; >>> + >>> + @OutputPortFieldAnnotation >>> + public transient DefaultOutputPort<OUTPUT> out = new >>> DefaultOutputPort<OUTPUT>(); >>> + >>> + @OutputPortFieldAnnotation(optional = true) >>> + public transient DefaultOutputPort<Object> err = new >>> DefaultOutputPort<Object>(); >>> + >>> + @InputPortFieldAnnotation(schemaRequired = true) >>> + public transient DefaultInputPort<Object> in = new >>> DefaultInputPort<Object>() >>> + { >>> + public void setup(PortContext context) >>> + { >>> + clazz = context.getValue(Context.PortContext.TUPLE_CLASS); >>> + } >>> + 
>>> + @Override >>> + public void process(Object inputTuple) >>> + { >>> + OUTPUT tuple = convert(inputTuple); >>> + if (tuple == null && err.isConnected()) { >>> + err.emit(inputTuple); >>> + return; >>> + } >>> + if (out.isConnected()) { >>> + out.emit(tuple); >>> + } >>> + } >>> + }; >>> + >>> + /** >>> + * Get the class that needs to be formatted >>> + * >>> + * @return Class<?> >>> + */ >>> + public Class<?> getClazz() >>> + { >>> + return clazz; >>> + } >>> + >>> + /** >>> + * Set the class of tuple that needs to be formatted >>> + * >>> + * @param clazz >>> + */ >>> + public void setClazz(Class<?> clazz) >>> + { >>> + this.clazz = clazz; >>> + } >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java >>> new file mode 100644 >>> index 0000000..344ac60 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java >>> @@ -0,0 +1,109 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. 
You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. >>> + */ >>> +package com.datatorrent.contrib.schema.formatter; >>> + >>> +import java.io.IOException; >>> +import java.text.SimpleDateFormat; >>> + >>> +import org.codehaus.jackson.JsonGenerationException; >>> +import org.codehaus.jackson.map.JsonMappingException; >>> +import org.codehaus.jackson.map.ObjectMapper; >>> +import org.codehaus.jackson.map.ObjectWriter; >>> +import org.codehaus.jackson.map.SerializationConfig; >>> +import org.slf4j.Logger; >>> +import org.slf4j.LoggerFactory; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.datatorrent.api.Context; >>> +import com.datatorrent.netlet.util.DTThrowable; >>> + >>> +/** >>> + * Operator that converts POJO to JSON string <br> >>> + * <b>Properties</b> <br> >>> + * <b>dateFormat</b>: date format e.g dd/MM/yyyy >>> + * >>> + * @displayName JsonFormatter >>> + * @category Formatter >>> + * @tags pojo json formatter >>> + */ >>> +@InterfaceStability.Evolving >>> +public class JsonFormatter extends Formatter<String> >>> +{ >>> + private transient ObjectWriter writer; >>> + protected String dateFormat; >>> + >>> + @Override >>> + public void activate(Context context) >>> + { >>> + try { >>> + ObjectMapper mapper = new ObjectMapper(); >>> + if (dateFormat != null) { >>> + mapper.setDateFormat(new SimpleDateFormat(dateFormat)); >>> + } >>> + writer = mapper.writerWithType(clazz); >>> + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, >>> true); >>> + 
mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, >>> true); >>> + >>> mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true); >>> + } catch (Throwable e) { >>> + throw new RuntimeException("Unable find provided class"); >>> + } >>> + } >>> + >>> + @Override >>> + public void deactivate() >>> + { >>> + >>> + } >>> + >>> + @Override >>> + public String convert(Object tuple) >>> + { >>> + try { >>> + return writer.writeValueAsString(tuple); >>> + } catch (JsonGenerationException | JsonMappingException e) { >>> + logger.debug("Error while converting tuple {} >>> {}",tuple,e.getMessage()); >>> + } catch (IOException e) { >>> + DTThrowable.rethrow(e); >>> + } >>> + return null; >>> + } >>> + >>> + /** >>> + * Get the date format >>> + * >>> + * @return Date format string >>> + */ >>> + public String getDateFormat() >>> + { >>> + return dateFormat; >>> + } >>> + >>> + /** >>> + * Set the date format >>> + * >>> + * @param dateFormat >>> + */ >>> + public void setDateFormat(String dateFormat) >>> + { >>> + this.dateFormat = dateFormat; >>> + } >>> + >>> + private static final Logger logger = >>> LoggerFactory.getLogger(JsonFormatter.class); >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java >>> new file mode 100644 >>> index 0000000..b387031 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java >>> @@ -0,0 +1,172 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. 
See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. >>> + */ >>> +package com.datatorrent.contrib.schema.formatter; >>> + >>> +import java.io.Writer; >>> + >>> +import org.slf4j.Logger; >>> +import org.slf4j.LoggerFactory; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.datatorrent.api.Context; >>> + >>> +import com.thoughtworks.xstream.XStream; >>> +import com.thoughtworks.xstream.XStreamException; >>> +import com.thoughtworks.xstream.converters.basic.DateConverter; >>> +import com.thoughtworks.xstream.io.HierarchicalStreamWriter; >>> +import com.thoughtworks.xstream.io.xml.CompactWriter; >>> +import com.thoughtworks.xstream.io.xml.XppDriver; >>> + >>> +/** >>> + * @displayName XmlParser >>> + * @category Formatter >>> + * @tags xml pojo formatter >>> + */ >>> +@InterfaceStability.Evolving >>> +public class XmlFormatter extends Formatter<String> >>> +{ >>> + >>> + private transient XStream xstream; >>> + >>> + protected String alias; >>> + protected String dateFormat; >>> + protected boolean prettyPrint; >>> + >>> + public XmlFormatter() >>> + { >>> + alias = null; >>> + dateFormat = null; >>> + } >>> + >>> + @Override >>> + public void activate(Context context) >>> + { >>> + if (prettyPrint) { >>> + xstream = new XStream(); 
>>> + } else { >>> + xstream = new XStream(new XppDriver() >>> + { >>> + @Override >>> + public HierarchicalStreamWriter createWriter(Writer out) >>> + { >>> + return new CompactWriter(out, getNameCoder()); >>> + } >>> + }); >>> + } >>> + if (alias != null) { >>> + try { >>> + xstream.alias(alias, clazz); >>> + } catch (Throwable e) { >>> + throw new RuntimeException("Unable find provided class"); >>> + } >>> + } >>> + if (dateFormat != null) { >>> + xstream.registerConverter(new DateConverter(dateFormat, new >>> String[] {})); >>> + } >>> + } >>> + >>> + @Override >>> + public void deactivate() >>> + { >>> + >>> + } >>> + >>> + @Override >>> + public String convert(Object tuple) >>> + { >>> + try { >>> + return xstream.toXML(tuple); >>> + } catch (XStreamException e) { >>> + logger.debug("Error while converting tuple {} {} >>> ",tuple,e.getMessage()); >>> + return null; >>> + } >>> + } >>> + >>> + /** >>> + * Gets the alias This is an optional step. Without it XStream would >>> work >>> + * fine, but the XML element names would contain the fully qualified >>> name of >>> + * each class (including package) which would bulk up the XML a bit. >>> + * >>> + * @return alias. >>> + */ >>> + public String getAlias() >>> + { >>> + return alias; >>> + } >>> + >>> + /** >>> + * Sets the alias This is an optional step. Without it XStream would >>> work >>> + * fine, but the XML element names would contain the fully qualified >>> name of >>> + * each class (including package) which would bulk up the XML a bit. >>> + * >>> + * @param alias >>> + * . >>> + */ >>> + public void setAlias(String alias) >>> + { >>> + this.alias = alias; >>> + } >>> + >>> + /** >>> + * Gets the date format e.g dd/mm/yyyy - this will be how a date >>> would be >>> + * formatted >>> + * >>> + * @return dateFormat. 
>>> + */ >>> + public String getDateFormat() >>> + { >>> + return dateFormat; >>> + } >>> + >>> + /** >>> + * Sets the date format e.g dd/mm/yyyy - this will be how a date >>> would be >>> + * formatted >>> + * >>> + * @param dateFormat >>> + * . >>> + */ >>> + public void setDateFormat(String dateFormat) >>> + { >>> + this.dateFormat = dateFormat; >>> + } >>> + >>> + /** >>> + * Returns true if pretty print is enabled. >>> + * >>> + * @return prettyPrint >>> + */ >>> + public boolean isPrettyPrint() >>> + { >>> + return prettyPrint; >>> + } >>> + >>> + /** >>> + * Sets pretty print option. >>> + * >>> + * @param prettyPrint >>> + */ >>> + public void setPrettyPrint(boolean prettyPrint) >>> + { >>> + this.prettyPrint = prettyPrint; >>> + } >>> + >>> + private static final Logger logger = >>> LoggerFactory.getLogger(XmlFormatter.class); >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java >>> new file mode 100644 >>> index 0000000..4fd39fb >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java >>> @@ -0,0 +1,314 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. 
You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. >>> + */ >>> +package com.datatorrent.contrib.schema.parser; >>> + >>> +import java.io.IOException; >>> +import java.util.ArrayList; >>> + >>> +import javax.validation.constraints.NotNull; >>> + >>> +import org.slf4j.Logger; >>> +import org.slf4j.LoggerFactory; >>> +import org.supercsv.cellprocessor.Optional; >>> +import org.supercsv.cellprocessor.ParseBool; >>> +import org.supercsv.cellprocessor.ParseChar; >>> +import org.supercsv.cellprocessor.ParseDate; >>> +import org.supercsv.cellprocessor.ParseDouble; >>> +import org.supercsv.cellprocessor.ParseInt; >>> +import org.supercsv.cellprocessor.ParseLong; >>> +import org.supercsv.cellprocessor.ift.CellProcessor; >>> +import org.supercsv.io.CsvBeanReader; >>> +import org.supercsv.prefs.CsvPreference; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.datatorrent.api.Context; >>> +import com.datatorrent.api.Context.OperatorContext; >>> +import com.datatorrent.lib.util.ReusableStringReader; >>> +import com.datatorrent.netlet.util.DTThrowable; >>> + >>> +/** >>> + * Operator that converts CSV string to Pojo <br> >>> + * Assumption is that each field in the delimited data should map to a >>> simple >>> + * java type.<br> >>> + * <br> >>> + * <b>Properties</b> <br> >>> + * <b>fieldInfo</b>:User need to specify fields and their types as a >>> comma >>> + * separated string having format >>> <NAME>:<TYPE>|<FORMAT> in >>> + * the same order as incoming data. 
FORMAT refers to dates with >>> dd/mm/yyyy as >>> + * default e.g >>> name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br> >>> + * <b>fieldDelimiter</b>: Default is comma <br> >>> + * <b>lineDelimiter</b>: Default is '\r\n' >>> + * >>> + * @displayName CsvParser >>> + * @category Parsers >>> + * @tags csv pojo parser >>> + */ >>> +@InterfaceStability.Evolving >>> +public class CsvParser extends Parser<String> >>> +{ >>> + >>> + private ArrayList<Field> fields; >>> + @NotNull >>> + protected int fieldDelimiter; >>> + protected String lineDelimiter; >>> + >>> + @NotNull >>> + protected String fieldInfo; >>> + >>> + protected transient String[] nameMapping; >>> + protected transient CellProcessor[] processors; >>> + private transient CsvBeanReader csvReader; >>> + >>> + public enum FIELD_TYPE >>> + { >>> + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, >>> DATE >>> + }; >>> + >>> + @NotNull >>> + private transient ReusableStringReader csvStringReader = new >>> ReusableStringReader(); >>> + >>> + public CsvParser() >>> + { >>> + fields = new ArrayList<Field>(); >>> + fieldDelimiter = ','; >>> + lineDelimiter = "\r\n"; >>> + } >>> + >>> + @Override >>> + public void setup(OperatorContext context) >>> + { >>> + super.setup(context); >>> + >>> + logger.info("field info {}", fieldInfo); >>> + fields = new ArrayList<Field>(); >>> + String[] fieldInfoTuple = fieldInfo.split(","); >>> + for (int i = 0; i < fieldInfoTuple.length; i++) { >>> + String[] fieldTuple = fieldInfoTuple[i].split(":"); >>> + Field field = new Field(); >>> + field.setName(fieldTuple[0]); >>> + String[] typeFormat = fieldTuple[1].split("\\|"); >>> + field.setType(typeFormat[0].toUpperCase()); >>> + if (typeFormat.length > 1) { >>> + field.setFormat(typeFormat[1]); >>> + } >>> + getFields().add(field); >>> + } >>> + >>> + CsvPreference preference = new CsvPreference.Builder('"', >>> fieldDelimiter, lineDelimiter).build(); >>> + csvReader = new 
CsvBeanReader(csvStringReader, preference); >>> + int countKeyValue = getFields().size(); >>> + logger.info("countKeyValue {}", countKeyValue); >>> + nameMapping = new String[countKeyValue]; >>> + processors = new CellProcessor[countKeyValue]; >>> + initialise(nameMapping, processors); >>> + } >>> + >>> + private void initialise(String[] nameMapping, CellProcessor[] >>> processors) >>> + { >>> + for (int i = 0; i < getFields().size(); i++) { >>> + FIELD_TYPE type = getFields().get(i).type; >>> + nameMapping[i] = getFields().get(i).name; >>> + if (type == FIELD_TYPE.DOUBLE) { >>> + processors[i] = new Optional(new ParseDouble()); >>> + } else if (type == FIELD_TYPE.INTEGER) { >>> + processors[i] = new Optional(new ParseInt()); >>> + } else if (type == FIELD_TYPE.FLOAT) { >>> + processors[i] = new Optional(new ParseDouble()); >>> + } else if (type == FIELD_TYPE.LONG) { >>> + processors[i] = new Optional(new ParseLong()); >>> + } else if (type == FIELD_TYPE.SHORT) { >>> + processors[i] = new Optional(new ParseInt()); >>> + } else if (type == FIELD_TYPE.STRING) { >>> + processors[i] = new Optional(); >>> + } else if (type == FIELD_TYPE.CHARACTER) { >>> + processors[i] = new Optional(new ParseChar()); >>> + } else if (type == FIELD_TYPE.BOOLEAN) { >>> + processors[i] = new Optional(new ParseBool()); >>> + } else if (type == FIELD_TYPE.DATE) { >>> + String dateFormat = getFields().get(i).format; >>> + processors[i] = new Optional(new ParseDate(dateFormat == null ? 
>>> "dd/MM/yyyy" : dateFormat)); >>> + } >>> + } >>> + } >>> + >>> + @Override >>> + public void activate(Context context) >>> + { >>> + >>> + } >>> + >>> + @Override >>> + public void deactivate() >>> + { >>> + >>> + } >>> + >>> + @Override >>> + public Object convert(String tuple) >>> + { >>> + try { >>> + csvStringReader.open(tuple); >>> + return csvReader.read(clazz, nameMapping, processors); >>> + } catch (IOException e) { >>> + logger.debug("Error while converting tuple {} >>> {}",tuple,e.getMessage()); >>> + return null; >>> + } >>> + } >>> + >>> + @Override >>> + public void teardown() >>> + { >>> + try { >>> + if (csvReader != null) { >>> + csvReader.close(); >>> + } >>> + } catch (IOException e) { >>> + DTThrowable.rethrow(e); >>> + } >>> + } >>> + >>> + public static class Field >>> + { >>> + String name; >>> + String format; >>> + FIELD_TYPE type; >>> + >>> + public String getName() >>> + { >>> + return name; >>> + } >>> + >>> + public void setName(String name) >>> + { >>> + this.name = name; >>> + } >>> + >>> + public FIELD_TYPE getType() >>> + { >>> + return type; >>> + } >>> + >>> + public void setType(String type) >>> + { >>> + this.type = FIELD_TYPE.valueOf(type); >>> + } >>> + >>> + public String getFormat() >>> + { >>> + return format; >>> + } >>> + >>> + public void setFormat(String format) >>> + { >>> + this.format = format; >>> + } >>> + >>> + } >>> + >>> + /** >>> + * Gets the array list of the fields, a field being a POJO containing >>> the name >>> + * of the field and type of field. >>> + * >>> + * @return An array list of Fields. >>> + */ >>> + public ArrayList<Field> getFields() >>> + { >>> + return fields; >>> + } >>> + >>> + /** >>> + * Sets the array list of the fields, a field being a POJO containing >>> the name >>> + * of the field and type of field. >>> + * >>> + * @param fields >>> + * An array list of Fields. 
>>> + */ >>> + public void setFields(ArrayList<Field> fields) >>> + { >>> + this.fields = fields; >>> + } >>> + >>> + /** >>> + * Gets the delimiter which separates fields in incoming data. >>> + * >>> + * @return fieldDelimiter >>> + */ >>> + public int getFieldDelimiter() >>> + { >>> + return fieldDelimiter; >>> + } >>> + >>> + /** >>> + * Sets the delimiter which separates fields in incoming data. >>> + * >>> + * @param fieldDelimiter >>> + */ >>> + public void setFieldDelimiter(int fieldDelimiter) >>> + { >>> + this.fieldDelimiter = fieldDelimiter; >>> + } >>> + >>> + /** >>> + * Gets the delimiter which separates lines in incoming data. >>> + * >>> + * @return lineDelimiter >>> + */ >>> + public String getLineDelimiter() >>> + { >>> + return lineDelimiter; >>> + } >>> + >>> + /** >>> + * Sets the delimiter which separates line in incoming data. >>> + * >>> + * @param lineDelimiter >>> + */ >>> + public void setLineDelimiter(String lineDelimiter) >>> + { >>> + this.lineDelimiter = lineDelimiter; >>> + } >>> + >>> + /** >>> + * Gets the name of the fields with type and format ( for date ) as >>> comma >>> + * separated string in same order as incoming data. e.g >>> + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy >>> + * >>> + * @return fieldInfo >>> + */ >>> + public String getFieldInfo() >>> + { >>> + return fieldInfo; >>> + } >>> + >>> + /** >>> + * Sets the name of the fields with type and format ( for date ) as >>> comma >>> + * separated string in same order as incoming data. 
e.g >>> + * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy >>> + * >>> + * @param fieldInfo >>> + */ >>> + public void setFieldInfo(String fieldInfo) >>> + { >>> + this.fieldInfo = fieldInfo; >>> + } >>> + >>> + private static final Logger logger = >>> LoggerFactory.getLogger(CsvParser.class); >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java >>> new file mode 100644 >>> index 0000000..db45b33 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java >>> @@ -0,0 +1,106 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. 
>>> + */ >>> +package com.datatorrent.contrib.schema.parser; >>> + >>> +import java.io.IOException; >>> +import java.text.SimpleDateFormat; >>> + >>> +import org.codehaus.jackson.JsonProcessingException; >>> +import org.codehaus.jackson.map.DeserializationConfig; >>> +import org.codehaus.jackson.map.ObjectMapper; >>> +import org.codehaus.jackson.map.ObjectReader; >>> +import org.slf4j.Logger; >>> +import org.slf4j.LoggerFactory; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.datatorrent.api.Context; >>> +import com.datatorrent.netlet.util.DTThrowable; >>> + >>> +/** >>> + * Operator that converts JSON string to Pojo <br> >>> + * <b>Properties</b> <br> >>> + * <b>dateFormat</b>: date format e.g dd/MM/yyyy >>> + * >>> + * @displayName JsonParser >>> + * @category Parsers >>> + * @tags json pojo parser >>> + */ >>> +@InterfaceStability.Evolving >>> +public class JsonParser extends Parser<String> >>> +{ >>> + >>> + private transient ObjectReader reader; >>> + protected String dateFormat; >>> + >>> + @Override >>> + public void activate(Context context) >>> + { >>> + try { >>> + ObjectMapper mapper = new ObjectMapper(); >>> + >>> mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, >>> false); >>> + if (dateFormat != null) { >>> + mapper.setDateFormat(new SimpleDateFormat(dateFormat)); >>> + } >>> + reader = mapper.reader(clazz); >>> + } catch (Throwable e) { >>> + throw new RuntimeException("Unable find provided class"); >>> + } >>> + } >>> + >>> + @Override >>> + public void deactivate() >>> + { >>> + } >>> + >>> + @Override >>> + public Object convert(String tuple) >>> + { >>> + try { >>> + return reader.readValue(tuple); >>> + } catch (JsonProcessingException e) { >>> + logger.debug("Error while converting tuple {} >>> {}",tuple,e.getMessage()); >>> + } catch (IOException e) { >>> + DTThrowable.rethrow(e); >>> + } >>> + return null; >>> + } >>> + >>> + /** >>> + * Get the date format >>> + * >>> + * 
@return Date format string >>> + */ >>> + public String getDateFormat() >>> + { >>> + return dateFormat; >>> + } >>> + >>> + /** >>> + * Set the date format >>> + * >>> + * @param dateFormat >>> + */ >>> + public void setDateFormat(String dateFormat) >>> + { >>> + this.dateFormat = dateFormat; >>> + } >>> + >>> + private static final Logger logger = >>> LoggerFactory.getLogger(JsonParser.class); >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java >>> new file mode 100644 >>> index 0000000..e5ff7f5 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java >>> @@ -0,0 +1,102 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. 
>>> + */ >>> +package com.datatorrent.contrib.schema.parser; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.datatorrent.api.Context; >>> +import com.datatorrent.api.Context.PortContext; >>> +import com.datatorrent.api.DefaultInputPort; >>> +import com.datatorrent.api.DefaultOutputPort; >>> +import com.datatorrent.api.Operator.ActivationListener; >>> +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; >>> +import com.datatorrent.common.util.BaseOperator; >>> +import com.datatorrent.contrib.converter.Converter; >>> + >>> +/** >>> + * Abstract class that implements Converter interface. This is a schema >>> enabled >>> + * Parser <br> >>> + * Sub classes need to implement the convert method <br> >>> + * <br> >>> + * <b>Port Interface</b><br> >>> + * <b>in</b>: expects <INPUT><br> >>> + * <b>out</b>: emits <Object> this is a schema enabled port<br> >>> + * <b>err</b>: emits <INPUT> error port that emits input tuple >>> that could >>> + * not be converted<br> >>> + * <br> >>> + * >>> + * @displayName Parser >>> + * @tags parser converter >>> + * @param <INPUT> >>> + */ >>> +@InterfaceStability.Evolving >>> +public abstract class Parser<INPUT> extends BaseOperator implements >>> Converter<INPUT, Object>, >>> + ActivationListener<Context> >>> +{ >>> + protected transient Class<?> clazz; >>> + >>> + @OutputPortFieldAnnotation(schemaRequired = true) >>> + public transient DefaultOutputPort<Object> out = new >>> DefaultOutputPort<Object>() >>> + { >>> + public void setup(PortContext context) >>> + { >>> + clazz = context.getValue(Context.PortContext.TUPLE_CLASS); >>> + } >>> + }; >>> + >>> + @OutputPortFieldAnnotation(optional = true) >>> + public transient DefaultOutputPort<INPUT> err = new >>> DefaultOutputPort<INPUT>(); >>> + >>> + public transient DefaultInputPort<INPUT> in = new >>> DefaultInputPort<INPUT>() >>> + { >>> + @Override >>> + public void process(INPUT inputTuple) >>> + { >>> + Object tuple = 
convert(inputTuple); >>> + if (tuple == null && err.isConnected()) { >>> + err.emit(inputTuple); >>> + return; >>> + } >>> + if (out.isConnected()) { >>> + out.emit(tuple); >>> + } >>> + } >>> + }; >>> + >>> + /** >>> + * Get the class that needs to be formatted >>> + * >>> + * @return Class<?> >>> + */ >>> + public Class<?> getClazz() >>> + { >>> + return clazz; >>> + } >>> + >>> + /** >>> + * Set the class of tuple that needs to be formatted >>> + * >>> + * @param clazz >>> + */ >>> + public void setClazz(Class<?> clazz) >>> + { >>> + this.clazz = clazz; >>> + } >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java >>> new file mode 100644 >>> index 0000000..4931497 >>> --- /dev/null >>> +++ >>> b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java >>> @@ -0,0 +1,141 @@ >>> +/** >>> + * Licensed to the Apache Software Foundation (ASF) under one >>> + * or more contributor license agreements. See the NOTICE file >>> + * distributed with this work for additional information >>> + * regarding copyright ownership. The ASF licenses this file >>> + * to you under the Apache License, Version 2.0 (the >>> + * "License"); you may not use this file except in compliance >>> + * with the License. You may obtain a copy of the License at >>> + * >>> + * http://www.apache.org/licenses/LICENSE-2.0 >>> + * >>> + * Unless required by applicable law or agreed to in writing, >>> + * software distributed under the License is distributed on an >>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>> + * KIND, either express or implied. 
See the License for the >>> + * specific language governing permissions and limitations >>> + * under the License. >>> + */ >>> +package com.datatorrent.contrib.schema.parser; >>> + >>> +import org.slf4j.Logger; >>> +import org.slf4j.LoggerFactory; >>> + >>> +import org.apache.hadoop.classification.InterfaceStability; >>> + >>> +import com.thoughtworks.xstream.XStream; >>> +import com.thoughtworks.xstream.XStreamException; >>> +import com.thoughtworks.xstream.converters.basic.DateConverter; >>> + >>> +import com.datatorrent.api.Context; >>> + >>> +/** >>> + * Operator that converts XML string to Pojo <br> >>> + * <b>Properties</b> <br> >>> + * <b>alias</b>:This maps to the root element of the XML string. If not >>> + * specified, parser would expect the root element to be fully >>> qualified name of >>> + * the Pojo Class. <br> >>> + * <b>dateFormats</b>: Comma separated string of date formats e.g >>> + * dd/mm/yyyy,dd-mmm-yyyy where first one would be considered default >>> + * >>> + * @displayName XmlParser >>> + * @category Parsers >>> + * @tags xml pojo parser >>> + */ >>> +@InterfaceStability.Evolving >>> +public class XmlParser extends Parser<String> >>> +{ >>> + >>> + private transient XStream xstream; >>> + protected String alias; >>> + protected String dateFormats; >>> + >>> + public XmlParser() >>> + { >>> + alias = null; >>> + dateFormats = null; >>> + } >>> + >>> + @Override >>> + public void activate(Context context) >>> + { >>> + xstream = new XStream(); >>> + if (alias != null) { >>> + try { >>> + xstream.alias(alias, clazz); >>> + } catch (Throwable e) { >>> + throw new RuntimeException("Unable find provided class"); >>> + } >>> + } >>> + if (dateFormats != null) { >>> + String[] dateFormat = dateFormats.split(","); >>> + xstream.registerConverter(new DateConverter(dateFormat[0], >>> dateFormat)); >>> + } >>> + } >>> + >>> + @Override >>> + public void deactivate() >>> + { >>> + >>> + } >>> + >>> + @Override >>> + public Object convert(String tuple) 
>>> + { >>> + try { >>> + return xstream.fromXML(tuple); >>> + } catch (XStreamException e) { >>> + logger.debug("Error while converting tuple {} {}", >>> tuple,e.getMessage()); >>> + return null; >>> + } >>> + } >>> + >>> + /** >>> + * Gets the alias >>> + * >>> + * @return alias. >>> + */ >>> + public String getAlias() >>> + { >>> + return alias; >>> + } >>> + >>> + /** >>> + * Sets the alias This maps to the root element of the XML string. If >>> not >>> + * specified, parser would expect the root element to be fully >>> qualified name >>> + * of the Pojo Class. >>> + * >>> + * @param alias >>> + * . >>> + */ >>> + public void setAlias(String alias) >>> + { >>> + this.alias = alias; >>> + } >>> + >>> + /** >>> + * Gets the comma separated string of date formats e.g >>> dd/mm/yyyy,dd-mmm-yyyy >>> + * where first one would be considered default >>> + * >>> + * @return dateFormats. >>> + */ >>> + public String getDateFormats() >>> + { >>> + return dateFormats; >>> + } >>> + >>> + /** >>> + * Sets the comma separated string of date formats e.g >>> dd/mm/yyyy,dd-mmm-yyyy >>> + * where first one would be considered default >>> + * >>> + * @param dateFormats >>> + * . 
>>> + */ >>> + public void setDateFormats(String dateFormats) >>> + { >>> + this.dateFormats = dateFormats; >>> + } >>> + >>> + private static final Logger logger = >>> LoggerFactory.getLogger(XmlParser.class); >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java >>> new file mode 100644 >>> index 0000000..8ecc088 >>> --- /dev/null >>> +++ >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java >>> @@ -0,0 +1,147 @@ >>> +package com.datatorrent.contrib.schema.formatter; >>> + >>> +import java.util.Date; >>> + >>> +import org.joda.time.DateTime; >>> +import org.junit.Assert; >>> +import org.junit.Rule; >>> +import org.junit.Test; >>> +import org.junit.rules.TestWatcher; >>> +import org.junit.runner.Description; >>> + >>> +import com.datatorrent.contrib.schema.formatter.CsvFormatter; >>> +import com.datatorrent.lib.testbench.CollectorTestSink; >>> +import com.datatorrent.lib.util.TestUtils; >>> + >>> +public class CsvFormatterTest >>> +{ >>> + >>> + CsvFormatter operator; >>> + CollectorTestSink<Object> validDataSink; >>> + CollectorTestSink<String> invalidDataSink; >>> + >>> + @Rule >>> + public Watcher watcher = new Watcher(); >>> + >>> + public class Watcher extends TestWatcher >>> + { >>> + >>> + @Override >>> + protected void starting(Description description) >>> + { >>> + super.starting(description); >>> + operator = new CsvFormatter(); >>> + >>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); >>> + operator.setLineDelimiter("\r\n"); >>> + validDataSink = new CollectorTestSink<Object>(); >>> + 
invalidDataSink = new CollectorTestSink<String>(); >>> + TestUtils.setSink(operator.out, validDataSink); >>> + TestUtils.setSink(operator.err, invalidDataSink); >>> + } >>> + >>> + @Override >>> + protected void finished(Description description) >>> + { >>> + super.finished(description); >>> + operator.teardown(); >>> + } >>> + >>> + } >>> + >>> + @Test >>> + public void testPojoReaderToCsv() >>> + { >>> + operator.setup(null); >>> + EmployeeBean emp = new EmployeeBean(); >>> + emp.setName("john"); >>> + emp.setDept("cs"); >>> + emp.setEid(1); >>> + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); >>> + operator.in.process(emp); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String csvOp = (String)validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(csvOp); >>> + Assert.assertEquals("john,cs,1,01/01/2015" + >>> operator.getLineDelimiter(), csvOp); >>> + } >>> + >>> + @Test >>> + public void testPojoReaderToCsvMultipleDate() >>> + { >>> + >>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy"); >>> + operator.setup(null); >>> + EmployeeBean emp = new EmployeeBean(); >>> + emp.setName("john"); >>> + emp.setDept("cs"); >>> + emp.setEid(1); >>> + emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate()); >>> + emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate()); >>> + operator.in.process(emp); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String csvOp = (String)validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(csvOp); >>> + Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + >>> operator.getLineDelimiter(), csvOp); >>> + } >>> + >>> + public static class EmployeeBean >>> + { >>> + >>> + private String name; >>> + private String dept; >>> + private 
int eid; >>> + private Date dateOfJoining; >>> + private Date dateOfBirth; >>> + >>> + public String getName() >>> + { >>> + return name; >>> + } >>> + >>> + public void setName(String name) >>> + { >>> + this.name = name; >>> + } >>> + >>> + public String getDept() >>> + { >>> + return dept; >>> + } >>> + >>> + public void setDept(String dept) >>> + { >>> + this.dept = dept; >>> + } >>> + >>> + public int getEid() >>> + { >>> + return eid; >>> + } >>> + >>> + public void setEid(int eid) >>> + { >>> + this.eid = eid; >>> + } >>> + >>> + public Date getDateOfJoining() >>> + { >>> + return dateOfJoining; >>> + } >>> + >>> + public void setDateOfJoining(Date dateOfJoining) >>> + { >>> + this.dateOfJoining = dateOfJoining; >>> + } >>> + >>> + public Date getDateOfBirth() >>> + { >>> + return dateOfBirth; >>> + } >>> + >>> + public void setDateOfBirth(Date dateOfBirth) >>> + { >>> + this.dateOfBirth = dateOfBirth; >>> + } >>> + } >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java >>> new file mode 100644 >>> index 0000000..4040c63 >>> --- /dev/null >>> +++ >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java >>> @@ -0,0 +1,186 @@ >>> +package com.datatorrent.contrib.schema.formatter; >>> + >>> +import java.io.ByteArrayOutputStream; >>> +import java.io.File; >>> +import java.io.IOException; >>> +import java.io.PrintStream; >>> +import java.util.Date; >>> +import java.util.List; >>> + >>> +import org.apache.commons.io.FileUtils; >>> +import org.joda.time.DateTime; >>> +import org.junit.Assert; >>> +import org.junit.Rule; >>> +import 
org.junit.Test; >>> +import org.junit.runner.Description; >>> + >>> +import >>> com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; >>> +import com.datatorrent.lib.testbench.CollectorTestSink; >>> +import com.datatorrent.lib.util.TestUtils; >>> +import com.datatorrent.lib.util.TestUtils.TestInfo; >>> +import com.google.common.collect.Lists; >>> + >>> +public class JsonFormatterTest >>> +{ >>> + JsonFormatter operator; >>> + CollectorTestSink<Object> validDataSink; >>> + CollectorTestSink<String> invalidDataSink; >>> + >>> + final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); >>> + >>> + public JsonFormatterTest() >>> + { >>> + // So that the output is cleaner. >>> + System.setErr(new PrintStream(myOut)); >>> + } >>> + >>> + @Rule >>> + public TestInfo testMeta = new FSTestWatcher() >>> + { >>> + private void deleteDirectory() >>> + { >>> + try { >>> + FileUtils.deleteDirectory(new File(getDir())); >>> + } catch (IOException ex) { >>> + throw new RuntimeException(ex); >>> + } >>> + } >>> + >>> + @Override >>> + protected void starting(Description descriptor) >>> + { >>> + super.starting(descriptor); >>> + deleteDirectory(); >>> + >>> + operator = new JsonFormatter(); >>> + >>> + validDataSink = new CollectorTestSink<Object>(); >>> + invalidDataSink = new CollectorTestSink<String>(); >>> + TestUtils.setSink(operator.out, validDataSink); >>> + TestUtils.setSink(operator.err, invalidDataSink); >>> + operator.setup(null); >>> + operator.activate(null); >>> + >>> + operator.beginWindow(0); >>> + } >>> + >>> + @Override >>> + protected void finished(Description description) >>> + { >>> + operator.endWindow(); >>> + operator.teardown(); >>> + >>> + deleteDirectory(); >>> + super.finished(description); >>> + } >>> + }; >>> + >>> + @Test >>> + public void testJSONToPOJO() >>> + { >>> + Test1Pojo pojo = new Test1Pojo(); >>> + pojo.a = 123; >>> + pojo.b = 234876274; >>> + pojo.c = "HowAreYou?"; >>> + pojo.d = Lists.newArrayList("ABC", "PQR", 
"XYZ"); >>> + >>> + operator.in.put(pojo); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expectedJSONString = >>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}"; >>> + Assert.assertEquals(expectedJSONString, >>> validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJODate() >>> + { >>> + Test1Pojo pojo = new Test1Pojo(); >>> + pojo.a = 123; >>> + pojo.b = 234876274; >>> + pojo.c = "HowAreYou?"; >>> + pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); >>> + pojo.date = new >>> DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate(); >>> + operator.setDateFormat("dd-MM-yyyy"); >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.put(pojo); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expectedJSONString = >>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; >>> + Assert.assertEquals(expectedJSONString, >>> validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJONullFields() >>> + { >>> + Test1Pojo pojo = new Test1Pojo(); >>> + pojo.a = 123; >>> + pojo.b = 234876274; >>> + pojo.c = "HowAreYou?"; >>> + pojo.d = null; >>> + >>> + operator.in.put(pojo); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expectedJSONString = >>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}"; >>> + Assert.assertEquals(expectedJSONString, >>> validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJOEmptyPOJO() >>> + { >>> + Test1Pojo pojo = new Test1Pojo(); >>> + operator.in.put(pojo); >>> + 
Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expectedJSONString = >>> "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}"; >>> + System.out.println(validDataSink.collectedTuples.get(0)); >>> + Assert.assertEquals(expectedJSONString, >>> validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJONullPOJO() >>> + { >>> + operator.in.put(null); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expectedJSONString = "null"; >>> + Assert.assertEquals(expectedJSONString, >>> validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJONoFieldPOJO() >>> + { >>> + operator.endWindow(); >>> + operator.teardown(); >>> + operator.setClazz(Test2Pojo.class); >>> + operator.setup(null); >>> + operator.beginWindow(1); >>> + >>> + Test2Pojo o = new Test2Pojo(); >>> + operator.in.put(o); >>> + Assert.assertEquals(0, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); >>> + Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + public static class Test1Pojo >>> + { >>> + public int a; >>> + public long b; >>> + public String c; >>> + public List<String> d; >>> + public Date date; >>> + >>> + @Override >>> + public String toString() >>> + { >>> + return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d >>> + ", date=" + date + "]"; >>> + } >>> + } >>> + >>> + public static class Test2Pojo >>> + { >>> + } >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java >>> ---------------------------------------------------------------------- >>> diff --git >>> 
a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java >>> new file mode 100644 >>> index 0000000..2bc1aec >>> --- /dev/null >>> +++ >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java >>> @@ -0,0 +1,226 @@ >>> +package com.datatorrent.contrib.schema.formatter; >>> + >>> +import java.util.Date; >>> + >>> +import org.joda.time.DateTime; >>> +import org.junit.Assert; >>> +import org.junit.Rule; >>> +import org.junit.Test; >>> +import org.junit.rules.TestWatcher; >>> +import org.junit.runner.Description; >>> + >>> +import com.datatorrent.contrib.schema.formatter.XmlFormatter; >>> +import com.datatorrent.lib.testbench.CollectorTestSink; >>> +import com.datatorrent.lib.util.TestUtils; >>> + >>> +public class XmlFormatterTest >>> +{ >>> + >>> + XmlFormatter operator; >>> + CollectorTestSink<Object> validDataSink; >>> + CollectorTestSink<String> invalidDataSink; >>> + >>> + @Rule >>> + public Watcher watcher = new Watcher(); >>> + >>> + public class Watcher extends TestWatcher >>> + { >>> + >>> + @Override >>> + protected void starting(Description description) >>> + { >>> + super.starting(description); >>> + operator = new XmlFormatter(); >>> + operator.setClazz(EmployeeBean.class); >>> + operator.setDateFormat("yyyy-MM-dd"); >>> + validDataSink = new CollectorTestSink<Object>(); >>> + invalidDataSink = new CollectorTestSink<String>(); >>> + TestUtils.setSink(operator.out, validDataSink); >>> + TestUtils.setSink(operator.err, invalidDataSink); >>> + } >>> + >>> + @Override >>> + protected void finished(Description description) >>> + { >>> + super.finished(description); >>> + operator.teardown(); >>> + } >>> + >>> + } >>> + >>> + @Test >>> + public void testPojoToXmlWithoutAlias() >>> + { >>> + EmployeeBean e = new EmployeeBean(); >>> + e.setName("john"); >>> + e.setEid(1); >>> + e.setDept("cs"); >>> + 
e.setDateOfJoining(new >>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); >>> + >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(e); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expected = >>> "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" >>> + "<name>john</name>" >>> + + "<dept>cs</dept>" + "<eid>1</eid>" + >>> "<dateOfJoining>2015-01-01</dateOfJoining>" >>> + + >>> "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; >>> + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testXmlToPojoWithAlias() >>> + { >>> + EmployeeBean e = new EmployeeBean(); >>> + e.setName("john"); >>> + e.setEid(1); >>> + e.setDept("cs"); >>> + e.setDateOfJoining(new >>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); >>> + >>> + operator.setAlias("EmployeeBean"); >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(e); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expected = "<EmployeeBean>" + "<name>john</name>" + >>> "<dept>cs</dept>" + "<eid>1</eid>" >>> + + "<dateOfJoining>2015-01-01</dateOfJoining>" + >>> "</EmployeeBean>"; >>> + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testXmlToPojoWithPrettyPrint() >>> + { >>> + EmployeeBean e = new EmployeeBean(); >>> + e.setName("john"); >>> + e.setEid(1); >>> + e.setDept("cs"); >>> + e.setDateOfJoining(new >>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); >>> + >>> + operator.setAlias("EmployeeBean"); >>> + operator.setPrettyPrint(true); >>> + operator.setup(null); >>> + operator.activate(null); >>> + 
operator.in.process(e); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expected = "<EmployeeBean>\n" + " <name>john</name>\n" + " >>> <dept>cs</dept>\n" + " <eid>1</eid>\n" >>> + + " <dateOfJoining>2015-01-01</dateOfJoining>\n" + >>> "</EmployeeBean>"; >>> + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testPojoToXmlWithoutAliasHeirarchical() >>> + { >>> + EmployeeBean e = new EmployeeBean(); >>> + e.setName("john"); >>> + e.setEid(1); >>> + e.setDept("cs"); >>> + e.setDateOfJoining(new >>> DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); >>> + Address address = new Address(); >>> + address.setCity("new york"); >>> + address.setCountry("US"); >>> + e.setAddress(address); >>> + >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(e); >>> + System.out.println(validDataSink.collectedTuples.get(0)); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + String expected = >>> "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" >>> + "<name>john</name>" >>> + + "<dept>cs</dept>" + "<eid>1</eid>" + >>> "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" >>> + + "<city>new york</city>" + "<country>US</country>" + >>> "</address>" >>> + + >>> "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; >>> + Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + public static class EmployeeBean >>> + { >>> + >>> + private String name; >>> + private String dept; >>> + private int eid; >>> + private Date dateOfJoining; >>> + private Address address; >>> + >>> + public String getName() >>> + { >>> + return name; >>> + } >>> + >>> + public void setName(String name) >>> + { >>> + this.name 
= name; >>> + } >>> + >>> + public String getDept() >>> + { >>> + return dept; >>> + } >>> + >>> + public void setDept(String dept) >>> + { >>> + this.dept = dept; >>> + } >>> + >>> + public int getEid() >>> + { >>> + return eid; >>> + } >>> + >>> + public void setEid(int eid) >>> + { >>> + this.eid = eid; >>> + } >>> + >>> + public Date getDateOfJoining() >>> + { >>> + return dateOfJoining; >>> + } >>> + >>> + public void setDateOfJoining(Date dateOfJoining) >>> + { >>> + this.dateOfJoining = dateOfJoining; >>> + } >>> + >>> + public Address getAddress() >>> + { >>> + return address; >>> + } >>> + >>> + public void setAddress(Address address) >>> + { >>> + this.address = address; >>> + } >>> + } >>> + >>> + public static class Address >>> + { >>> + >>> + private String city; >>> + private String country; >>> + >>> + public String getCity() >>> + { >>> + return city; >>> + } >>> + >>> + public void setCity(String city) >>> + { >>> + this.city = city; >>> + } >>> + >>> + public String getCountry() >>> + { >>> + return country; >>> + } >>> + >>> + public void setCountry(String country) >>> + { >>> + this.country = country; >>> + } >>> + >>> + } >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java >>> new file mode 100644 >>> index 0000000..3c31ad0 >>> --- /dev/null >>> +++ >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java >>> @@ -0,0 +1,172 @@ >>> +package com.datatorrent.contrib.schema.parser; >>> + >>> +import java.util.Date; >>> + >>> +import org.joda.time.DateTime; >>> +import org.junit.Assert; >>> +import org.junit.Rule; >>> +import org.junit.Test; 
>>> +import org.junit.rules.TestWatcher; >>> +import org.junit.runner.Description; >>> + >>> +import com.datatorrent.contrib.schema.parser.CsvParser; >>> +import com.datatorrent.lib.testbench.CollectorTestSink; >>> +import com.datatorrent.lib.util.TestUtils; >>> + >>> +public class CsvParserTest >>> +{ >>> + >>> + CsvParser operator; >>> + CollectorTestSink<Object> validDataSink; >>> + CollectorTestSink<String> invalidDataSink; >>> + >>> + @Rule >>> + public Watcher watcher = new Watcher(); >>> + >>> + public class Watcher extends TestWatcher >>> + { >>> + >>> + @Override >>> + protected void starting(Description description) >>> + { >>> + super.starting(description); >>> + operator = new CsvParser(); >>> + operator.setClazz(EmployeeBean.class); >>> + >>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); >>> + validDataSink = new CollectorTestSink<Object>(); >>> + invalidDataSink = new CollectorTestSink<String>(); >>> + TestUtils.setSink(operator.out, validDataSink); >>> + TestUtils.setSink(operator.err, invalidDataSink); >>> + } >>> + >>> + @Override >>> + protected void finished(Description description) >>> + { >>> + super.finished(description); >>> + operator.teardown(); >>> + } >>> + >>> + } >>> + >>> + @Test >>> + public void testCsvToPojoWriterDefault() >>> + { >>> + operator.setup(null); >>> + String tuple = "john,cs,1,01/01/2015"; >>> + operator.in.process(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(EmployeeBean.class, obj.getClass()); >>> + EmployeeBean pojo = (EmployeeBean)obj; >>> + Assert.assertEquals("john", pojo.getName()); >>> + Assert.assertEquals("cs", pojo.getDept()); >>> + Assert.assertEquals(1, pojo.getEid()); >>> + Assert.assertEquals(new DateTime().withDate(2015, 1, >>> 
1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( >>> + pojo.getDateOfJoining())); >>> + } >>> + >>> + @Test >>> + public void testCsvToPojoWriterDateFormat() >>> + { >>> + >>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy"); >>> + operator.setup(null); >>> + String tuple = "john,cs,1,01-JAN-2015"; >>> + operator.in.process(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(EmployeeBean.class, obj.getClass()); >>> + EmployeeBean pojo = (EmployeeBean)obj; >>> + Assert.assertEquals("john", pojo.getName()); >>> + Assert.assertEquals("cs", pojo.getDept()); >>> + Assert.assertEquals(1, pojo.getEid()); >>> + Assert.assertEquals(new DateTime().withDate(2015, 1, >>> 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( >>> + pojo.getDateOfJoining())); >>> + } >>> + >>> + @Test >>> + public void testCsvToPojoWriterDateFormatMultiple() >>> + { >>> + >>> operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date"); >>> + operator.setup(null); >>> + String tuple = "john,cs,1,01-JAN-2015,01/01/2015"; >>> + operator.in.process(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(EmployeeBean.class, obj.getClass()); >>> + EmployeeBean pojo = (EmployeeBean)obj; >>> + Assert.assertEquals("john", pojo.getName()); >>> + Assert.assertEquals("cs", pojo.getDept()); >>> + Assert.assertEquals(1, pojo.getEid()); >>> + Assert.assertEquals(new DateTime().withDate(2015, 1, >>> 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( >>> + pojo.getDateOfJoining())); >>> + 
Assert.assertEquals(new DateTime().withDate(2015, 1, >>> 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( >>> + pojo.getDateOfBirth())); >>> + } >>> + >>> + public static class EmployeeBean >>> + { >>> + >>> + private String name; >>> + private String dept; >>> + private int eid; >>> + private Date dateOfJoining; >>> + private Date dateOfBirth; >>> + >>> + public String getName() >>> + { >>> + return name; >>> + } >>> + >>> + public void setName(String name) >>> + { >>> + this.name = name; >>> + } >>> + >>> + public String getDept() >>> + { >>> + return dept; >>> + } >>> + >>> + public void setDept(String dept) >>> + { >>> + this.dept = dept; >>> + } >>> + >>> + public int getEid() >>> + { >>> + return eid; >>> + } >>> + >>> + public void setEid(int eid) >>> + { >>> + this.eid = eid; >>> + } >>> + >>> + public Date getDateOfJoining() >>> + { >>> + return dateOfJoining; >>> + } >>> + >>> + public void setDateOfJoining(Date dateOfJoining) >>> + { >>> + this.dateOfJoining = dateOfJoining; >>> + } >>> + >>> + public Date getDateOfBirth() >>> + { >>> + return dateOfBirth; >>> + } >>> + >>> + public void setDateOfBirth(Date dateOfBirth) >>> + { >>> + this.dateOfBirth = dateOfBirth; >>> + } >>> + } >>> + >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java >>> new file mode 100644 >>> index 0000000..b453508 >>> --- /dev/null >>> +++ >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java >>> @@ -0,0 +1,212 @@ >>> +package com.datatorrent.contrib.schema.parser; >>> + >>> +import java.io.ByteArrayOutputStream; >>> +import java.io.File; >>> +import 
java.io.IOException; >>> +import java.io.PrintStream; >>> +import java.util.Date; >>> +import java.util.List; >>> + >>> +import org.apache.commons.io.FileUtils; >>> +import org.joda.time.DateTime; >>> +import org.junit.Assert; >>> +import org.junit.Rule; >>> +import org.junit.Test; >>> +import org.junit.runner.Description; >>> + >>> +import >>> com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; >>> +import com.datatorrent.lib.testbench.CollectorTestSink; >>> +import com.datatorrent.lib.util.TestUtils; >>> +import com.datatorrent.lib.util.TestUtils.TestInfo; >>> + >>> +public class JsonParserTest >>> +{ >>> + JsonParser operator; >>> + CollectorTestSink<Object> validDataSink; >>> + CollectorTestSink<String> invalidDataSink; >>> + >>> + final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); >>> + >>> + public JsonParserTest() >>> + { >>> + // So that the output is cleaner. >>> + System.setErr(new PrintStream(myOut)); >>> + } >>> + >>> + @Rule >>> + public TestInfo testMeta = new FSTestWatcher() >>> + { >>> + private void deleteDirectory() >>> + { >>> + try { >>> + FileUtils.deleteDirectory(new File(getDir())); >>> + } catch (IOException ex) { >>> + throw new RuntimeException(ex); >>> + } >>> + } >>> + >>> + @Override >>> + protected void starting(Description descriptor) >>> + { >>> + >>> + super.starting(descriptor); >>> + deleteDirectory(); >>> + >>> + operator = new JsonParser(); >>> + operator.setClazz(Test1Pojo.class); >>> + validDataSink = new CollectorTestSink<Object>(); >>> + invalidDataSink = new CollectorTestSink<String>(); >>> + TestUtils.setSink(operator.out, validDataSink); >>> + TestUtils.setSink(operator.err, invalidDataSink); >>> + operator.setup(null); >>> + operator.activate(null); >>> + >>> + operator.beginWindow(0); >>> + } >>> + >>> + @Override >>> + protected void finished(Description description) >>> + { >>> + operator.endWindow(); >>> + operator.teardown(); >>> + >>> + deleteDirectory(); >>> + 
super.finished(description); >>> + } >>> + }; >>> + >>> + @Test >>> + public void testJSONToPOJO() >>> + { >>> + String tuple = >>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; >>> + operator.in.put(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(Test1Pojo.class, obj.getClass()); >>> + Test1Pojo pojo = (Test1Pojo)obj; >>> + Assert.assertEquals(123, pojo.a); >>> + Assert.assertEquals(234876274, pojo.b); >>> + Assert.assertEquals("HowAreYou?", pojo.c); >>> + Assert.assertEquals(3, pojo.d.size()); >>> + Assert.assertEquals("ABC", pojo.d.get(0)); >>> + Assert.assertEquals("PQR", pojo.d.get(1)); >>> + Assert.assertEquals("XYZ", pojo.d.get(2)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJODate() >>> + { >>> + String tuple = >>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; >>> + operator.setDateFormat("dd-MM-yyyy"); >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.put(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(Test1Pojo.class, obj.getClass()); >>> + Test1Pojo pojo = (Test1Pojo)obj; >>> + Assert.assertEquals(123, pojo.a); >>> + Assert.assertEquals(234876274, pojo.b); >>> + Assert.assertEquals("HowAreYou?", pojo.c); >>> + Assert.assertEquals(3, pojo.d.size()); >>> + Assert.assertEquals("ABC", pojo.d.get(0)); >>> + Assert.assertEquals("PQR", pojo.d.get(1)); >>> + Assert.assertEquals("XYZ", pojo.d.get(2)); >>> + Assert.assertEquals(2015, new DateTime(pojo.date).getYear()); >>> + Assert.assertEquals(9, new 
DateTime(pojo.date).getMonthOfYear()); >>> + Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth()); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJOInvalidData() >>> + { >>> + String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}"; >>> + operator.in.put(tuple); >>> + Assert.assertEquals(0, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); >>> + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJOUnknownFields() >>> + { >>> + String tuple = >>> "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}"; >>> + operator.in.put(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(Test1Pojo.class, obj.getClass()); >>> + Test1Pojo pojo = (Test1Pojo)obj; >>> + Assert.assertEquals(123, pojo.a); >>> + Assert.assertEquals(234876274, pojo.b); >>> + Assert.assertEquals("HowAreYou?", pojo.c); >>> + Assert.assertEquals(null, pojo.d); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJOMismatchingFields() >>> + { >>> + String tuple = >>> "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; >>> + operator.in.put(tuple); >>> + Assert.assertEquals(0, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); >>> + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJOEmptyString() >>> + { >>> + String tuple = ""; >>> + operator.in.put(tuple); >>> + Assert.assertEquals(0, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); >>> + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); >>> 
+ } >>> + >>> + @Test >>> + public void testJSONToPOJOEmptyJSON() >>> + { >>> + String tuple = "{}"; >>> + operator.in.put(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(Test1Pojo.class, obj.getClass()); >>> + Test1Pojo pojo = (Test1Pojo)obj; >>> + Assert.assertEquals(0, pojo.a); >>> + Assert.assertEquals(0, pojo.b); >>> + Assert.assertEquals(null, pojo.c); >>> + Assert.assertEquals(null, pojo.d); >>> + } >>> + >>> + @Test >>> + public void testJSONToPOJOArrayInJson() >>> + { >>> + String tuple = >>> "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; >>> + operator.in.put(tuple); >>> + Assert.assertEquals(0, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); >>> + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + public static class Test1Pojo >>> + { >>> + public int a; >>> + public long b; >>> + public String c; >>> + public List<String> d; >>> + public Date date; >>> + >>> + @Override >>> + public String toString() >>> + { >>> + return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d >>> + ", date=" + date + "]"; >>> + } >>> + } >>> +} >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java >>> ---------------------------------------------------------------------- >>> diff --git >>> a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java >>> new file mode 100644 >>> index 0000000..4298951 >>> --- /dev/null >>> +++ >>> b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java 
>>> @@ -0,0 +1,254 @@ >>> +package com.datatorrent.contrib.schema.parser; >>> + >>> +import java.util.Date; >>> + >>> +import org.joda.time.DateTime; >>> +import org.junit.Assert; >>> +import org.junit.Rule; >>> +import org.junit.Test; >>> +import org.junit.rules.TestWatcher; >>> +import org.junit.runner.Description; >>> + >>> +import com.datatorrent.lib.testbench.CollectorTestSink; >>> +import com.datatorrent.lib.util.TestUtils; >>> + >>> +public class XmlParserTest >>> +{ >>> + XmlParser operator; >>> + CollectorTestSink<Object> validDataSink; >>> + CollectorTestSink<String> invalidDataSink; >>> + >>> + @Rule >>> + public Watcher watcher = new Watcher(); >>> + >>> + public class Watcher extends TestWatcher >>> + { >>> + >>> + @Override >>> + protected void starting(Description description) >>> + { >>> + super.starting(description); >>> + operator = new XmlParser(); >>> + operator.setClazz(EmployeeBean.class); >>> + operator.setDateFormats("yyyy-MM-dd"); //setting default date >>> pattern >>> + validDataSink = new CollectorTestSink<Object>(); >>> + invalidDataSink = new CollectorTestSink<String>(); >>> + TestUtils.setSink(operator.out, validDataSink); >>> + TestUtils.setSink(operator.err, invalidDataSink); >>> + } >>> + >>> + @Override >>> + protected void finished(Description description) >>> + { >>> + super.finished(description); >>> + operator.teardown(); >>> + } >>> + >>> + } >>> + >>> + @Test >>> + public void testXmlToPojoWithoutAlias() >>> + { >>> + String tuple = >>> "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + >>> "<name>john</name>" >>> + + "<dept>cs</dept>" + "<eid>1</eid>" + >>> "<dateOfJoining>2015-01-01</dateOfJoining>" >>> + + >>> "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; >>> + >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, 
invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(EmployeeBean.class, obj.getClass()); >>> + EmployeeBean pojo = (EmployeeBean)obj; >>> + Assert.assertEquals("john", pojo.getName()); >>> + Assert.assertEquals("cs", pojo.getDept()); >>> + Assert.assertEquals(1, pojo.getEid()); >>> + Assert.assertEquals(2015, new >>> DateTime(pojo.getDateOfJoining()).getYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getMonthOfYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getDayOfMonth()); >>> + } >>> + >>> + @Test >>> + public void testXmlToPojoWithAliasDateFormat() >>> + { >>> + String tuple = "<EmployeeBean>" + "<name>john</name>" + >>> "<dept>cs</dept>" + "<eid>1</eid>" >>> + + "<dateOfJoining>2015-JAN-01</dateOfJoining>" + >>> "</EmployeeBean>"; >>> + >>> + operator.setAlias("EmployeeBean"); >>> + operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd"); >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(EmployeeBean.class, obj.getClass()); >>> + EmployeeBean pojo = (EmployeeBean)obj; >>> + Assert.assertEquals("john", pojo.getName()); >>> + Assert.assertEquals("cs", pojo.getDept()); >>> + Assert.assertEquals(1, pojo.getEid()); >>> + Assert.assertEquals(2015, new >>> DateTime(pojo.getDateOfJoining()).getYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getMonthOfYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getDayOfMonth()); >>> + } >>> + >>> + @Test >>> + public void testXmlToPojoWithAlias() >>> + { >>> + String tuple = "<EmployeeBean>" + "<name>john</name>" + 
>>> "<dept>cs</dept>" + "<eid>1</eid>" >>> + + "<dateOfJoining>2015-01-01</dateOfJoining>" + >>> "</EmployeeBean>"; >>> + >>> + operator.setAlias("EmployeeBean"); >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(EmployeeBean.class, obj.getClass()); >>> + EmployeeBean pojo = (EmployeeBean)obj; >>> + Assert.assertEquals("john", pojo.getName()); >>> + Assert.assertEquals("cs", pojo.getDept()); >>> + Assert.assertEquals(1, pojo.getEid()); >>> + Assert.assertEquals(2015, new >>> DateTime(pojo.getDateOfJoining()).getYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getMonthOfYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getDayOfMonth()); >>> + } >>> + >>> + @Test >>> + public void testXmlToPojoIncorrectXML() >>> + { >>> + String tuple = "<EmployeeBean>" >>> + + "<firstname>john</firstname>" //incorrect field name >>> + + "<dept>cs</dept>" + "<eid>1</eid>" + >>> "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>" >>> + + "</EmployeeBean>"; >>> + >>> + operator.setAlias("EmployeeBean"); >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(tuple); >>> + Assert.assertEquals(0, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); >>> + Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); >>> + } >>> + >>> + @Test >>> + public void testXmlToPojoWithoutAliasHeirarchical() >>> + { >>> + String tuple = >>> "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + >>> "<name>john</name>" >>> + + "<dept>cs</dept>" + "<eid>1</eid>" + >>> "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" >>> + + 
"<city>new york</city>" + "<country>US</country>" + >>> "</address>" >>> + + >>> "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; >>> + >>> + operator.setup(null); >>> + operator.activate(null); >>> + operator.in.process(tuple); >>> + Assert.assertEquals(1, validDataSink.collectedTuples.size()); >>> + Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); >>> + Object obj = validDataSink.collectedTuples.get(0); >>> + Assert.assertNotNull(obj); >>> + Assert.assertEquals(EmployeeBean.class, obj.getClass()); >>> + EmployeeBean pojo = (EmployeeBean)obj; >>> + Assert.assertEquals("john", pojo.getName()); >>> + Assert.assertEquals("cs", pojo.getDept()); >>> + Assert.assertEquals(1, pojo.getEid()); >>> + Assert.assertEquals(Address.class, pojo.getAddress().getClass()); >>> + Assert.assertEquals("new york", pojo.getAddress().getCity()); >>> + Assert.assertEquals("US", pojo.getAddress().getCountry()); >>> + Assert.assertEquals(2015, new >>> DateTime(pojo.getDateOfJoining()).getYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getMonthOfYear()); >>> + Assert.assertEquals(1, new >>> DateTime(pojo.getDateOfJoining()).getDayOfMonth()); >>> + } >>> + >>> + public static class EmployeeBean >>> + { >>> + >>> + private String name; >>> + private String dept; >>> + private int eid; >>> + private Date dateOfJoining; >>> + private Address address; >>> + >>> + public String getName() >>> + { >>> + return name; >>> + } >>> + >>> + public void setName(String name) >>> + { >>> + this.name = name; >>> + } >>> + >>> + public String getDept() >>> + { >>> + return dept; >>> + } >>> + >>> + public void setDept(String dept) >>> + { >>> + this.dept = dept; >>> + } >>> + >>> + public int getEid() >>> + { >>> + return eid; >>> + } >>> + >>> + public void setEid(int eid) >>> + { >>> + this.eid = eid; >>> + } >>> + >>> + public Date getDateOfJoining() >>> + { >>> + return dateOfJoining; >>> + } >>> + >>> + public void 
setDateOfJoining(Date dateOfJoining) >>> + { >>> + this.dateOfJoining = dateOfJoining; >>> + } >>> + >>> + public Address getAddress() >>> + { >>> + return address; >>> + } >>> + >>> + public void setAddress(Address address) >>> + { >>> + this.address = address; >>> + } >>> + } >>> + >>> + public static class Address >>> + { >>> + >>> + private String city; >>> + private String country; >>> + >>> + public String getCity() >>> + { >>> + return city; >>> + } >>> + >>> + public void setCity(String city) >>> + { >>> + this.city = city; >>> + } >>> + >>> + public String getCountry() >>> + { >>> + return country; >>> + } >>> + >>> + public void setCountry(String country) >>> + { >>> + this.country = country; >>> + } >>> + } >>> + >>> +} >>> >>> >> >