This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/streams.git
commit 132504b7f22ced3312c1cb7fc050a678a1693597 Author: sblackmon <sblack...@apache.org> AuthorDate: Sun Aug 9 06:57:02 2020 -0500 use newer flink file sink --- .../apache/streams/examples/flink/FlinkBase.scala | 10 +++++++++ .../collection/FlinkTwitterFollowingPipeline.scala | 4 +++- .../collection/FlinkTwitterPostsPipeline.scala | 3 ++- .../collection/FlinkTwitterSpritzerPipeline.scala | 6 ++++-- .../FlinkTwitterUserInformationPipeline.scala | 4 +++- .../src/test/resources/testng.xml | 2 +- .../FlinkTwitterFollowingPipelineFollowersIT.scala | 14 ++++++++----- .../FlinkTwitterFollowingPipelineFriendsIT.scala | 14 ++++++++----- .../twitter/test/FlinkTwitterPostsPipelineIT.scala | 22 ++++++++++++-------- .../test/FlinkTwitterSpritzerPipelineIT.scala | 24 ++++++++++++++-------- .../FlinkTwitterUserInformationPipelineIT.scala | 22 ++++++++++++-------- 11 files changed, 84 insertions(+), 41 deletions(-) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala index 3acca05..86e99ab 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala @@ -19,12 +19,15 @@ package org.apache.streams.examples.flink import java.net.MalformedURLException +import java.util.concurrent.TimeUnit import com.typesafe.config.Config import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration} @@ -48,6 +51,13 @@ trait FlinkBase { var executionEnvironment: ExecutionEnvironment = _ var streamExecutionEnvironment: StreamExecutionEnvironment = _ + final val basePathBucketAssigner : BasePathBucketAssigner[String] = new BasePathBucketAssigner() + final val rollingPolicy: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.builder() + .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) + .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) + .withMaxPartSize(1024 * 1024 * 1024) + .build() + /* Basic stuff for every flink job */ diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala index 6f20216..96a7739 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -114,6 +114,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase { class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new StreamsConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable { import FlinkTwitterFollowingPipeline._ + import FlinkTwitterFollowingPipeline.rollingPolicy override def run(): Unit = { @@ -159,7 +160,8 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio val fileSink : StreamingFileSink[String] = StreamingFileSink. forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). - build() + withRollingPolicy(rollingPolicy). + withBucketAssigner(basePathBucketAssigner).build(); if( config.getTest == true ) { keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala index ed45a9f..aad7f32 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -162,7 +162,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new val fileSink : StreamingFileSink[String] = StreamingFileSink. forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). - build() + withRollingPolicy(rollingPolicy). + withBucketAssigner(basePathBucketAssigner).build(); if( config.getTest == true ) { keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala index 6235672..0a3de4f 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala @@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.functions.source.RichSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.scala.KeyedStream import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.scala.KeyedStream import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.streams.config.ComponentConfigurator import org.apache.streams.config.StreamsConfigurator @@ -136,7 +136,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration val fileSink : StreamingFileSink[String] = StreamingFileSink. forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). - build() + withRollingPolicy(rollingPolicy). + withBucketAssigner(basePathBucketAssigner).build(); + if( config.getTest == true ) { keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala index 21bc911..080ce57 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -165,7 +165,9 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline val fileSink : StreamingFileSink[String] = StreamingFileSink. forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). - build() + withRollingPolicy(rollingPolicy). + withBucketAssigner(basePathBucketAssigner).build(); + if( config.getTest == true ) { keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml index bdb250d..6c11e30 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml @@ -19,7 +19,7 @@ ~ under the License. --> -<suite name="ExampleFlinkITs" preserve-order="true"> +<suite name="ExampleFlinkITs" allow-return-values="true" preserve-order="true"> <test name="FlinkTwitterUserInformationPipelineIT"> <classes> diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala index bfc6940..22ab957 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala @@ -19,16 +19,20 @@ package org.apache.streams.examples.flink.twitter.test import java.io.File -import java.nio.file.{Files, Paths} +import java.nio.file.Files +import java.nio.file.Paths -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} -import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline -import org.scalatest.FlatSpec +import org.scalatest.Assertions._ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.testng.annotations.Test import scala.io.Source diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala index 6d52d44..e71dbaf 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala @@ -19,16 +19,20 @@ package org.apache.streams.examples.flink.twitter.test import java.io.File -import java.nio.file.{Files, Paths} +import java.nio.file.Files +import java.nio.file.Paths -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} -import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline -import org.scalatest.FlatSpec import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.slf4j.{Logger, LoggerFactory} +import org.scalatest.Assertions._ +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.testng.annotations.Test import scala.io.Source diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala index a43c758..c061d43 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala @@ -19,16 +19,20 @@ package org.apache.streams.examples.flink.twitter.test import java.io.File -import java.nio.file.{Files, Paths} +import java.nio.file.Files +import java.nio.file.Paths -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} -import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline -import org.scalatest.FlatSpec +import org.scalatest.Assertions._ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.testng.annotations.Test import scala.io.Source @@ -63,9 +67,11 @@ class FlinkTwitterPostsPipelineIT { eventually (timeout(30 seconds), interval(1 seconds)) { assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) - assert( - Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size - >= 200) + val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList + assert(lines.size > 200) + lines foreach { + line => assert( line.contains("created_at") ) + } } } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala index 8d45a66..00e32e6 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala @@ -19,16 +19,20 @@ package org.apache.streams.examples.flink.twitter.test import java.io.File -import java.nio.file.{Files, Paths} +import java.nio.file.Files +import java.nio.file.Paths -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} -import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline -import org.scalatest.FlatSpec +import org.scalatest.Assertions._ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.testng.annotations.Test import scala.io.Source @@ -36,7 +40,7 @@ import scala.io.Source /** * FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline. */ -class FlinkTwitterSpritzerPipelineIT extends FlatSpec { +class FlinkTwitterSpritzerPipelineIT { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT]) @@ -64,9 +68,11 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec { eventually (timeout(60 seconds), interval(1 seconds)) { assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) - assert( - Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size - >= 10) + val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList + assert(lines.size > 10) + lines foreach { + line => assert( line.contains("created_at") ) + } } } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala index 48b876a..fc582e1 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala @@ -19,16 +19,20 @@ package org.apache.streams.examples.flink.twitter.test import java.io.File -import java.nio.file.{Files, Paths} +import java.nio.file.Files +import java.nio.file.Paths -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} -import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline -import org.scalatest.FlatSpec +import org.scalatest.Assertions._ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.testng.annotations.Test import scala.io.Source @@ -63,9 +67,11 @@ class FlinkTwitterUserInformationPipelineIT { eventually (timeout(30 seconds), interval(1 seconds)) { assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) - assert( - Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size - > 500) + val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList + assert(lines.size > 500) + lines foreach { + line => assert( line.contains("created_at") ) + } } }