This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git

commit 55ef8ff77f47d394187a440f4aecfa7318d87ac9
Author: sblackmon <[email protected]>
AuthorDate: Thu Aug 20 14:03:42 2020 -0500

    gets rid of streams datum which contains joda DateTime in the flink 
pipelines
---
 .../apache/streams/examples/flink/FlinkBase.scala  | 18 ++++---
 .../collection/FlinkTwitterFollowingPipeline.scala |  7 +--
 .../collection/FlinkTwitterPostsPipeline.scala     | 30 ++---------
 .../collection/FlinkTwitterSpritzerPipeline.scala  | 35 +------------
 .../FlinkTwitterUserInformationPipeline.scala      | 28 ++--------
 .../FollowingCollectorFlatMapFunction.scala        | 27 +++++-----
 .../flink/twitter/collection/SpritzerSource.scala  | 59 ++++++++++++++++++++++
 .../TimelineCollectorFlatMapFunction.scala         | 28 +++++-----
 .../UserInformationCollectorFlatMapFunction.scala  | 26 ++++++----
 9 files changed, 124 insertions(+), 134 deletions(-)

diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 86e99ab..980f5eb 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -38,6 +38,16 @@ import org.slf4j.LoggerFactory
 /**
   * FlinkBase is a base class with capabilities common to all of the streams 
flink examples.
   */
+object FlinkBase {
+  def toProviderId(input : String) : String = {
+    if( input.startsWith("@") )
+      return input.substring(1)
+    if( input.contains(':'))
+      input.substring(input.lastIndexOf(':')+1)
+    else input
+  }
+}
+
 trait FlinkBase {
 
   private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
@@ -212,12 +222,4 @@ trait FlinkBase {
     outPathBuilder
   }
 
-  def toProviderId(input : String) : String = {
-    if( input.startsWith("@") )
-      return input.substring(1)
-    if( input.contains(':'))
-      input.substring(input.lastIndexOf(':')+1)
-    else input
-  }
-
 }
\ No newline at end of file
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index fce134d..4cda31e 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -135,14 +135,9 @@ class FlinkTwitterFollowingPipeline(config: 
TwitterFollowingPipelineConfiguratio
       keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
     // these datums contain 'Follow' objects
-    val followDatums: DataStream[StreamsDatum] = keyed_ids.
+    val follows: DataStream[Follow] = keyed_ids.
       flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, 
config.getTwitter, streamsFlinkConfiguration)).
-      name("followDatums").
-      setParallelism(streamsConfig.getParallelism().toInt)
-
-    val follows: DataStream[Follow] = followDatums.
       name("follows")
