http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/BuildGearpump.scala ---------------------------------------------------------------------- diff --git a/project/BuildGearpump.scala b/project/BuildGearpump.scala new file mode 100644 index 0000000..f0c9517 --- /dev/null +++ b/project/BuildGearpump.scala @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import com.typesafe.sbt.SbtPgp.autoImport._ +import BuildExamples.examples +import BuildExperiments.experiments +import BuildExternals.externals +import BuildIntegrationTests.integrationTests +import BuildDashboard.services +import Dependencies._ +import Docs._ +import de.johoop.jacoco4sbt.JacocoPlugin.jacoco +import sbt.Keys._ +import sbt._ +import Pack.packProject +import sbtassembly.AssemblyPlugin.autoImport._ + +import xerial.sbt.Sonatype._ + +object BuildGearpump extends sbt.Build { + + val apacheRepo = "https://repository.apache.org/" + val distDirectory = "output" + val projectName = "gearpump" + + val commonSettings = Seq(jacoco.settings: _*) ++ sonatypeSettings ++ + Seq( + resolvers ++= Seq( + // https://repo1.maven.org/maven2 has been added by default + "apache-repo" at "https://repository.apache.org/content/repositories", + Resolver.sonatypeRepo("releases"), + "clojars" at "http://clojars.org/repo" + ) + ) ++ + Seq( + scalaVersion := scalaVersionNumber, + crossScalaVersions := crossScalaVersionNumbers, + organization := "org.apache.gearpump", + useGpg := false, + pgpSecretRing := file("./secring.asc"), + pgpPublicRing := file("./pubring.asc"), + scalacOptions ++= Seq("-Yclosure-elim", "-Yinline"), + publishMavenStyle := true, + + pgpPassphrase := Option(System.getenv().get("PASSPHRASE")).map(_.toArray), + credentials += Credentials( + "Sonatype Nexus Repository Manager", + "repository.apache.org", + System.getenv().get("SONATYPE_USERNAME"), + System.getenv().get("SONATYPE_PASSWORD")), + + pomIncludeRepository := { _ => false }, + + publishTo := { + if (isSnapshot.value) { + Some("snapshots" at apacheRepo + "content/repositories/snapshots") + } else { + Some("releases" at apacheRepo + "content/repositories/releases") + } + }, + + publishArtifact in Test := true, + + pomExtra := { + <url>https://github.com/apache/incubator-gearpump</url> + <licenses> + <license> + <name>Apache 2</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + 
</license> + </licenses> + <scm> + <connection>scm:git://git.apache.org/incubator-gearpump.git</connection> + <developerConnection>scm:git:git@github.com:apache/incubator-gearpump</developerConnection> + <url>github.com/apache/incubator-gearpump</url> + </scm> + <developers> + <developer> + <id>gearpump</id> + <name>Gearpump Team</name> + <url>http://gearpump.incubator.apache.org/community.html#who-we-are</url> + </developer> + </developers> + }, + + pomPostProcess := { + (node: xml.Node) => changeShadedDeps( + Set( + "org.scoverage", + "org.scala-lang" + ), + List.empty[xml.Node], + node) + }, + + cleanFiles += (baseDirectory.value / "examples" / "target") + ) + + val noPublish = Seq( + publish := {}, + publishLocal := {}, + publishArtifact := false, + publishArtifact in Test := false + ) + + lazy val myAssemblySettings = Seq( + test in assembly := {}, + assemblyOption in assembly ~= { + _.copy(includeScala = false) + }, + assemblyJarName in assembly := { + s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar" + }, + assemblyShadeRules in assembly := Seq( + ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.@0").inAll, + ShadeRule.rename("com.esotericsoftware.**" -> + "org.apache.gearpump.@0").inAll, + ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.@0").inAll, + ShadeRule.rename("com.google.common.**" -> "org.apache.gearpump.@0").inAll, + ShadeRule.rename("com.google.thirdparty.**" -> "org.apache.gearpump.@0").inAll, + ShadeRule.rename("com.codahale.metrics.**" -> + "org.apache.gearpump.@0").inAll, + ShadeRule.rename("com.gs.collections.**" -> + "org.apache.gearpump.gs.collections.@0").inAll + ), + target in assembly := baseDirectory.value / "target" / scalaBinaryVersion.value + ) + + lazy val aggregated: Seq[ProjectReference] = Seq[ProjectReference]( + core, + streaming, + services, + gearpumpHadoop, + packProject + ) ++ examples ++ experiments ++ externals ++ integrationTests + + lazy val root = Project( + id = "gearpump", + base 
= file("."), + settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting, + aggregate = aggregated) + .settings(Defaults.itSettings: _*) + .disablePlugins(sbtassembly.AssemblyPlugin) + + lazy val core = Project( + id = "gearpump-core", + base = file("core"), + settings = commonSettings ++ myAssemblySettings ++ javadocSettings ++ coreDependencies ++ + addArtifact(Artifact("gearpump-core"), sbtassembly.AssemblyKeys.assembly) ++ Seq( + + assemblyOption in assembly ~= { + _.copy(includeScala = true) + }, + + pomPostProcess := { + (node: xml.Node) => changeShadedDeps( + Set( + "com.github.romix.akka", + "com.google.guava", + "com.codahale.metrics", + "org.scoverage" + ), List.empty[xml.Node], node) + } + )) + + lazy val streaming = Project( + id = "gearpump-streaming", + base = file("streaming"), + settings = commonSettings ++ myAssemblySettings ++ javadocSettings ++ + addArtifact(Artifact("gearpump-streaming"), sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyMergeStrategy in assembly := { + case "geardefault.conf" => + MergeStrategy.last + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) + }, + + libraryDependencies ++= Seq( + "com.goldmansachs" % "gs-collections" % gsCollectionsVersion + ), + + pomPostProcess := { + (node: xml.Node) => changeShadedDeps( + Set( + "com.goldmansachs", + "org.scala-lang", + "org.scoverage" + ), + List( + getShadedDepXML(organization.value, s"${core.id}_${scalaBinaryVersion.value}", + version.value, "provided")), + node) + } + ) + ).dependsOn(core % "test->test;provided") + + lazy val gearpumpHadoop = Project( + id = "gearpump-hadoop", + base = file("gearpump-hadoop"), + settings = commonSettings ++ noPublish ++ + Seq( + libraryDependencies ++= Seq( + "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion, + "org.apache.hadoop" % "hadoop-common" % hadoopVersion + ) + ) + ).dependsOn(core % "provided").disablePlugins(sbtassembly.AssemblyPlugin) + + private def changeShadedDeps(toExclude: 
Set[String], toInclude: List[xml.Node], + node: xml.Node): xml.Node = { + node match { + case elem: xml.Elem => + val child = + if (elem.label == "dependencies") { + elem.child.filterNot { dep => + dep.child.find(_.label == "groupId").exists(gid => toExclude.contains(gid.text)) + } ++ toInclude + } else { + elem.child.map(changeShadedDeps(toExclude, toInclude, _)) + } + xml.Elem(elem.prefix, elem.label, elem.attributes, elem.scope, false, child: _*) + case _ => + node + } + } + + private def getShadedDepXML(groupId: String, artifactId: String, + version: String, scope: String): scala.xml.Node = { + <dependency> + <groupId>{groupId}</groupId> + <artifactId>{artifactId}</artifactId> + <version>{version}</version> + <scope>{scope}</scope> + </dependency> + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/BuildIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/project/BuildIntegrationTest.scala b/project/BuildIntegrationTest.scala deleted file mode 100644 index 6eed7a2..0000000 --- a/project/BuildIntegrationTest.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import sbt.Keys._ -import sbt._ -import Build._ -import sbtassembly.AssemblyPlugin.autoImport._ - -object BuildIntegrationTest extends sbt.Build { - - val jsonSimpleVersion = "1.1" - val storm09Version = "0.9.6" - - lazy val integration_test = Project( - id = "gearpump-integrationtest", - base = file("integrationtest"), - settings = commonSettings ++ noPublish - ).aggregate(it_core, it_storm09, it_storm010). 
- disablePlugins(sbtassembly.AssemblyPlugin) - - val itTestFilter: String => Boolean = { name => name endsWith "Suite" } - lazy val it_core = Project( - id = "gearpump-integrationtest-core", - base = file("integrationtest/core"), - settings = commonSettings ++ noPublish ++ Seq( - testOptions in IntegrationTest += Tests.Filter(itTestFilter), - libraryDependencies ++= Seq( - "com.lihaoyi" %% "upickle" % upickleVersion, - "org.scalatest" %% "scalatest" % scalaTestVersion % "it", - "org.pegdown" % "pegdown" % "1.4.2" % "it", - "org.parboiled" % "parboiled-core" % "1.1.7" % "it", - "org.parboiled" % "parboiled-java" % "1.1.7" % "it", - "org.mortbay.jetty" % "jetty-util" % "6.1.26" % "it", - "org.ow2.asm" % "asm-all" % "5.0.3" % "it" - ) - ) - ).configs(IntegrationTest).settings(Defaults.itSettings: _*) - .dependsOn( - streaming % "test->test; provided", - services % "test->test; provided", - external_kafka, - storm, - external_serializer - ).disablePlugins(sbtassembly.AssemblyPlugin) - - // Integration test for Storm 0.9.x - lazy val it_storm09 = Project( - id = "gearpump-integrationtest-storm09", - base = file("integrationtest/storm09"), - settings = commonSettings ++ noPublish ++ myAssemblySettings ++ - Seq( - libraryDependencies ++= Seq( - "org.apache.storm" % "storm-kafka" % storm09Version, - "org.apache.storm" % "storm-starter" % storm09Version, - "com.googlecode.json-simple" % "json-simple" % jsonSimpleVersion, - "org.apache.kafka" %% "kafka" % kafkaVersion - ), - target in assembly := baseDirectory.value.getParentFile / "target" / - CrossVersion.binaryScalaVersion(scalaVersion.value) - ) - ) dependsOn (storm % "provided") - - // Integration test for Storm 0.10.x - lazy val it_storm010 = Project( - id = "gearpump-integrationtest-storm010", - base = file("integrationtest/storm010"), - settings = commonSettings ++ noPublish ++ myAssemblySettings ++ - Seq( - libraryDependencies ++= Seq( - "org.apache.storm" % "storm-kafka" % stormVersion, - "org.apache.storm" % 
"storm-starter" % stormVersion, - "org.apache.kafka" %% "kafka" % kafkaVersion - ), - target in assembly := baseDirectory.value.getParentFile / "target" / - CrossVersion.binaryScalaVersion(scalaVersion.value) - ) - ) dependsOn (storm % "provided") -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/BuildIntegrationTests.scala ---------------------------------------------------------------------- diff --git a/project/BuildIntegrationTests.scala b/project/BuildIntegrationTests.scala new file mode 100644 index 0000000..161d906 --- /dev/null +++ b/project/BuildIntegrationTests.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import sbt.Keys._ +import sbt._ +import BuildGearpump._ +import BuildExperiments.storm +import BuildDashboard.services +import BuildExternals.{external_kafka, external_serializer} +import Dependencies._ +import sbtassembly.AssemblyPlugin.autoImport._ + +object BuildIntegrationTests extends sbt.Build { + + lazy val integrationTests: Seq[ProjectReference] = Seq( + it_core, + it_storm09, + it_storm010 + ) + + lazy val it_core = Project( + id = "gearpump-integrationtest-core", + base = file("integrationtest/core"), + settings = commonSettings ++ noPublish ++ Seq( + testOptions in IntegrationTest += Tests.Filter(_.endsWith("Suite")), + libraryDependencies ++= Seq( + "com.lihaoyi" %% "upickle" % upickleVersion, + "org.scalatest" %% "scalatest" % scalaTestVersion % "it", + "org.pegdown" % "pegdown" % "1.4.2" % "it", + "org.parboiled" % "parboiled-core" % "1.1.7" % "it", + "org.parboiled" % "parboiled-java" % "1.1.7" % "it", + "org.mortbay.jetty" % "jetty-util" % "6.1.26" % "it", + "org.ow2.asm" % "asm-all" % "5.0.3" % "it" + ) + ) + ).configs(IntegrationTest).settings(Defaults.itSettings: _*) + .dependsOn( + core % "provided", + streaming % "test->test; provided", + services % "test->test; provided", + external_kafka, + storm, + external_serializer + ).disablePlugins(sbtassembly.AssemblyPlugin) + + // Integration test for Storm 0.9.x + lazy val it_storm09 = Project( + id = "gearpump-integrationtest-storm09", + base = file("integrationtest/storm09"), + settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + Seq( + libraryDependencies ++= Seq( + "org.apache.storm" % "storm-kafka" % storm09Version, + "org.apache.storm" % "storm-starter" % storm09Version, + "com.googlecode.json-simple" % "json-simple" % jsonSimpleVersion, + "org.apache.kafka" %% "kafka" % kafkaVersion + ), + target in assembly := baseDirectory.value.getParentFile / "target" / + CrossVersion.binaryScalaVersion(scalaVersion.value) + ) + ).dependsOn(core % "provided", storm % "provided") + + // 
Integration test for Storm 0.10.x + lazy val it_storm010 = Project( + id = "gearpump-integrationtest-storm010", + base = file("integrationtest/storm010"), + settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + Seq( + libraryDependencies ++= Seq( + "org.apache.storm" % "storm-kafka" % stormVersion, + "org.apache.storm" % "storm-starter" % stormVersion, + "org.apache.kafka" %% "kafka" % kafkaVersion + ), + target in assembly := baseDirectory.value.getParentFile / "target" / + CrossVersion.binaryScalaVersion(scalaVersion.value) + ) + ).dependsOn(core % "provided", storm % "provided") +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/BuildShaded.scala ---------------------------------------------------------------------- diff --git a/project/BuildShaded.scala b/project/BuildShaded.scala deleted file mode 100644 index a43587c..0000000 --- a/project/BuildShaded.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import sbt.Keys._ -import sbt._ -import sbtassembly.AssemblyPlugin.autoImport._ - -object BuildShaded extends sbt.Build { - - val guavaVersion = "16.0.1" - val codahaleVersion = "3.0.2" - val kryoVersion = "0.4.1" - val gsCollectionsVersion = "6.2.0" - private val scalaVersionMajor = "2.11" - - val shadeAssemblySettings = Build.commonSettings ++ Seq( - scalaVersion := Build.scalaVersionNumber, - test in assembly := {}, - assemblyOption in assembly ~= { - _.copy(includeScala = false) - }, - assemblyJarName in assembly := { - s"${name.value}_$scalaVersionMajor-${version.value}.jar" - }, - target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor - ) - - val shaded = Project( - id = "gearpump-shaded", - base = file("shaded") - ).aggregate(shaded_akka_kryo, shaded_gs_collections, shaded_guava, shaded_metrics_graphite) - .disablePlugins(sbtassembly.AssemblyPlugin) - - lazy val shaded_akka_kryo = Project( - id = "gearpump-shaded-akka-kryo", - base = file("shaded/akka-kryo"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo"), - sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.zap("com.google.protobuf.**").inAll, - ShadeRule.zap("com.typesafe.config.**").inAll, - ShadeRule.zap("akka.**").inAll, - ShadeRule.zap("org.jboss.netty.**").inAll, - ShadeRule.zap("net.jpountz.lz4.**").inAll, - ShadeRule.zap("org.uncommons.maths.**").inAll, - ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll, - ShadeRule.rename("com.esotericsoftware.**" -> - "org.apache.gearpump.esotericsoftware.@1").inAll, - ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion - ) - ) - ) - - lazy val shaded_gs_collections = Project( - id = "gearpump-shaded-gs-collections", - base = file("shaded/gs-collections"), - settings = 
shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections"), - sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.gs.collections.**" -> - "org.apache.gearpump.gs.collections.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.goldmansachs" % "gs-collections" % gsCollectionsVersion - ) - ) - ) - - lazy val shaded_guava = Project( - id = "gearpump-shaded-guava", - base = file("shaded/guava"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava"), - sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.google.guava" % "guava" % guavaVersion - ) - ) - ) - - lazy val shaded_metrics_graphite = Project( - id = "gearpump-shaded-metrics-graphite", - base = file("shaded/metrics-graphite"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite"), - sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.codahale.metrics.**" -> - "org.apache.gearpump.codahale.metrics.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.codahale.metrics" % "metrics-graphite" % codahaleVersion, - "com.codahale.metrics" % "metrics-jvm" % codahaleVersion - ) - ) - ) - - def getShadedJarFile(name: String, gearpumpVersion: String): File = { - shaded.base / "target" / scalaVersionMajor / - s"${name}_$scalaVersionMajor-$gearpumpVersion.jar" - } - - def getShadedDepXML(groupId: String, artifactId: String, version: String): scala.xml.Node = { - <dependency> - <groupId>{groupId}</groupId> - <artifactId>{artifactId}</artifactId> - <version>{version}</version> - </dependency> - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/Dependencies.scala 
---------------------------------------------------------------------- diff --git a/project/Dependencies.scala b/project/Dependencies.scala new file mode 100644 index 0000000..4e30d3f --- /dev/null +++ b/project/Dependencies.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import sbt._ +import Keys._ + +object Dependencies { + + val crossScalaVersionNumbers = Seq("2.11.8") + val scalaVersionNumber = crossScalaVersionNumbers.last + val akkaVersion = "2.4.16" + val akkaHttpVersion = "10.0.1" + val hadoopVersion = "2.6.0" + val hbaseVersion = "1.0.0" + val commonsHttpVersion = "3.1" + val commonsLoggingVersion = "1.1.3" + val commonsLangVersion = "2.6" + val commonsIOVersion = "2.4" + val dataReplicationVersion = "0.7" + val upickleVersion = "0.3.4" + val junitVersion = "4.12" + val kafkaVersion = "0.8.2.1" + val jsonSimpleVersion = "1.1" + val storm09Version = "0.9.6" + val stormVersion = "0.10.0" + val slf4jVersion = "1.7.16" + val guavaVersion = "16.0.1" + val codahaleVersion = "3.0.2" + val kryoVersion = "0.4.1" + val gsCollectionsVersion = "6.2.0" + val sprayVersion = "1.3.2" + val sprayJsonVersion = "1.3.1" + val scalaTestVersion = "2.2.0" + val scalaCheckVersion = "1.11.3" + val mockitoVersion = "1.10.17" + val bijectionVersion = "0.8.0" + val scalazVersion = "7.1.1" + val algebirdVersion = "0.9.0" + val chillVersion = "0.6.0" + val jedisVersion = "2.9.0" + + val coreDependencies = Seq( + libraryDependencies ++= Seq( + "org.slf4j" % "slf4j-api" % slf4jVersion, + "org.slf4j" % "slf4j-log4j12" % slf4jVersion, + "commons-lang" % "commons-lang" % commonsLangVersion, + + /** + * Overrides Netty version 3.10.3.Final used by Akka 2.4.2 to work-around netty hang issue + * (https://github.com/gearpump/gearpump/issues/2020) + * + * Akka 2.4.2 by default use Netty 3.10.3.Final, which has a serious issue which can hang + * the network. The same issue also happens in version range (3.10.0.Final, 3.10.5.Final) + * Netty 3.10.6.Final have this issue fixed, however, we find there is a 20% performance + * drop. So we decided to downgrade netty to 3.8.0.Final (Same version used in akka 2.3.12). + * + * @see https://github.com/gearpump/gearpump/pull/2017 for more discussions. 
+ */ + "io.netty" % "netty" % "3.8.0.Final", + "com.typesafe.akka" %% "akka-remote" % akkaVersion + exclude("io.netty", "netty"), + + "com.typesafe.akka" %% "akka-cluster" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, + "commons-logging" % "commons-logging" % commonsLoggingVersion, + "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-agent" % akkaVersion, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "com.typesafe.akka" %% "akka-kernel" % akkaVersion, + "com.typesafe.akka" %% "akka-http" % akkaHttpVersion, + "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion, + "org.scala-lang" % "scala-reflect" % scalaVersionNumber, + "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion, + "com.google.guava" % "guava" % guavaVersion, + "com.codahale.metrics" % "metrics-graphite" % codahaleVersion + exclude("org.slf4j", "slf4j-api"), + "com.codahale.metrics" % "metrics-jvm" % codahaleVersion + exclude("org.slf4j", "slf4j-api"), + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + "org.scalatest" %% "scalatest" % scalaTestVersion % "test", + "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", + "org.mockito" % "mockito-core" % mockitoVersion % "test", + "junit" % "junit" % junitVersion % "test" + ) + ) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/Docs.scala ---------------------------------------------------------------------- diff --git a/project/Docs.scala b/project/Docs.scala new file mode 100644 index 0000000..f8d433e --- /dev/null +++ b/project/Docs.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import BuildGearpump.{core, streaming} +import BuildExternals.externals +import sbt.Keys._ +import sbt._ +import sbtunidoc.Plugin.UnidocKeys._ +import sbtunidoc.Plugin._ + +object Docs extends sbt.Build { + lazy val javadocSettings = Seq( + addCompilerPlugin( + "com.typesafe.genjavadoc" %% "genjavadoc-plugin" % "0.9" cross CrossVersion.full), + scalacOptions += s"-P:genjavadoc:out=${target.value}/java" + ) + + lazy val gearpumpUnidocSetting = scalaJavaUnidocSettings ++ Seq( + unidocProjectFilter in(ScalaUnidoc, unidoc) := projectsWithDoc, + unidocProjectFilter in(JavaUnidoc, unidoc) := projectsWithDoc, + + unidocAllSources in(ScalaUnidoc, unidoc) := { + ignoreUndocumentedPackages((unidocAllSources in(ScalaUnidoc, unidoc)).value) + }, + + // Skip class names containing $ and some internal packages in Javadocs + unidocAllSources in(JavaUnidoc, unidoc) := { + ignoreUndocumentedPackages((unidocAllSources in(JavaUnidoc, unidoc)).value) + } + ) + + private lazy val projectsWithDoc = { + val projects: Seq[ProjectReference] = Seq[ProjectReference]( + core, + streaming + ) ++ externals + + inProjects(projects: _*) + } + + private def ignoreUndocumentedPackages(packages: Seq[Seq[File]]): Seq[Seq[File]] = { + packages + .map(_.filterNot(_.getName.contains("$"))) + .map(_.filterNot(_.getCanonicalPath.contains("akka"))) + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index 1c87653..2eb0ca7 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -18,13 +18,14 @@ import sbt.Keys._ import sbt._ -import Build._ +import BuildGearpump._ +import BuildDashboard.services +import BuildExperiments.{cgroup, storm, yarn} import xerial.sbt.Pack._ object Pack extends sbt.Build { val daemonClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", // This is for DFSJarStore "${PROG_HOME}/lib/yarn/*" ) @@ -37,14 +38,12 @@ object Pack extends sbt.Build { val serviceClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", "${PROG_HOME}/lib/services/*", "${PROG_HOME}/dashboard" ) val yarnClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", "${PROG_HOME}/lib/services/*", "${PROG_HOME}/lib/yarn/*", "${PROG_HOME}/conf/yarnconf", @@ -72,9 +71,14 @@ object Pack extends sbt.Build { "storm" -> "org.apache.gearpump.experiments.storm.StormRunner" ), packJvmOpts := Map( - "gear" -> Seq("-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}"), + "gear" -> Seq( + "-noverify", + "-Djava.net.preferIPv4Stack=true", + "-Dgearpump.home=${PROG_HOME}"), + "local" -> Seq( "-server", + "-noverify", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=local", "-Dgearpump.home=${PROG_HOME}", @@ -82,6 +86,7 @@ object Pack extends sbt.Build { "master" -> Seq( "-server", + "-noverify", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=master", "-Dgearpump.home=${PROG_HOME}", @@ -89,6 +94,7 @@ object Pack extends sbt.Build { "worker" -> Seq( "-server", + "-noverify", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=worker", "-Dgearpump.home=${PROG_HOME}", @@ -96,26 +102,28 @@ object Pack extends sbt.Build { "services" -> Seq( "-server", + "-noverify", "-Djava.net.preferIPv4Stack=true", 
"-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), "yarnclient" -> Seq( "-server", + "-noverify", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), "storm" -> Seq( "-server", + "-noverify", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}") ), packLibDir := Map( - "lib" -> new ProjectsToPack(core.id, streaming.id), - "lib/daemon" -> new ProjectsToPack(daemon.id, cgroup.id).exclude(core.id, streaming.id), - "lib/yarn" -> new ProjectsToPack(yarn.id).exclude(services.id, daemon.id), - "lib/services" -> new ProjectsToPack(services.id).exclude(daemon.id), + "lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id). + exclude(services.id, core.id), + "lib/services" -> new ProjectsToPack(services.id).exclude(core.id), "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id) ), packExclude := Seq(thisProjectRef.value.project), @@ -123,7 +131,9 @@ object Pack extends sbt.Build { packResourceDir += (baseDirectory.value / ".." / "bin" -> "bin"), packResourceDir += (baseDirectory.value / ".." / "conf" -> "conf"), packResourceDir += (baseDirectory.value / ".." / "yarnconf" -> "conf/yarnconf"), - packResourceDir += (baseDirectory.value / ".." / "shaded" / "target" / + packResourceDir += (baseDirectory.value / ".." / "core" / "target" / + CrossVersion.binaryScalaVersion(scalaVersion.value) -> "lib"), + packResourceDir += (baseDirectory.value / ".." / "streaming" / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) -> "lib"), packResourceDir += (baseDirectory.value / ".." / "services" / "dashboard" -> "dashboard"), packResourceDir += (baseDirectory.value / ".." 
/ "examples" / "target" / @@ -138,7 +148,7 @@ object Pack extends sbt.Build { "gear" -> applicationClassPath, "local" -> daemonClassPath, "master" -> daemonClassPath, - "worker" -> daemonClassPath, + "worker" -> applicationClassPath, "services" -> serviceClassPath, "yarnclient" -> yarnClassPath, "storm" -> stormClassPath @@ -148,6 +158,6 @@ object Pack extends sbt.Build { packArchiveExcludes := Seq("integrationtest") ) - ).dependsOn(core, streaming, services, yarn, storm). + ).dependsOn(core, streaming, services, yarn, storm, cgroup). disablePlugins(sbtassembly.AssemblyPlugin) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/build.properties ---------------------------------------------------------------------- diff --git a/project/build.properties b/project/build.properties index c4df008..8df00d1 100644 --- a/project/build.properties +++ b/project/build.properties @@ -16,4 +16,4 @@ # limitations under the License. # -sbt.version=0.13.11 +sbt.version=0.13.13 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/license.sbt ---------------------------------------------------------------------- diff --git a/project/license.sbt b/project/license.sbt deleted file mode 100644 index 3452eed..0000000 --- a/project/license.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.typesafe.sbt" % "sbt-license-report" % "1.2.0") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/project/plugins.sbt ---------------------------------------------------------------------- diff --git a/project/plugins.sbt b/project/plugins.sbt index 5a9740e..0a6a562 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -16,8 +16,7 @@ * limitations under the License. 
*/ -resolvers += Resolver.url("fvunicorn", - url("http://dl.bintray.com/fvunicorn/sbt-plugins"))(Resolver.ivyStylePatterns) +resolvers += Resolver.bintrayIvyRepo("fvunicorn", "sbt-plugins") resolvers += Classpaths.sbtPluginReleases @@ -25,7 +24,7 @@ addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.8") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") -addSbtPlugin("io.gearpump.sbt" % "sbt-pack" % "0.7.6") +addSbtPlugin("io.gearpump.sbt" % "sbt-pack" % "0.7.7") addSbtPlugin("de.johoop" % "jacoco4sbt" % "2.1.6") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/scalastyle-config.xml ---------------------------------------------------------------------- diff --git a/scalastyle-config.xml b/scalastyle-config.xml new file mode 100644 index 0000000..1b0a838 --- /dev/null +++ b/scalastyle-config.xml @@ -0,0 +1,240 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<!-- + +If you wish to turn off checking for a section of code, you can put a comment in the source +before and after the section, with the following syntax: + + // scalastyle:off + ... 
// stuff that breaks the styles + // scalastyle:on + +You can also disable only one rule, by specifying its rule id, as specified in: + http://www.scalastyle.org/rules-0.8.0.html + + // scalastyle:off no.finalize + override def finalize(): Unit = ... + // scalastyle:on no.finalize + +This file is divided into 3 sections: + (1) rules that we enforce. + (2) rules that we would like to enforce, but haven't cleaned up the codebase to turn on yet + (or we need to make the scalastyle rule more configurable). + (3) rules that we don't want to enforce. +--> + +<scalastyle> + <name>Scalastyle standard configuration</name> + + <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check> + + <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true"> + <parameters> + <parameter name="header"><![CDATA[/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> + <parameters> + <parameter name="maxLineLength"><![CDATA[100]]></parameter> + <parameter name="tabSize"><![CDATA[2]]></parameter> + <parameter name="ignoreImports">true</parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> + <parameters> + <parameter name="maxParameters"><![CDATA[10]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true"> + <parameters> + <parameter name="singleLineAllowed"><![CDATA[true]]></parameter> + <parameter 
name="doubleLineAllowed"><![CDATA[true]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker" + enabled="true"> + <parameters> + <parameter name="tokens">ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, LARROW, RARROW</parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker" + enabled="true"> + <parameters> + <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, WHILE, FOR, MATCH, TRY, + CATCH, FINALLY, LARROW, RARROW + </parameter> + </parameters> + </check> + + <!-- ??? usually shouldn't be checked into the code base. --> + <check level="error" class="org.scalastyle.scalariform.NotImplementedErrorUsage" + enabled="true"></check> + + <check customId="println" level="error" class="org.scalastyle.scalariform.TokenChecker" + enabled="true"> + <parameters> + <parameter name="regex">^println$</parameter> + </parameters> + <customMessage><![CDATA[Are you sure you want to println? If yes, wrap the code block with + // scalastyle:off println + println(...) + // scalastyle:on println]]></customMessage> + </check> + + <check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" + enabled="true"> + <parameters> + <parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter> + </parameters> + <customMessage><![CDATA[ + Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use + ShutdownHookManager.addShutdownHook instead. 
+ If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with + // scalastyle:off runtimeaddshutdownhook + Runtime.getRuntime.addShutdownHook(...) + // scalastyle:on runtimeaddshutdownhook + ]]></customMessage> + </check> + + <check customId="mutablesynchronizedbuffer" level="error" class="org.scalastyle.file.RegexChecker" + enabled="true"> + <parameters> + <parameter name="regex">mutable\.SynchronizedBuffer</parameter> + </parameters> + <customMessage><![CDATA[ + Are you sure that you want to use mutable.SynchronizedBuffer? In most cases, you should use + java.util.concurrent.ConcurrentLinkedQueue instead. + If you must use mutable.SynchronizedBuffer, wrap the code block with + // scalastyle:off mutablesynchronizedbuffer + mutable.SynchronizedBuffer[...] + // scalastyle:on mutablesynchronizedbuffer + ]]></customMessage> + </check> + + <check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" + enabled="true"> + <parameters> + <parameter name="regex">JavaConversions</parameter> + </parameters> + <customMessage>Instead of importing implicits in scala.collection.JavaConversions._, import + scala.collection.JavaConverters._ and use .asScala / .asJava methods + </customMessage> + </check> + + <check level="error" class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker" + enabled="true"> + <parameters> + <parameter name="tokens">COMMA</parameter> + </parameters> + </check> + + <!-- Should add single Space between ')' and '{' --> + <check customId="SingleSpaceBetweenRParenAndLCurlyBrace" level="error" + class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters> + <parameter name="regex">\)\{</parameter> + </parameters> + <customMessage><![CDATA[ + Single Space between ')' and `{`. 
+ ]]></customMessage> + </check> + + <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="false"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> + </parameters> + </check> + + <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" + enabled="true"></check> + + <check level="error" class="org.scalastyle.file.IndentationChecker" enabled="true"> + <parameters> + <parameter name="tabSize">2</parameter> + <parameter name="methodParamIndentSize">4</parameter> + </parameters> + </check> + + <!-- Don't allow return --> + <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> + +</scalastyle> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala index 46e16cf..53ee692 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala @@ -196,14 +196,6 @@ class AppMasterService(val master: ActorRef, } ~ pathEnd { delete { - val writer = (result: ShutdownApplicationResult) => { - val output = if (result.appId.isSuccess) { - Map("status" -> "success", "info" -> null) - } else { - Map("status" -> "fail", "info" -> result.appId.failed.get.toString) - } - write(output) - } onComplete(askActor[ShutdownApplicationResult](master, ShutdownApplication(appId))) { case Success(result) => val output = if (result.appId.isSuccess) { @@ -223,4 +215,4 @@ class AppMasterService(val master: ActorRef, object AppMasterService { case class Status(success: Boolean, reason: String = null) -} \ 
No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index a763be6..be96577 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -41,7 +41,7 @@ import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.worker.WorkerSummary import org.apache.gearpump.cluster.{ClusterConfig, UserConfig} import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer} -import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription} +import org.apache.gearpump.streaming.partitioner.{PartitionerByClassName, PartitionerDescription} import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} // NOTE: This cannot be removed!!! 
import org.apache.gearpump.services.util.UpickleUtil._ @@ -159,7 +159,7 @@ class MasterService(val master: ActorRef, entity(as[String]) { request => val msg = java.net.URLDecoder.decode(request, "UTF-8") val submitApplicationRequest = read[SubmitApplicationRequest](msg) - import submitApplicationRequest.{appName, dag, processors, userconfig} + import submitApplicationRequest.{appName, dag, processors, userConfig} val context = ClientContext(system.settings.config, system, master) val graph = dag.mapVertex { processorId => @@ -168,8 +168,8 @@ class MasterService(val master: ActorRef, PartitionerDescription(new PartitionerByClassName(edge)) } - val effectiveConfig = if (userconfig == null) UserConfig.empty else userconfig - val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)) + val effectiveConfig = if (userConfig == null) UserConfig.empty else userConfig + val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)).appId import upickle.default.write val submitApplicationResultValue = SubmitApplicationResultValue(appId) @@ -192,7 +192,8 @@ class MasterService(val master: ActorRef, } ~ path("partitioners") { get { - complete(write(BuiltinPartitioners(Constants.BUILTIN_PARTITIONERS.map(_.getName)))) + complete(write(BuiltinPartitioners(org.apache.gearpump.streaming.Constants + .BUILTIN_PARTITIONERS.map(_.getName)))) } } } @@ -344,5 +345,5 @@ object MasterService { appName: String, processors: Map[ProcessorId, ProcessorDescription], dag: Graph[Int, String], - userconfig: UserConfig) + userConfig: UserConfig) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala index 
8ae8dbe..4989364 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala @@ -61,7 +61,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box. private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = Some("gearpump"), - params = Map.empty[String,String]) + params = Map.empty[String, String]) val LOG = LogUtil.getLogger(getClass, "AUDIT") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java index 8f85aa3..aaf6db8 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.javaapi; -import org.apache.gearpump.partitioner.Partitioner; +import org.apache.gearpump.streaming.partitioner.Partitioner; import org.apache.gearpump.streaming.Processor; import org.apache.gearpump.streaming.task.Task; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java deleted file mode 100644 index f07ceff..0000000 --- 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Filter function - * - * @param <T> Message of type T - */ -public interface FilterFunction<T> extends Serializable { - boolean apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java deleted file mode 100644 index 9788dd2..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; -import java.util.Iterator; - -/** - * Function that converts a value of type T to a iterator of values of type R. - * - * @param <T> Input value type - * @param <R> Return value type - */ -public interface FlatMapFunction<T, R> extends Serializable { - Iterator<R> apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java deleted file mode 100644 index 6c71280..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * GroupBy function which assign value of type T to groups - * - * @param <T> Input value type - * @param <Group> Group Type - */ -public interface GroupByFunction<T, Group> extends Serializable { - Group apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java deleted file mode 100644 index e1fc821..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Function that map a value of type T to value of type R - * - * @param <T> Input value type - * @param <R> Output value type - */ -public interface MapFunction<T, R> extends Serializable { - R apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java deleted file mode 100644 index 2bcac60..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Function that applies reduce operation - * - * @param <T> Input value type - */ -public interface ReduceFunction<T> extends Serializable { - T apply(T t1, T t2); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index f99a436..d7582b0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming +import org.apache.gearpump.streaming.partitioner._ + object Constants { val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" @@ -36,4 +38,12 @@ object Constants { val GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW = "gearpump.streaming.executor-restart-time-window" + + // The partitioners provided by Gearpump + val BUILTIN_PARTITIONERS = Array( + classOf[BroadcastPartitioner], + classOf[CoLocationPartitioner], + classOf[HashPartitioner], + classOf[ShuffleGroupingPartitioner], + classOf[ShufflePartitioner]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala index 4a94ad3..8ad74f8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index 66ec873..d4b3719 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -25,7 +25,7 @@ import akka.actor.ActorSystem import org.apache.gearpump.TimeStamp import org.apache.gearpump.cluster._ -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} import org.apache.gearpump.streaming.appmaster.AppMaster import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.{Graph, LogUtil, ReferenceEqual} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index 31e1151..1341464 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -30,7 +30,7 @@ import org.apache.gearpump.cluster._ import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.metrics.Metrics.ReportMetrics import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask} import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.AppMaster._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index 68db354..2085953 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -23,7 +23,7 @@ import java.util.Date import java.util.concurrent.TimeUnit import akka.actor.{Actor, Cancellable, Stash} -import org.apache.gearpump.google.common.primitives.Longs +import com.google.common.primitives.Longs import org.apache.gearpump.TimeStamp import 
org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala index 2736f5e..6154946 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash} import akka.serialization.JavaSerializer import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.DagManager._ import org.apache.gearpump.streaming.storage.AppDataStore http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala index 6de5306..e023cdf 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.TimeStamp import org.apache.gearpump.cluster.AppJar import 
org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.appmaster.JarScheduler._ import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.streaming.{DAG, ProcessorDescription} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala deleted file mode 100644 index 440a45e..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.plan._ -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} -import org.apache.gearpump.streaming.dsl.window.api._ -import org.apache.gearpump.streaming.dsl.window.impl._ -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.apache.gearpump.util.Graph -import org.slf4j.{Logger, LoggerFactory} - -import scala.language.implicitConversions - -class Stream[T]( - private val graph: Graph[Op, OpEdge], private val thisNode: Op, - private val edge: Option[OpEdge] = None) { - - /** - * converts a value[T] to a list of value[R] - * - * @param fn FlatMap function - * @param description The description message for this operation - * @return A new stream with type [R] - */ - def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { - val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description)) - graph.addVertex(flatMapOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp) - new Stream[R](graph, flatMapOp) - } - - /** - * Maps message of type T message of type R - * - * @param fn Function - * @return A new stream with type [R] - */ - def map[R](fn: T => R, description: String = "map"): Stream[R] = { - this.flatMap({ data => - Option(fn(data)) - }, description) - } - - /** - * Keeps records when fun(T) == true - * - * @param fn the filter - * @return a new stream after filter - */ - def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { - this.flatMap({ data => - if (fn(data)) Option(data) else None - }, description) - } - - /** - * Reduces operations. 
- * - * @param fn reduction function - * @param description description message for this operator - * @return a new stream after reduction - */ - def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { - val reduceOp = ChainableOp(new ReduceFunction(fn, description)) - graph.addVertex(reduceOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp) - new Stream(graph, reduceOp) - } - - /** - * Log to task log file - */ - def log(): Unit = { - this.map(msg => { - LoggerFactory.getLogger("dsl").info(msg.toString) - msg - }, "log") - } - - /** - * Merges data from two stream into one - * - * @param other the other stream - * @return the merged stream - */ - def merge(other: Stream[T], description: String = "merge"): Stream[T] = { - val mergeOp = MergeOp(description, UserConfig.empty) - graph.addVertex(mergeOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) - graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) - new Stream[T](graph, mergeOp) - } - - /** - * Group by function (T => Group) - * - * For example, we have T type, People(name: String, gender: String, age: Int) - * groupBy[People](_.gender) will group the people by gender. - * - * You can append other combinators after groupBy - * - * For example, - * {{{ - * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..) 
- * }}} - * - * @param fn Group by function - * @param parallelism Parallelism level - * @param description The description - * @return the grouped stream - */ - def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, - description: String = "groupBy"): Stream[T] = { - window(CountWindow.apply(1).accumulating) - .groupBy[GROUP](fn, parallelism, description) - } - - /** - * Window function - * - * @param win window definition - * @param description window description - * @return [[WindowStream]] where groupBy could be applied - */ - def window(win: Window, description: String = "window"): WindowStream[T] = { - new WindowStream[T](graph, edge, thisNode, win, description) - } - - /** - * Connects with a low level Processor(TaskDescription) - * - * @param processor a user defined processor - * @param parallelism parallelism level - * @return new stream after processing with type [R] - */ - def process[R]( - processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, - description: String = "process"): Stream[R] = { - val processorOp = ProcessorOp(processor, parallelism, conf, description) - graph.addVertex(processorOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) - new Stream[R](graph, processorOp, Some(Shuffle)) - } -} - -class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, - window: Window, winDesc: String) { - - def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, - description: String = "groupBy"): Stream[T] = { - val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) - val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, - s"$winDesc.$description") - graph.addVertex(groupOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) - new Stream[T](graph, groupOp) - } -} - -class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { - /** - * GroupBy key - * - * Applies to Stream[Tuple2[K,V]] - * - * @param parallelism the parallelism for this 
operation - * @return the new KV stream - */ - def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = { - stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey") - } - - /** - * Sum the value of the tuples - * - * Apply to Stream[Tuple2[K,V]], V must be of type Number - * - * For input (key, value1), (key, value2), will generate (key, value1 + value2) - * @param numeric the numeric operations - * @return the sum stream - */ - def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = { - stream.reduce(Stream.sumByKey[K, V](numeric), "sum") - } -} - -object Stream { - - def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = { - new Stream[T](graph, node, edge) - } - - def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 - - def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] - = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) - - implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = { - new KVStream(stream) - } - - implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { - def sink(dataSink: DataSink, parallelism: Int = 1, - conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { - implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) - stream.graph.addVertex(sink) - stream.graph.addEdge(stream.thisNode, Shuffle, sink) - new Stream[T](stream.graph, sink) - } - } -} - -class LoggerSink[T] extends DataSink { - var logger: Logger = _ - - override def open(context: TaskContext): Unit = { - this.logger = context.logger - } - - override def write(message: Message): Unit = { - logger.info("logging message " + message.msg) - } - - override def close(): Unit = Unit -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala deleted file mode 100644 index 8116146..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl - -import java.time.Instant - -import akka.actor.ActorSystem -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.StreamApplication -import org.apache.gearpump.streaming.dsl.plan._ -import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.TaskContext -import org.apache.gearpump.util.Graph -import org.apache.gearpump.Message - -import scala.language.implicitConversions - -/** - * Example: - * {{{ - * val data = "This is a good start, bingo!! bingo!!" - * app.fromCollection(data.lines.toList). - * // word => (word, count) - * flatMap(line => line.split("[\\s]+")).map((_, 1)). 
- * // (word, count1), (word, count2) => (word, count1 + count2) - * groupBy(kv => kv._1).reduce(sum(_, _)) - * - * val appId = context.submit(app) - * context.close() - * }}} - * - * @param name name of app - */ -class StreamApp( - name: String, system: ActorSystem, userConfig: UserConfig, - private val graph: Graph[Op, OpEdge]) { - - def this(name: String, system: ActorSystem, userConfig: UserConfig) = { - this(name, system, userConfig, Graph.empty[Op, OpEdge]) - } - - def plan(): StreamApplication = { - implicit val actorSystem = system - val planner = new Planner - val dag = planner.plan(graph) - StreamApplication(name, dag, userConfig) - } -} - -object StreamApp { - def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) - : StreamApp = { - new StreamApp(name, context.system, userConfig) - } - - implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { - streamApp.plan() - } - - implicit class Source(app: StreamApp) extends java.io.Serializable { - - def source[T](dataSource: DataSource, parallelism: Int = 1, - conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { - implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } - - def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { - this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) - } - } -} - -/** A test message source which generated message sequence repeatedly. 
*/ -class CollectionDataSource[T](seq: Seq[T]) extends DataSource { - private lazy val iterator: Iterator[T] = seq.iterator - - override def open(context: TaskContext, startTime: Instant): Unit = {} - - override def read(): Message = { - if (iterator.hasNext) { - Message(iterator.next(), Instant.now().toEpochMilli) - } else { - null - } - } - - override def close(): Unit = {} - - override def getWatermark: Instant = Instant.now() -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala new file mode 100644 index 0000000..e4e7309 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object FilterFunction { + + def apply[T](fn: T => Boolean): FilterFunction[T] = { + new FilterFunction[T] { + override def apply(t: T): Boolean = { + fn(t) + } + } + } +} + +/** + * Returns true to keep the input and false otherwise. + * + * @param T Input value type + */ +abstract class FilterFunction[T] extends SerializableFunction { + + def apply(t: T): Boolean + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala new file mode 100644 index 0000000..70fe9d4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object MapFunction { + + def apply[T, R](fn: T => R): MapFunction[T, R] = { + new MapFunction[T, R] { + override def apply(t: T): R = { + fn(t) + } + } + } +} + +/** + * Transforms an input into an output of possibly different types. + * + * @param T Input value type + * @param R Output value type + */ +abstract class MapFunction[T, R] extends SerializableFunction { + + def apply(t: T): R + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala new file mode 100644 index 0000000..25b12be --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object ReduceFunction { + + def apply[T](fn: (T, T) => T): ReduceFunction[T] = { + new ReduceFunction[T] { + override def apply(t1: T, t2: T): T = { + fn(t1, t2) + } + } + } +} + +/** + * Combines two inputs into one output of the same type. + * + * @param T Type of both inputs and output + */ +abstract class ReduceFunction[T] extends SerializableFunction { + + def apply(t1: T, t2: T): T + +}
