This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/streams.git
commit 55ef8ff77f47d394187a440f4aecfa7318d87ac9 Author: sblackmon <[email protected]> AuthorDate: Thu Aug 20 14:03:42 2020 -0500 gets rid of streams datum which contains joda DateTime in the flink pipelines --- .../apache/streams/examples/flink/FlinkBase.scala | 18 ++++--- .../collection/FlinkTwitterFollowingPipeline.scala | 7 +-- .../collection/FlinkTwitterPostsPipeline.scala | 30 ++--------- .../collection/FlinkTwitterSpritzerPipeline.scala | 35 +------------ .../FlinkTwitterUserInformationPipeline.scala | 28 ++-------- .../FollowingCollectorFlatMapFunction.scala | 27 +++++----- .../flink/twitter/collection/SpritzerSource.scala | 59 ++++++++++++++++++++++ .../TimelineCollectorFlatMapFunction.scala | 28 +++++----- .../UserInformationCollectorFlatMapFunction.scala | 26 ++++++---- 9 files changed, 124 insertions(+), 134 deletions(-) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala index 86e99ab..980f5eb 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala @@ -38,6 +38,16 @@ import org.slf4j.LoggerFactory /** * FlinkBase is a base class with capabilities common to all of the streams flink examples.
*/ +object FlinkBase { + def toProviderId(input : String) : String = { + if( input.startsWith("@") ) + return input.substring(1) + if( input.contains(':')) + input.substring(input.lastIndexOf(':')+1) + else input + } +} + trait FlinkBase { private val BASELOGGER = LoggerFactory.getLogger("FlinkBase") @@ -212,12 +222,4 @@ trait FlinkBase { outPathBuilder } - def toProviderId(input : String) : String = { - if( input.startsWith("@") ) - return input.substring(1) - if( input.contains(':')) - input.substring(input.lastIndexOf(':')+1) - else input - } - } \ No newline at end of file diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala index fce134d..4cda31e 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -135,14 +135,9 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) // these datums contain 'Follow' objects - val followDatums: DataStream[StreamsDatum] = keyed_ids. + val follows: DataStream[Follow] = keyed_ids. flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)). - name("followDatums"). - setParallelism(streamsConfig.getParallelism().toInt) - - val follows: DataStream[Follow] = followDatums. name("follows") - .map(datum => datum.getDocument.asInstanceOf[Follow]) val jsons: DataStream[String] = follows. 
name("jsons") diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala index 94540dd..6d49a5a 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -142,12 +142,10 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new setParallelism(streamsConfig.getParallelism().toInt). keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) - // these datums contain 'Tweet' objects - val tweetDatums: DataStream[StreamsDatum] = - keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(env.getParallelism).name("tweetDatums") - - val tweets: DataStream[Tweet] = tweetDatums - .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets") + val tweets: DataStream[Tweet] = keyed_ids. + flatMap(new TimelineCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)). + setParallelism(env.getParallelism). 
+ name("tweets") val jsons: DataStream[String] = tweets .map(tweet => { @@ -180,24 +178,4 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new } - class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable { - override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = { - collectPosts(input, out) - } - def collectPosts(id : String, out : Collector[StreamsDatum]) = { - val twitterConfiguration = config.getTwitter - twitterConfiguration.setInfo(List(toProviderId(id))) - val twitProvider: TwitterTimelineProvider = - new TwitterTimelineProvider(twitterConfiguration) - twitProvider.prepare(twitProvider) - twitProvider.startStream() - var iterator: Iterator[StreamsDatum] = null - do { - Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS) - twitProvider.readCurrent().iterator().toList.map(out.collect(_)) - } while( twitProvider.isRunning ) - } - } - - } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala index 0a3de4f..6cdedf2 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala @@ -117,7 +117,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration import FlinkTwitterSpritzerPipeline._ - val spritzerSource = new SpritzerSource(config.getTwitter) + val spritzerSource = new SpritzerSource(streamsConfig, config.getTwitter, 
streamsFlinkConfiguration) override def run(): Unit = { @@ -160,37 +160,4 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration spritzerSource.cancel() } - class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable /*with StoppableFunction*/ { - - var mapper: ObjectMapper = _ - - var twitProvider: TwitterStreamProvider = _ - - @throws[Exception] - override def open(parameters: Configuration): Unit = { - mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT) - twitProvider = new TwitterStreamProvider( sourceConfig ) - twitProvider.prepare(twitProvider) - twitProvider.startStream() - } - - override def run(ctx: SourceFunction.SourceContext[String]): Unit = { - var iterator: Iterator[StreamsDatum] = null - do { - Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS) - iterator = twitProvider.readCurrent().iterator() - iterator.toList.map(datum => ctx.collect(mapper.writeValueAsString(datum.getDocument))) - } while( twitProvider.isRunning ) - } - - override def cancel(): Unit = { - close() - } - -// override def stop(): Unit = { -// close() -// } - } - - } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala index eb15556..45f34ff 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -41,8 +41,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.util.Collector import org.apache.streams.config.ComponentConfigurator import org.apache.streams.config.StreamsConfigurator -import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase +import org.apache.streams.examples.flink.FlinkBase.toProviderId import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration import org.apache.streams.hdfs.HdfsReaderConfiguration import org.apache.streams.hdfs.HdfsWriterConfiguration @@ -148,11 +148,9 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists") - val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(env.getParallelism).name("userDatums") + val users: DataStream[User] = idLists.flatMap(new UserInformationCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)).setParallelism(env.getParallelism).name("users") - val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users") - - val jsons: DataStream[String] = user + val jsons: DataStream[String] = users .map(user => { val MAPPER = StreamsJacksonMapper.getInstance MAPPER.writeValueAsString(user) @@ -167,7 +165,6 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline withRollingPolicy(rollingPolicy). 
withBucketAssigner(basePathBucketAssigner).build(); - if( config.getTest == true ) { keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) } else { @@ -192,23 +189,4 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline } } - class profileCollectorFlatMapFunction extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable { - override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = { - collectProfiles(input, out) - } - def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = { - val twitterConfiguration = config.getTwitter - val twitProvider: TwitterUserInformationProvider = - new TwitterUserInformationProvider( - twitterConfiguration.withInfo(ids) - ) - twitProvider.prepare(twitProvider) - twitProvider.startStream() - var iterator: Iterator[StreamsDatum] = null - do { - Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS) - twitProvider.readCurrent().iterator().toList.map(out.collect(_)) - } while( twitProvider.isRunning ) - } - } } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala index 1223047..83d1275 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala @@ -10,9 +10,10 @@ import org.apache.flink.util.Collector import org.apache.streams.config.ComponentConfigurator import org.apache.streams.config.StreamsConfiguration import 
org.apache.streams.core.StreamsDatum -import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline.toProviderId +import org.apache.streams.examples.flink.FlinkBase.toProviderId import org.apache.streams.flink.StreamsFlinkConfiguration import org.apache.streams.twitter.config.TwitterFollowingConfiguration +import org.apache.streams.twitter.pojo.Follow import org.apache.streams.twitter.provider.TwitterFollowingProvider import scala.collection.JavaConversions._ @@ -21,31 +22,33 @@ class FollowingCollectorFlatMapFunction( streamsConfiguration : StreamsConfiguration, twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(), flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration() - ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable { + ) extends RichFlatMapFunction[String, Follow] with Serializable { - var size : IntCounter = new IntCounter() - var counter : IntCounter = new IntCounter() + var userids : IntCounter = new IntCounter() + var follows : IntCounter = new IntCounter() override def open(parameters: Configuration): Unit = { - getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.size", this.size) - getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.counter", this.counter) + getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.userids", this.userids) + getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.follows", this.follows) } - override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = { - size.add(input.size) + override def flatMap(input: String, out: Collector[Follow]): Unit = { + userids.add(input.size) collectConnections(input, out) } - def collectConnections(id : String, out : Collector[StreamsDatum]) = { + def collectConnections(id : String, out : Collector[Follow]) = { val conf = 
twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration] val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider(conf) twitProvider.prepare(twitProvider) twitProvider.startStream() do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) - val current = twitProvider.readCurrent().iterator().toList - counter.add(current.size) - current.map(out.collect(_)) + val current : List[StreamsDatum] = twitProvider.readCurrent().iterator().toList + follows.add(current.size) + for( datum <- current ) { + out.collect(datum.getDocument().asInstanceOf[Follow]) + } } while( twitProvider.isRunning ) } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/SpritzerSource.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/SpritzerSource.scala new file mode 100644 index 0000000..c5c1e50 --- /dev/null +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/SpritzerSource.scala @@ -0,0 +1,59 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.io.Serializable +import java.util.Objects +import java.util.concurrent.TimeUnit + +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.util.concurrent.Uninterruptibles +import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.accumulators.IntCounter +import org.apache.flink.streaming.api.functions.source.RichSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import org.apache.streams.core.StreamsDatum +import org.apache.streams.flink.StreamsFlinkConfiguration +import 
org.apache.streams.jackson.StreamsJacksonMapper +import org.apache.streams.twitter.config.TwitterStreamConfiguration +import org.apache.streams.twitter.converter.TwitterDateTimeFormat +import org.apache.streams.twitter.provider.TwitterStreamProvider + +import scala.collection.JavaConversions._ + +class SpritzerSource( + streamsConfiguration : StreamsConfiguration, + twitterConfiguration : TwitterStreamConfiguration = new ComponentConfigurator(classOf[TwitterStreamConfiguration]).detectConfiguration(), + flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration() + ) extends RichSourceFunction[String] with Serializable /*with StoppableFunction*/ { + + var mapper: ObjectMapper = _ + + var twitProvider: TwitterStreamProvider = _ + + var items : IntCounter = new IntCounter() + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT) + getRuntimeContext().addAccumulator("SpritzerSource.items", this.items) + twitProvider = new TwitterStreamProvider( twitterConfiguration ) + twitProvider.prepare(twitProvider) + twitProvider.startStream() + } + + override def run(ctx: SourceFunction.SourceContext[String]): Unit = { + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) + val current : List[StreamsDatum] = twitProvider.readCurrent().iterator().toList + items.add(current.size) + for( item <- current ) { + ctx.collect(mapper.writeValueAsString(item.getDocument)) + } + } while( twitProvider.isRunning ) + } + + override def cancel(): Unit = { + close() + } + +} \ No newline at end of file diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala index 8f21145..bbf70a5 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala @@ -9,8 +9,10 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector import org.apache.streams.config.StreamsConfiguration import org.apache.streams.core.StreamsDatum +import org.apache.streams.examples.flink.FlinkBase.toProviderId import org.apache.streams.flink.StreamsFlinkConfiguration import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration +import org.apache.streams.twitter.pojo.Tweet import org.apache.streams.twitter.provider.TwitterTimelineProvider import scala.collection.JavaConversions._ @@ -22,28 +24,30 @@ class TimelineCollectorFlatMapFunction( streamsConfiguration : StreamsConfiguration, twitterConfiguration : TwitterTimelineProviderConfiguration, streamsFlinkConfiguration : StreamsFlinkConfiguration - ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable { - var size : IntCounter = new IntCounter() - var counter : IntCounter = new IntCounter() + ) extends RichFlatMapFunction[String, Tweet] with Serializable { + var userids : IntCounter = new IntCounter() + var posts : IntCounter = new IntCounter() override def open(parameters: Configuration): Unit = { - getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.size", this.size) - getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.counter", this.counter) + getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.userids", 
this.userids) + getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.posts", this.posts) } - override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = { - size.add(input.size) + override def flatMap(input: String, out: Collector[Tweet]): Unit = { + userids.add(input.size) collectPosts(input, out) } - def collectPosts(ids : List[String], out : Collector[StreamsDatum]) = { + def collectPosts(id : String, out : Collector[Tweet]) = { try { - val conf = twitterConfiguration.withInfo(ids).asInstanceOf[TwitterTimelineProviderConfiguration] + val conf = twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterTimelineProviderConfiguration] val twitProvider: TwitterTimelineProvider = new TwitterTimelineProvider(conf) twitProvider.prepare(conf) twitProvider.startStream() do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) - val current = twitProvider.readCurrent().iterator().toList - counter.add(current.size) - current.map(out.collect(_)) + val current : List[StreamsDatum] = twitProvider.readCurrent().iterator().toList + posts.add(current.size) + for( datum <- current ) { + out.collect(datum.getDocument().asInstanceOf[Tweet]) + } } while( twitProvider.isRunning ) } finally { diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala index 46e0d4a..41dde97 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala +++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala @@ -9,9 +9,11 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector import org.apache.streams.config.StreamsConfiguration import org.apache.streams.core.StreamsDatum +import org.apache.streams.examples.flink.FlinkBase.toProviderId import org.apache.streams.flink.StreamsFlinkConfiguration import org.apache.streams.twitter.config.TwitterUserInformationConfiguration import org.apache.streams.twitter.provider.TwitterUserInformationProvider +import org.apache.streams.twitter.pojo.User import scala.collection.JavaConversions._ @@ -22,27 +24,29 @@ class UserInformationCollectorFlatMapFunction( streamsConfiguration : StreamsConfiguration, twitterConfiguration : TwitterUserInformationConfiguration, streamsFlinkConfiguration : StreamsFlinkConfiguration - ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable { - var size : IntCounter = new IntCounter() - var counter : IntCounter = new IntCounter() + ) extends RichFlatMapFunction[List[String], User] with Serializable { + var ids : IntCounter = new IntCounter() + var users : IntCounter = new IntCounter() override def open(parameters: Configuration): Unit = { - getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.size", this.size) - getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.counter", this.counter) + getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.ids", this.ids) + getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.users", this.users) } - override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = { - size.add(input.size) + override def flatMap(input: List[String], out: Collector[User]): Unit = { + ids.add(input.size) collectProfiles(input, out) } - def 
collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = { + def collectProfiles(ids : List[String], out : Collector[User]) = { val conf = twitterConfiguration.withInfo(ids) val twitProvider: TwitterUserInformationProvider = new TwitterUserInformationProvider(conf) twitProvider.prepare(conf) twitProvider.startStream() do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) - val current = twitProvider.readCurrent().iterator().toList - counter.add(current.size) - current.map(out.collect(_)) + val current : List[StreamsDatum] = twitProvider.readCurrent().iterator().toList + users.add(current.size) + for( datum <- current ) { + out.collect(datum.getDocument().asInstanceOf[User]) + } } while( twitProvider.isRunning ) } }