-      .map(datum => datum.getDocument.asInstanceOf[Follow])
 
     val jsons: DataStream[String] = follows.
       name("jsons")
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 94540dd..6d49a5a 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -142,12 +142,10 @@ class FlinkTwitterPostsPipeline(config: 
TwitterPostsPipelineConfiguration = new
       setParallelism(streamsConfig.getParallelism().toInt).
       keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
-    // these datums contain 'Tweet' objects
-    val tweetDatums: DataStream[StreamsDatum] =
-      keyed_ids.flatMap(new 
postCollectorFlatMapFunction).setParallelism(env.getParallelism).name("tweetDatums")
-
-    val tweets: DataStream[Tweet] = tweetDatums
-      .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
+    val tweets: DataStream[Tweet] = keyed_ids.
+      flatMap(new TimelineCollectorFlatMapFunction(streamsConfig, 
config.getTwitter, streamsFlinkConfiguration)).
+      setParallelism(env.getParallelism).
+      name("tweets")
 
     val jsons: DataStream[String] = tweets
       .map(tweet => {
@@ -180,24 +178,4 @@ class FlinkTwitterPostsPipeline(config: 
TwitterPostsPipelineConfiguration = new
 
   }
 
-  class postCollectorFlatMapFunction extends RichFlatMapFunction[String, 
StreamsDatum] with Serializable {
-    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-      collectPosts(input, out)
-    }
-    def collectPosts(id : String, out : Collector[StreamsDatum]) = {
-      val twitterConfiguration = config.getTwitter
-      twitterConfiguration.setInfo(List(toProviderId(id)))
-      val twitProvider: TwitterTimelineProvider =
-        new TwitterTimelineProvider(twitterConfiguration)
-      twitProvider.prepare(twitProvider)
-      twitProvider.startStream()
-      var iterator: Iterator[StreamsDatum] = null
-      do {
-        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
-        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
-      } while( twitProvider.isRunning )
-    }
-  }
-
-
 }
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 0a3de4f..6cdedf2 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -117,7 +117,7 @@ class FlinkTwitterSpritzerPipeline(config: 
TwitterSpritzerPipelineConfiguration
 
   import FlinkTwitterSpritzerPipeline._
 
-  val spritzerSource = new SpritzerSource(config.getTwitter)
+  val spritzerSource = new SpritzerSource(streamsConfig, config.getTwitter, 
streamsFlinkConfiguration)
 
   override def run(): Unit = {
 
@@ -160,37 +160,4 @@ class FlinkTwitterSpritzerPipeline(config: 
TwitterSpritzerPipelineConfiguration
     spritzerSource.cancel()
   }
 
-  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends 
RichSourceFunction[String] with Serializable /*with StoppableFunction*/ {
-
-    var mapper: ObjectMapper = _
-
-    var twitProvider: TwitterStreamProvider = _
-
-    @throws[Exception]
-    override def open(parameters: Configuration): Unit = {
-      mapper = 
StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT)
-      twitProvider = new TwitterStreamProvider( sourceConfig )
-      twitProvider.prepare(twitProvider)
-      twitProvider.startStream()
-    }
-
-    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
-      var iterator: Iterator[StreamsDatum] = null
-      do {
-        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
-        iterator = twitProvider.readCurrent().iterator()
-        iterator.toList.map(datum => 
ctx.collect(mapper.writeValueAsString(datum.getDocument)))
-      } while( twitProvider.isRunning )
-    }
-
-    override def cancel(): Unit = {
-      close()
-    }
-
-//    override def stop(): Unit = {
-//      close()
-//    }
-  }
-
-
 }
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index eb15556..45f34ff 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -41,8 +41,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.util.Collector
 import org.apache.streams.config.ComponentConfigurator
 import org.apache.streams.config.StreamsConfigurator
-import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.FlinkBase.toProviderId
 import 
org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.hdfs.HdfsReaderConfiguration
 import org.apache.streams.hdfs.HdfsWriterConfiguration
@@ -148,11 +148,9 @@ class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipeline
 
     val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new 
idListWindowFunction()).name("idLists")
 
-    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new 
profileCollectorFlatMapFunction).setParallelism(env.getParallelism).name("userDatums")
+    val users: DataStream[User] = idLists.flatMap(new 
UserInformationCollectorFlatMapFunction(streamsConfig, config.getTwitter, 
streamsFlinkConfiguration)).setParallelism(env.getParallelism).name("users")
 
-    val user: DataStream[User] = userDatums.map(datum => 
datum.getDocument.asInstanceOf[User]).name("users")
-
-    val jsons: DataStream[String] = user
+    val jsons: DataStream[String] = users
       .map(user => {
         val MAPPER = StreamsJacksonMapper.getInstance
         MAPPER.writeValueAsString(user)
@@ -167,7 +165,6 @@ class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipeline
       withRollingPolicy(rollingPolicy).
       withBucketAssigner(basePathBucketAssigner).build();
 
-
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
     } else {
@@ -192,23 +189,4 @@ class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipeline
     }
   }
 
-  class profileCollectorFlatMapFunction extends 
RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
-    override def flatMap(input: List[String], out: Collector[StreamsDatum]): 
Unit = {
-      collectProfiles(input, out)
-    }
-    def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
-      val twitterConfiguration = config.getTwitter
-      val twitProvider: TwitterUserInformationProvider =
-        new TwitterUserInformationProvider(
-          twitterConfiguration.withInfo(ids)
-        )
-      twitProvider.prepare(twitProvider)
-      twitProvider.startStream()
-      var iterator: Iterator[StreamsDatum] = null
-      do {
-        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
-        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
-      } while( twitProvider.isRunning )
-    }
-  }
 }
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
index 1223047..83d1275 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
@@ -10,9 +10,10 @@ import org.apache.flink.util.Collector
 import org.apache.streams.config.ComponentConfigurator
 import org.apache.streams.config.StreamsConfiguration
 import org.apache.streams.core.StreamsDatum
-import 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline.toProviderId
+import org.apache.streams.examples.flink.FlinkBase.toProviderId
 import org.apache.streams.flink.StreamsFlinkConfiguration
 import org.apache.streams.twitter.config.TwitterFollowingConfiguration
+import org.apache.streams.twitter.pojo.Follow
 import org.apache.streams.twitter.provider.TwitterFollowingProvider
 
 import scala.collection.JavaConversions._
@@ -21,31 +22,33 @@ class FollowingCollectorFlatMapFunction(
                                          streamsConfiguration : 
StreamsConfiguration,
                                          twitterConfiguration : 
TwitterFollowingConfiguration = new 
ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
                                          flinkConfiguration : 
StreamsFlinkConfiguration = new 
ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
-                                       ) extends RichFlatMapFunction[String, 
StreamsDatum] with Serializable {
+                                       ) extends RichFlatMapFunction[String, 
Follow] with Serializable {
 
-  var size : IntCounter = new IntCounter()
-  var counter : IntCounter = new IntCounter()
+  var userids : IntCounter = new IntCounter()
+  var follows : IntCounter = new IntCounter()
 
   override def open(parameters: Configuration): Unit = {
-    getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.size", 
this.size)
-    
getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.counter", 
this.counter)
+    
getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.userids", 
this.userids)
+    
getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.follows", 
this.follows)
   }
 
-  override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-    size.add(input.size)
+  override def flatMap(input: String, out: Collector[Follow]): Unit = {
+    userids.add(input.size)
     collectConnections(input, out)
   }
 
-  def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+  def collectConnections(id : String, out : Collector[Follow]) = {
     val conf = 
twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
     val twitProvider: TwitterFollowingProvider = new 
TwitterFollowingProvider(conf)
     twitProvider.prepare(twitProvider)
     twitProvider.startStream()
     do {
       
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
-      val current = twitProvider.readCurrent().iterator().toList
-      counter.add(current.size)
-      current.map(out.collect(_))
+      val current : List[StreamsDatum] = 
twitProvider.readCurrent().iterator().toList
+      follows.add(current.size)
+      for( datum <- current ) {
+        out.collect(datum.getDocument().asInstanceOf[Follow])
+      }
     } while( twitProvider.isRunning )
   }
 
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/SpritzerSource.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/SpritzerSource.scala
new file mode 100644
index 0000000..c5c1e50
--- /dev/null
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/SpritzerSource.scala
@@ -0,0 +1,59 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.io.Serializable
+import java.util.Objects
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, 
StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.config.TwitterStreamConfiguration
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
+import org.apache.streams.twitter.provider.TwitterStreamProvider
+
+import scala.collection.JavaConversions._
+
+class SpritzerSource(
+                      streamsConfiguration : StreamsConfiguration,
+                      twitterConfiguration : TwitterStreamConfiguration = new 
ComponentConfigurator(classOf[TwitterStreamConfiguration]).detectConfiguration(),
+                      flinkConfiguration : StreamsFlinkConfiguration = new 
ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
+                    ) extends RichSourceFunction[String] with Serializable 
/*with StoppableFunction*/ {
+
+  var mapper: ObjectMapper = _
+
+  var twitProvider: TwitterStreamProvider = _
+
+  var items : IntCounter = new IntCounter()
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+    mapper = 
StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT)
+    getRuntimeContext().addAccumulator("SpritzerSource.items", this.items)
+    twitProvider = new TwitterStreamProvider( twitterConfiguration )
+    twitProvider.prepare(twitProvider)
+    twitProvider.startStream()
+  }
+
+  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
+      val current : List[StreamsDatum] = 
twitProvider.readCurrent().iterator().toList
+      items.add(current.size)
+      for( item <- current ) {
+        ctx.collect(mapper.writeValueAsString(item.getDocument))
+      }
+    } while( twitProvider.isRunning )
+  }
+
+  override def cancel(): Unit = {
+    close()
+  }
+
+}
\ No newline at end of file
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
index 8f21145..bbf70a5 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
@@ -9,8 +9,10 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import org.apache.streams.config.StreamsConfiguration
 import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase.toProviderId
 import org.apache.streams.flink.StreamsFlinkConfiguration
 import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration
+import org.apache.streams.twitter.pojo.Tweet
 import org.apache.streams.twitter.provider.TwitterTimelineProvider
 
 import scala.collection.JavaConversions._
