This is an automated email from the ASF dual-hosted git repository.
sblackmon pushed a commit to branch STREAMS-676
in repository https://gitbox.apache.org/repos/asf/streams.git
commit f47c42c5df3410387aba291dad48a4558869242f
Author: sblackmon
AuthorDate: Mon Oct 12 17:33:49 2020 -0500
resolves STREAMS-676
remove unnecessary keying and use GlobalWindow in
streams-examples-flink/flink-twitter-collection
---
.../flink-twitter-collection/pom.xml | 15 ++
.../main/jsonschema/StreamsFlinkConfiguration.json | 4 +--
.../apache/streams/examples/flink/FlinkBase.scala | 11
.../collection/FlinkTwitterFollowingPipeline.scala | 13 +
.../collection/FlinkTwitterPostsPipeline.scala | 19 +
.../FlinkTwitterUserInformationPipeline.scala | 32 --
.../FollowingCollectorFlatMapFunction.scala| 8 +++---
.../TimelineCollectorFlatMapFunction.scala | 8 +++---
8 files changed, 55 insertions(+), 55 deletions(-)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
index 467536b..dee8dfc 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
@@ -95,6 +95,11 @@
org.apache.streams
+streams-converters
+${project.version}
+
+
+org.apache.streams
streams-provider-twitter
${project.version}
@@ -402,6 +407,16 @@
org.apache.streams
+streams-provider-twitter
+${project.version}
+
+
+org.apache.streams
+streams-converters
+${project.version}
+
+
+org.apache.streams
streams-persist-hdfs
${project.version}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
index 7c6291e..b45ec14 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
@@ -12,10 +12,10 @@
},
"additionalProperties": false,
"properties": {
-"test": {
+"local": {
"type": "boolean"
},
-"local": {
+"test": {
"type": "boolean"
}
}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 980f5eb..a2123b8 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -29,6 +29,9 @@ import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
@@ -46,6 +49,14 @@ object FlinkBase {
input.substring(input.lastIndexOf(':')+1)
else input
}
+
+ class idListWindowFunction extends AllWindowFunction[String, List[String], GlobalWindow] {
+override def apply(window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {
+ if( input.nonEmpty )
+out.collect(input.map(id => toProviderId(id)).toList)
+}
+ }
+
}
trait FlinkBase {
diff --git
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala