This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/streams.git
commit c176d09435b619e4c1e8b6d66e2d10b24adc68e3 Author: sblackmon <sblack...@apache.org> AuthorDate: Wed Aug 12 11:07:16 2020 -0500 STREAMS-671 flink twitter examples output files do not contain date-time fields. Switch the twitter pojos to use java.util.Date and build them without additionalProperties maps. --- streams-contrib/streams-provider-twitter/pom.xml | 42 +++++++++-- .../org/apache/streams/twitter/api/Twitter.java | 5 +- .../streams/twitter/converter/TwitterDateSwap.java | 51 +++++++++++++ .../converter/util/TwitterActivityUtil.java | 5 +- .../flink-twitter-collection/pom.xml | 83 ++++++++++++++++------ .../TwitterFollowingPipelineConfiguration.json | 6 +- .../TwitterPostsPipelineConfiguration.json | 6 +- .../TwitterSpritzerPipelineConfiguration.json | 6 +- ...witterUserInformationPipelineConfiguration.json | 6 +- .../java/org/apache/streams/juneau/DateSwap.java | 69 ++++++++++++++++++ 10 files changed, 238 insertions(+), 41 deletions(-) diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 40685a4..01ef834 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -31,6 +31,7 @@ <properties> <skipITs>true</skipITs> + <jsonschema2pojo.plugin.version>1.0.2</jsonschema2pojo.plugin.version> </properties> <dependencies> @@ -147,15 +148,46 @@ </testResources> <plugins> <plugin> - <groupId>org.apache.streams.plugins</groupId> - <artifactId>streams-plugin-pojo</artifactId> - <version>${project.version}</version> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <version>${jsonschema2pojo.plugin.version}</version> + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-persist-hdfs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams.plugins</groupId> + 
<artifactId>streams-plugin-pojo</artifactId> + <version>${streams.version}</version> + </dependency> + </dependencies> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> <configuration> + <annotationStyle>jackson2</annotationStyle> + <customAnnotator>org.apache.streams.plugins.JuneauPojoAnnotator</customAnnotator> + <generateBuilders>true</generateBuilders> + <includeAdditionalProperties>false</includeAdditionalProperties> + <includeDynamicAccessors>true</includeDynamicAccessors> + <includeHashcodeAndEquals>true</includeHashcodeAndEquals> + <outputDirectory>${project.basedir}/target/generated-sources/pojo</outputDirectory> + <propertyWordDelimiters></propertyWordDelimiters> + <removeOldOutput>true</removeOldOutput> + <serializable>true</serializable> <sourcePaths> <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath> </sourcePaths> - <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory> - <targetPackage>org.apache.streams.twitter.pojo</targetPackage> + <useDoubleNumbers>true</useDoubleNumbers> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>false</useJodaDates> </configuration> </plugin> <plugin> diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java index ddb3d28..ebb9c64 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java @@ -21,6 +21,7 @@ package org.apache.streams.twitter.api; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.config.TwitterConfiguration; +import org.apache.streams.twitter.converter.TwitterDateSwap; 
import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.twitter.converter.TwitterJodaDateSwap; import org.apache.streams.twitter.pojo.DirectMessage; @@ -125,11 +126,11 @@ public class Twitter implements .parser( JsonParser.DEFAULT.builder() .ignoreUnknownBeanProperties(true) - .pojoSwaps(TwitterJodaDateSwap.class) + .pojoSwaps(TwitterJodaDateSwap.class, TwitterDateSwap.class) .build()) .serializer( JsonSerializer.DEFAULT.builder() - .pojoSwaps(TwitterJodaDateSwap.class) + .pojoSwaps(TwitterJodaDateSwap.class, TwitterDateSwap.class) .trimEmptyCollections(true) .trimEmptyMaps(true) .build()) diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateSwap.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateSwap.java new file mode 100644 index 0000000..f4d5705 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateSwap.java @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package org.apache.streams.twitter.converter; + +import org.apache.juneau.BeanSession; +import org.apache.juneau.ClassMeta; +import org.apache.juneau.parser.ParseException; +import org.apache.juneau.transform.StringSwap; +import org.apache.streams.data.util.RFC3339Utils; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Date; + +/** + * Transforms {@link Date} to {@link String Strings}. + */ + +public class TwitterDateSwap extends StringSwap<Date> { + + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(TwitterDateTimeFormat.TWITTER_FORMAT); + + @Override /* PojoSwap */ + public String swap(BeanSession session, Date o) { + return ZonedDateTime.ofInstant(o.toInstant(), ZoneOffset.UTC).format(dateTimeFormatter); + } + + @Override /* PojoSwap */ + public Date unswap(BeanSession session, String s, ClassMeta<?> hint) { + return Date.from(LocalDateTime.parse(s, dateTimeFormatter).atZone(ZoneOffset.UTC).toInstant()); + } + +} diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java index 6f92aa3..d8fffcc 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java @@ -41,6 +41,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.lang3.StringUtils; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +87,7 @@ public class TwitterActivityUtil { throw new ActivityConversionException("Unable to determine activity id"); } 
try { - activity.setPublished(tweet.getCreatedAt()); + activity.setPublished(new DateTime(tweet.getCreatedAt())); } catch ( Exception ex ) { throw new ActivityConversionException("Unable to determine publishedDate", ex); } @@ -323,7 +324,7 @@ public class TwitterActivityUtil { Map<String, Object> likes = new HashMap<>(); likes.put("perspectival", tweet.getFavorited()); - likes.put("count", tweet.getAdditionalProperties().get("favorite_count")); + likes.put("count", tweet.getFavoriteCount()); extensions.put("likes", likes); diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml index 1e2de10..467536b 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml @@ -32,14 +32,16 @@ <description>Collects twitter documents using flink.</description> <properties> - <testng.version>6.9.10</testng.version> <hdfs.version>2.7.0</hdfs.version> <flink.version>1.11.1</flink.version> + <testng.version>6.9.10</testng.version> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <scalapc.version>1.1.0</scalapc.version> <scalatest.version>3.0.0</scalatest.version> <scalaxml.version>1.1.0</scalaxml.version> + + <jsonschema2pojo.plugin.version>1.0.2</jsonschema2pojo.plugin.version> </properties> <dependencies> @@ -394,22 +396,9 @@ </executions> </plugin> <plugin> - <groupId>org.apache.streams.plugins</groupId> - <artifactId>streams-plugin-pojo</artifactId> - <configuration> - <sourcePaths> - <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath> - </sourcePaths> - <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory> - <targetPackage>org.apache.streams.examples.flink.twitter</targetPackage> - </configuration> - <executions> - <execution> - <goals> - <goal>generate-sources</goal> - </goals> - </execution> 
- </executions> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <version>${jsonschema2pojo.plugin.version}</version> <dependencies> <dependency> <groupId>org.apache.streams</groupId> @@ -417,12 +406,66 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-provider-twitter</artifactId> - <version>${project.version}</version> + <groupId>org.apache.streams.plugins</groupId> + <artifactId>streams-plugin-pojo</artifactId> + <version>${streams.version}</version> </dependency> </dependencies> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + <configuration> + <annotationStyle>jackson2</annotationStyle> + <customAnnotator>org.apache.streams.plugins.JuneauPojoAnnotator</customAnnotator> + <generateBuilders>true</generateBuilders> + <includeAdditionalProperties>false</includeAdditionalProperties> + <includeDynamicAccessors>true</includeDynamicAccessors> + <includeHashcodeAndEquals>true</includeHashcodeAndEquals> + <outputDirectory>${project.basedir}/target/generated-sources/pojo</outputDirectory> + <propertyWordDelimiters></propertyWordDelimiters> + <removeOldOutput>true</removeOldOutput> + <serializable>true</serializable> + <sourcePaths> + <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath> +<!-- <sourcePath>${project.basedir}/../../../streams-contrib/streams-provider-twitter/src/main/jsonschema</sourcePath>--> + </sourcePaths> + <useDoubleNumbers>true</useDoubleNumbers> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>false</useJodaDates> + </configuration> </plugin> +<!-- <plugin>--> +<!-- <groupId>org.apache.streams.plugins</groupId>--> +<!-- <artifactId>streams-plugin-pojo</artifactId>--> +<!-- <configuration>--> +<!-- <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>--> +<!-- 
<targetPackage>org.apache.streams.examples.flink.twitter</targetPackage>--> +<!-- </configuration>--> +<!-- <executions>--> +<!-- <execution>--> +<!-- <goals>--> +<!-- <goal>generate-sources</goal>--> +<!-- </goals>--> +<!-- </execution>--> +<!-- </executions>--> +<!-- <dependencies>--> +<!-- <dependency>--> +<!-- <groupId>org.apache.streams</groupId>--> +<!-- <artifactId>streams-persist-hdfs</artifactId>--> +<!-- <version>${project.version}</version>--> +<!-- </dependency>--> +<!-- <dependency>--> +<!-- <groupId>org.apache.streams</groupId>--> +<!-- <artifactId>streams-provider-twitter</artifactId>--> +<!-- <version>${project.version}</version>--> +<!-- </dependency>--> +<!-- </dependencies>--> +<!-- </plugin>--> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json index bc56a0a..6f0bd54 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json @@ -13,15 +13,15 @@ "properties": { "twitter": { "type": "object", - "javaType": "org.apache.streams.twitter.config.TwitterFollowingConfiguration" + "$ref": "../../../../../../streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterFollowingConfiguration.json" }, "source": { "type": "object", - "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json" }, "destination": { "type": "object", - "javaType": 
"org.apache.streams.hdfs.HdfsWriterConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json" }, "providerWaitMs": { "type": "integer" diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json index 892c615..32ba3c0 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json @@ -13,15 +13,15 @@ "properties": { "twitter": { "type": "object", - "javaType": "org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration" + "$ref": "../../../../../../streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json" }, "source": { "type": "object", - "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json" }, "destination": { "type": "object", - "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json" } } } \ No newline at end of file diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json index fe0cfbb..612d905 100644 --- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json @@ -13,15 +13,15 @@ "properties": { "twitter": { "type": "object", - "javaType": "org.apache.streams.twitter.config.TwitterStreamConfiguration" + "$ref": "../../../../../../streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterStreamConfiguration.json" }, "source": { "type": "object", - "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json" }, "destination": { "type": "object", - "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json" } } } \ No newline at end of file diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json index ba3b5fc..ea8f505 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json @@ -13,15 +13,15 @@ "properties": { "twitter": { "type": "object", - "javaType": "org.apache.streams.twitter.config.TwitterUserInformationConfiguration" + "$ref": "../../../../../../streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterUserInformationConfiguration.json" }, 
"source": { "type": "object", - "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json" }, "destination": { "type": "object", - "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + "$ref": "../../../../../../streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json" } } } \ No newline at end of file diff --git a/streams-pojo/src/main/java/org/apache/streams/juneau/DateSwap.java b/streams-pojo/src/main/java/org/apache/streams/juneau/DateSwap.java new file mode 100644 index 0000000..7e980f2 --- /dev/null +++ b/streams-pojo/src/main/java/org/apache/streams/juneau/DateSwap.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.juneau; + +import org.apache.commons.lang3.StringUtils; +import org.apache.juneau.BeanSession; +import org.apache.juneau.ClassMeta; +import org.apache.juneau.parser.ParseException; +import org.apache.juneau.transform.StringSwap; +import org.apache.streams.data.util.RFC3339Utils; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Date; + +/** + * Transforms {@link Date} to {@link String Strings}. + */ +public class DateSwap extends StringSwap<Date> { + + DateTimeFormatter dateTimeFormatter; + + /** + * Constructor. + */ + public DateSwap() { + dateTimeFormatter = DateTimeFormatter.ISO_INSTANT; + } + + @Override /* PojoSwap */ + public String swap(BeanSession session, Date o) { + DateTimeFormatter dateTimeFormatter = this.dateTimeFormatter; + if( StringUtils.isNotBlank(session.getProperty("format", String.class, RFC3339Utils.UTC_STANDARD_FMT.toString()))) { + dateTimeFormatter = DateTimeFormatter.ofPattern(session.getProperty("format", String.class, RFC3339Utils.UTC_STANDARD_FMT.toString())); + } + return ZonedDateTime.ofInstant(o.toInstant(), ZoneOffset.UTC).format(dateTimeFormatter); + } + + @Override /* PojoSwap */ + public Date unswap(BeanSession session, String s, ClassMeta<?> hint) { + DateTimeFormatter dateTimeFormatter = this.dateTimeFormatter; + if( StringUtils.isNotBlank(session.getProperty("format", String.class, RFC3339Utils.UTC_STANDARD_FMT.toString()))) { + dateTimeFormatter = DateTimeFormatter.ofPattern(session.getProperty("format", String.class, RFC3339Utils.UTC_STANDARD_FMT.toString())); + } + return Date.from(LocalDateTime.parse(s, dateTimeFormatter).atZone(ZoneOffset.UTC).toInstant()); + } + +}