@@ -22,28 +24,30 @@ class TimelineCollectorFlatMapFunction(
                                         streamsConfiguration : 
StreamsConfiguration,
                                         twitterConfiguration : 
TwitterTimelineProviderConfiguration,
                                         streamsFlinkConfiguration : 
StreamsFlinkConfiguration
-                                      ) extends 
RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
-  var size : IntCounter = new IntCounter()
-  var counter : IntCounter = new IntCounter()
+                                      ) extends RichFlatMapFunction[String, 
Tweet] with Serializable {
+  var userids : IntCounter = new IntCounter()
+  var posts : IntCounter = new IntCounter()
   override def open(parameters: Configuration): Unit = {
-    
getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.size", 
this.size)
-    
getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.counter", 
this.counter)
+    
getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.userids", 
this.userids)
+    
getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.posts", 
this.posts)
   }
-  override def flatMap(input: List[String], out: Collector[StreamsDatum]): 
Unit = {
-    size.add(input.size)
+  override def flatMap(input: String, out: Collector[Tweet]): Unit = {
+    userids.add(input.size)
     collectPosts(input, out)
   }
-  def collectPosts(ids : List[String], out : Collector[StreamsDatum]) = {
+  def collectPosts(id : String, out : Collector[Tweet]) = {
     try {
-      val conf = 
twitterConfiguration.withInfo(ids).asInstanceOf[TwitterTimelineProviderConfiguration]
+      val conf = 
twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterTimelineProviderConfiguration]
       val twitProvider: TwitterTimelineProvider = new 
TwitterTimelineProvider(conf)
       twitProvider.prepare(conf)
       twitProvider.startStream()
       do {
         
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
-        val current = twitProvider.readCurrent().iterator().toList
-        counter.add(current.size)
-        current.map(out.collect(_))
+        val current : List[StreamsDatum] = 
twitProvider.readCurrent().iterator().toList
+        posts.add(current.size)
+        for( datum <- current ) {
+          out.collect(datum.getDocument().asInstanceOf[Tweet])
+        }
       } while( twitProvider.isRunning )
     } finally {
 
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
index 46e0d4a..41dde97 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
@@ -9,9 +9,11 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import org.apache.streams.config.StreamsConfiguration
 import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase.toProviderId
 import org.apache.streams.flink.StreamsFlinkConfiguration
 import org.apache.streams.twitter.config.TwitterUserInformationConfiguration
 import org.apache.streams.twitter.provider.TwitterUserInformationProvider
+import org.apache.streams.twitter.pojo.User
 
 import scala.collection.JavaConversions._
 
@@ -22,27 +24,29 @@ class UserInformationCollectorFlatMapFunction(
                                                streamsConfiguration : 
StreamsConfiguration,
                                                twitterConfiguration : 
TwitterUserInformationConfiguration,
                                                streamsFlinkConfiguration : 
StreamsFlinkConfiguration
-                                             ) extends 
RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
-  var size : IntCounter = new IntCounter()
-  var counter : IntCounter = new IntCounter()
+                                             ) extends 
RichFlatMapFunction[List[String], User] with Serializable {
+  var ids : IntCounter = new IntCounter()
+  var users : IntCounter = new IntCounter()
   override def open(parameters: Configuration): Unit = {
-    
getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.size",
 this.size)
-    
getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.counter",
 this.counter)
+    
getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.ids",
 this.ids)
+    
getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.users",
 this.users)
   }
-  override def flatMap(input: List[String], out: Collector[StreamsDatum]): 
Unit = {
-    size.add(input.size)
+  override def flatMap(input: List[String], out: Collector[User]): Unit = {
+    ids.add(input.size)
     collectProfiles(input, out)
   }
-  def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+  def collectProfiles(ids : List[String], out : Collector[User]) = {
     val conf = twitterConfiguration.withInfo(ids)
     val twitProvider: TwitterUserInformationProvider = new 
TwitterUserInformationProvider(conf)
     twitProvider.prepare(conf)
     twitProvider.startStream()
     do {
       
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
-      val current = twitProvider.readCurrent().iterator().toList
-      counter.add(current.size)
-      current.map(out.collect(_))
+      val current : List[StreamsDatum] = 
twitProvider.readCurrent().iterator().toList
+      users.add(current.size)
+      for( datum <- current ) {
+        out.collect(datum.getDocument().asInstanceOf[User])
+      }
     } while( twitProvider.isRunning )
   }
 }

Reply via email to