http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala deleted file mode 100644 index f12e7a0..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.scala - -import com.google.common.base.Preconditions -import org.apache.flink.api.expressions.{Row, ExpressionOperation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.streaming.api.scala.DataStream - -import scala.language.implicitConversions - -/** - * == Language Integrated Queries (aka Expression Operations) == - * - * Importing this package with: - * - * {{{ - * import org.apache.flink.api.scala.expressions._ - * }}} - * - * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to an - * [[ExpressionOperation]]. This can be used to perform SQL-like queries on data. Please have - * a look at [[ExpressionOperation]] to see which operations are supported and - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] to see how an - * expression can be specified. - * - * Inside an expression operation you can use Scala Symbols to refer to field names. One would - * refer to field `a` by writing `'a`. Sometimes it is necessary to manually confert a - * Scala literal to an Expression Literal, in those cases use `Literal`, as in `Literal(3)`. - * - * Example: - * - * {{{ - * import org.apache.flink.api.scala._ - * import org.apache.flink.api.scala.expressions._ - * - * val env = ExecutionEnvironment.getExecutionEnvironment - * val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3)) - * val result = input.as('word, 'count).groupBy('word).select('word, 'count.avg) - * result.print() - * - * env.execute() - * }}} - * - * The result of an [[ExpressionOperation]] can be converted back to the underlying API - * representation using `as`: - * - * {{{ - * case class Word(word: String, count: Int) - * - * val result = in.select(...).as('word, 'count) - * val set = result.as[Word] - * }}} - */ -package object expressions extends ImplicitExpressionConversions { - - implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = { - new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]]) - } - - implicit def expressionOperation2RowDataSet( - expressionOperation: ExpressionOperation[ScalaBatchTranslator]): DataSet[Row] = { - expressionOperation.as[Row] - } - - implicit def rowDataSet2ExpressionOperation( - rowDataSet: DataSet[Row]): ExpressionOperation[ScalaBatchTranslator] = { - rowDataSet.toExpression - } - - implicit def dataStream2DataSetConversions[T]( - stream: DataStream[T]): DataStreamConversions[T] = { - new DataStreamConversions[T]( - stream, - stream.getJavaStream.getType.asInstanceOf[CompositeType[T]]) - } - - implicit def expressionOperation2RowDataStream( - expressionOperation: ExpressionOperation[ScalaStreamingTranslator]): DataStream[Row] = { - expressionOperation.as[Row] - } - - implicit def rowDataStream2ExpressionOperation( - rowDataStream: DataStream[Row]): ExpressionOperation[ScalaStreamingTranslator] = { - rowDataStream.toExpression - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala deleted file mode 100644 index dadfe09..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala - -import org.apache.flink.api.expressions.tree.Literal -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ -import org.apache.flink.examples.java.graph.util.PageRankData -import org.apache.flink.util.Collector - -import _root_.scala.collection.JavaConverters._ - -/** -* A basic implementation of the Page Rank algorithm using a bulk iteration. -* -* This implementation requires a set of pages and a set of directed links as input and works as -* follows. -* -* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each -* page collects the partial ranks of all pages that point to it, sums them up, and applies a -* dampening factor to the sum. The result is the new rank of the page. A new iteration is started -* with the new ranks of all pages. This implementation terminates after a fixed number of -* iterations. This is the Wikipedia entry for the -* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]] -* -* Input files are plain text files and must be formatted as follows: -* -* - Pages represented as an (long) ID separated by new-line characters. -* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63. -* - Links are represented as pairs of page IDs which are separated by space characters. Links -* are separated by new-line characters. -* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), -* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has -* at least one incoming and one outgoing link (a page can point to itself). -* -* Usage: -* {{{ -* PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations> -* }}} -* -* If no parameters are provided, the program is run with default data from -* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations. -* -* This example shows how to use: -* -* - Bulk Iterations -* - Expression Operations -*/ -object PageRankExpression { - - private final val DAMPENING_FACTOR: Double = 0.85 - private final val EPSILON: Double = 0.0001 - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // set up execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - // read input data - val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) } - .as('pageId, 'rank) - - val links = getLinksDataSet(env) - - // build adjacency list from link input - val adjacencyLists = links - .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] { - - override def reduce( - values: _root_.java.lang.Iterable[Link], - out: Collector[AdjacencyList]): Unit = { - var outputId = -1L - val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId } - out.collect(new AdjacencyList(outputId, outputList.toArray)) - } - - }).as('sourceId, 'targetIds) - - // start iteration - val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) { - currentRanks => - val newRanks = currentRanks.toExpression - // distribute ranks to target pages - .join(adjacencyLists).where('pageId === 'sourceId) - .select('rank, 'targetIds).as[RankOutput] - .flatMap { - (in, out: Collector[(Long, Double)]) => - val targets = in.targetIds - val len = targets.length - targets foreach { t => out.collect((t, in.rank / len )) } - } - .as('pageId, 'rank) - // collect ranks and sum them up - .groupBy('pageId).select('pageId, 'rank.sum as 'rank) - // apply dampening factor - .select( - 'pageId, - ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank) - - - val termination = currentRanks.toExpression - .as('curId, 'curRank).join(newRanks.as('newId, 'newRank)) - .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON) - - (newRanks, termination) - } - - val result = finalRanks - - // emit result - if (fileOutput) { - result.writeAsCsv(outputPath, "\n", " ") - } else { - result.print() - } - - // execute program - env.execute("Expression PageRank Example") - } - - // ************************************************************************* - // USER TYPES - // ************************************************************************* - - case class Link(sourceId: Long, targetId: Long) - - case class Page(pageId: Long, rank: Double) - - case class AdjacencyList(sourceId: Long, targetIds: Array[Long]) - - case class RankOutput(rank: Double, targetIds: Array[Long]) - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 5) { - pagesInputPath = args(0) - linksInputPath = args(1) - outputPath = args(2) - numPages = args(3).toLong - maxIterations = args(4).toInt - } else { - System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " + - "pages> <num iterations>") - false - } - } else { - System.out.println("Executing PageRank Basic example with default parameters and built-in " + - "default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num " + - "pages> <num iterations>") - - numPages = PageRankData.getNumberOfPages - } - true - } - - private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { - if (fileOutput) { - env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n") - .map(x => x._1) - } else { - env.generateSequence(1, 15) - } - } - - private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = { - if (fileOutput) { - env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ", - includedFields = Array(0, 1)) - } else { - val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long], - v2.asInstanceOf[Long])} - env.fromCollection(edges) - } - } - - private var fileOutput: Boolean = false - private var pagesInputPath: String = null - private var linksInputPath: String = null - private var outputPath: String = null - private var numPages: Double = 0 - private var maxIterations: Int = 10 - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala deleted file mode 100644 index 2d1d0ec..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.scala - -import org.apache.flink.streaming.api.scala._ - -import org.apache.flink.api.scala.expressions._ - -import scala.Stream._ -import scala.math._ -import scala.language.postfixOps -import scala.util.Random - -/** - * Simple example for demonstrating the working streaming api expression operations. - */ -object StreamingExpressionFilter { - - case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val cars = genCarStream().toExpression - .filter('carId === 0) - .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time) - .as[CarEvent] - - cars.print() - - StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing") - - } - - def genCarStream(): DataStream[CarEvent] = { - - def nextSpeed(carEvent : CarEvent) : CarEvent = - { - val next = - if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5) - CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis) - } - def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] = - { - Thread.sleep(1000) - speeds.append(carStream(speeds.map(nextSpeed))) - } - carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis()))) - } - - def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - if (args.length == 3) { - numOfCars = args(0).toInt - evictionSec = args(1).toInt - triggerMeters = args(2).toDouble - true - } - else { - System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>") - false - } - }else{ - true - } - } - - var numOfCars = 2 - var evictionSec = 10 - var triggerMeters = 50d - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala deleted file mode 100644 index 1803bdd..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala - -import org.apache.flink.api.expressions.tree.Literal -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ - -/** - * This program implements a modified version of the TPC-H query 3. The - * example demonstrates how to assign names to fields by extending the Tuple class. - * The original query can be found at - * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) - * (page 29). - * - * This program implements the following SQL equivalent: - * - * {{{ - * SELECT - * l_orderkey, - * SUM(l_extendedprice*(1-l_discount)) AS revenue, - * o_orderdate, - * o_shippriority - * FROM customer, - * orders, - * lineitem - * WHERE - * c_mktsegment = '[SEGMENT]' - * AND c_custkey = o_custkey - * AND l_orderkey = o_orderkey - * AND o_orderdate < date '[DATE]' - * AND l_shipdate > date '[DATE]' - * GROUP BY - * l_orderkey, - * o_orderdate, - * o_shippriority; - * }}} - * - * Compared to the original TPC-H query this version does not sort the result by revenue - * and orderdate. - * - * Input files are plain text CSV files using the pipe character ('|') as field separator - * as generated by the TPC-H data generator which is available at - * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/). - * - * Usage: - * {{{ - * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> - * }}} - * - * This example shows how to use: - * - Expression Operations - * - */ -object TPCHQuery3Expression { - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // set filter date - val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd") - val date = dateFormat.parse("1995-03-12") - - // get execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - val lineitems = getLineitemDataSet(env) - .filter( l => dateFormat.parse(l.shipDate).after(date) ) - .as('id, 'extdPrice, 'discount, 'shipDate) - - val customers = getCustomerDataSet(env) - .as('id, 'mktSegment) - .filter( 'mktSegment === "AUTOMOBILE" ) - - val orders = getOrdersDataSet(env) - .filter( o => dateFormat.parse(o.orderDate).before(date) ) - .as('orderId, 'custId, 'orderDate, 'shipPrio) - - val items = - orders.join(customers) - .where('custId === 'id) - .select('orderId, 'orderDate, 'shipPrio) - .join(lineitems) - .where('orderId === 'id) - .select( - 'orderId, - 'extdPrice * (Literal(1.0f) - 'discount) as 'revenue, - 'orderDate, - 'shipPrio) - - val result = items - .groupBy('orderId, 'orderDate, 'shipPrio) - .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio) - - // emit result - result.writeAsCsv(outputPath, "\n", "|") - - // execute program - env.execute("Scala TPCH Query 3 (Expression) Example") - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String) - case class Customer(id: Long, mktSegment: String) - case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long) - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private var lineitemPath: String = null - private var customerPath: String = null - private var ordersPath: String = null - private var outputPath: String = null - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length == 4) { - lineitemPath = args(0) - customerPath = args(1) - ordersPath = args(2) - outputPath = args(3) - true - } else { - System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + - " Due to legal restrictions, we can not ship generated data.\n" + - " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + - " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + - "<orders-csv path> <result path>"); - false - } - } - - private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { - env.readCsvFile[Lineitem]( - lineitemPath, - fieldDelimiter = "|", - includedFields = Array(0, 5, 6, 10) ) - } - - private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = { - env.readCsvFile[Customer]( - customerPath, - fieldDelimiter = "|", - includedFields = Array(0, 6) ) - } - - private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { - env.readCsvFile[Order]( - ordersPath, - fieldDelimiter = "|", - includedFields = Array(0, 1, 4, 7) ) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java deleted file mode 100644 index 76c7fed..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.expressions.test; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class AggregationsITCase extends MultipleProgramsTestBase { - - - public AggregationsITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testAggregationTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env)); - - ExpressionOperation<JavaBatchTranslator> result = - expressionOperation.select("f0.sum, f0.min, f0.max, f0.count, f0.avg"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "231,1,21,21,11"; - } - - @Test(expected = ExpressionException.class) - public void testAggregationOnNonExistingField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env)); - - ExpressionOperation<JavaBatchTranslator> result = - expressionOperation.select("'foo.avg"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test - public void testWorkingAggregationDataTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements( - new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), - new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao")); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input); - - ExpressionOperation<JavaBatchTranslator> result = - expressionOperation.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,1,1,1,1.5,1.5,2"; - } - - @Test - public void testAggregationWithArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Float, String>> input = - env.fromElements( - new Tuple2<Float, String>(1f, "Hello"), - new Tuple2<Float, String>(2f, "Ciao")); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input); - - ExpressionOperation<JavaBatchTranslator> result = - expressionOperation.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\""); - - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "5.5,2 THE COUNT"; - } - - @Test(expected = ExpressionException.class) - public void testNonWorkingDataTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, - "Hello")); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input); - - ExpressionOperation<JavaBatchTranslator> result = - expressionOperation.select("f1.sum"); - - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testNoNestedAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, "Hello")); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input); - - ExpressionOperation<JavaBatchTranslator> result = - expressionOperation.select("f0.sum.sum"); - - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java deleted file mode 100644 index 3b69be0..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class AsITCase extends MultipleProgramsTestBase { - - - public AsITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); - - DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; - } - - @Test(expected = ExpressionException.class) - public void testAsWithToFewFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b"); - - DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testAsWithToManyFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); - - DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testAsWithAmbiguousFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); - - DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testAsWithNonFieldReference1() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); - - DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testAsWithNonFieldReference2() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c"); - - DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java deleted file mode 100644 index b4d1159..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class CastingITCase extends MultipleProgramsTestBase { - - - public CastingITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testAutoCastToString() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>( - (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select( - "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\""); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1b,1s,1i,1L,1.0f,1.0d"; - } - - @Test - public void testNumericAutocastInArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>( - (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select("f0 + 1, f1 +" + - " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "2,2,2,2.0,2.0,2.0"; - } - - @Test - public void testNumericAutocastInComparison() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements( - new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), - new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello")); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a,b,c,d,e,f,g"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation - .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "2,2,2,2,2.0,2.0,Hello"; - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java deleted file mode 100644 index 5c3a92a..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class ExpressionsITCase extends MultipleProgramsTestBase { - - - public ExpressionsITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Integer, Integer>> input = - env.fromElements(new Tuple2<Integer, Integer>(5, 10)); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select( - "a - 5, a + 5, a / 2, a * 2, a % 2, -a"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "0,10,2,10,1,-5"; - } - - @Test - public void testLogic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Integer, Boolean>> input = - env.fromElements(new Tuple2<Integer, Boolean>(5, true)); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select( - "b && true, b && false, b || false, !b"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "true,false,true,false"; - } - - @Test - public void testComparisons() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple3<Integer, Integer, Integer>> input = - env.fromElements(new Tuple3<Integer, Integer, Integer>(5, 5, 4)); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b, c"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select( - "a > c, a >= b, a < c, a.isNull, a.isNotNull"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "true,true,false,false,true"; - } - - @Test - public void testBitwiseOperation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Byte, Byte>> input = - env.fromElements(new Tuple2<Byte, Byte>((byte) 3, (byte) 5)); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select( - "a & b, a | b, a ^ b, ~a"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,7,6,-4"; - } - - @Test - public void testBitwiseWithAutocast() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Integer, Byte>> input = - env.fromElements(new Tuple2<Integer, Byte>(3, (byte) 5)); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select( - "a & b, a | b, a ^ b, ~a"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,7,6,-4"; - } - - @Test(expected = ExpressionException.class) - public void testBitwiseWithNonWorkingAutocast() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSource<Tuple2<Float, Byte>> input = - env.fromElements(new Tuple2<Float, Byte>(3.0f, (byte) 5)); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = - expressionOperation.select("a & b, a | b, a ^ b, ~a"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java deleted file mode 100644 index 7da5fa3..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class FilterITCase extends MultipleProgramsTestBase { - - - public FilterITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testAllRejectingFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b, c"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation - .filter("false"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "\n"; - } - - @Test - public void testAllPassingFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b, c"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation - .filter("true"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; - } - - @Test - public void testFilterOnIntegerTupleField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b, c"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation - .filter(" a % 2 === 0 "); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + - "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java deleted file mode 100644 index 8141dea..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class GroupedAggregationsITCase extends MultipleProgramsTestBase { - - - public GroupedAggregationsITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test(expected = ExpressionException.class) - public void testGroupingOnNonExistentField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b, c"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation - .groupBy("foo").select("a.avg"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test - public void testGroupedAggregate() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b, c"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation - .groupBy("b").select("b, a.sum"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; - } - - @Test - public void testGroupingKeyForwardIfNotUsed() throws Exception { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> expressionOperation = - ExpressionUtil.from(input, "a, b, c"); - - ExpressionOperation<JavaBatchTranslator> result = expressionOperation - .groupBy("b").select("a.sum"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"; - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java deleted file mode 100644 index 3ece3dc..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class JoinITCase extends MultipleProgramsTestBase { - - - public JoinITCase(TestExecutionMode mode) { - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testJoin() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c"); - ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h"); - - ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("b === e").select("c, g"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"; - } - - @Test - public void testJoinWithFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c"); - ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h"); - - ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("b === e && b < 2").select("c, g"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "Hi,Hallo\n"; - } - - @Test - public void testJoinWithMultipleKeys() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c"); - ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h"); - - ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("a === d && b === h").select("c, g"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + - "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"; - } - - @Test(expected = ExpressionException.class) - public void testJoinNonExistingKey() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c"); - ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h"); - - ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("foo === e").select("c, g"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testJoinWithNonMatchingKeyTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c"); - ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h"); - - ExpressionOperation<JavaBatchTranslator> result = in1 - .join(in2).where("a === g").select("c, g"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testJoinWithAmbiguousFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c"); - ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, c"); - - ExpressionOperation<JavaBatchTranslator> result = in1 - .join(in2).where("a === d").select("c, g"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test - public void testJoinWithAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c"); - ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h"); - - ExpressionOperation<JavaBatchTranslator> result = in1 - .join(in2).where("a === d").select("g.count"); - - DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); - ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "6"; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java deleted file mode 100644 index 89ec2e5..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class SelectITCase extends MultipleProgramsTestBase { - - - public SelectITCase(TestExecutionMode mode) { - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testSimpleSelectAllWithAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a,b,c"); - - ExpressionOperation<JavaBatchTranslator> result = in - .select("a, b, c"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; - - } - - @Test - public void testSimpleSelectWithNaming() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds); - - ExpressionOperation<JavaBatchTranslator> result = in - .select("f0 as a, f1 as b") - .select("a, b"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - } - - @Test(expected = ExpressionException.class) - public void testAsWithToFewFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = " sorry dude "; - } - - @Test(expected = ExpressionException.class) - public void testAsWithToManyFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, c, d"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = " sorry dude "; - } - - @Test(expected = ExpressionException.class) - public void testAsWithAmbiguousFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, c, b"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = " today's not your day "; - } - - @Test(expected = ExpressionException.class) - public void testOnlyFieldRefInAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b as c, d"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "sorry bro"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java deleted file mode 100644 index f9f1c6b..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.expressions.test; - -import org.apache.flink.api.expressions.ExpressionException; -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class StringExpressionsITCase extends MultipleProgramsTestBase { - - - public StringExpressionsITCase(TestExecutionMode mode) { - super(mode); - } - - private String resultPath; - private String expected = ""; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testSubstring() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<String, Integer>> ds = env.fromElements( - new Tuple2<String, Integer>("AAAA", 2), - new Tuple2<String, Integer>("BBBB", 1)); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = in - .select("a.substring(0, b)"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "AA\nB"; - } - - @Test - public void testSubstringWithMaxEnd() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<String, Integer>> ds = env.fromElements( - new Tuple2<String, Integer>("ABCD", 2), - new Tuple2<String, Integer>("ABCD", 1)); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = in - .select("a.substring(b)"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = "CD\nBCD"; - } - - @Test(expected = ExpressionException.class) - public void testNonWorkingSubstring1() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<String, Float>> ds = env.fromElements( - new Tuple2<String, Float>("ABCD", 2.0f), - new Tuple2<String, Float>("ABCD", 1.0f)); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = in - .select("a.substring(0, b)"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } - - @Test(expected = ExpressionException.class) - public void testNonWorkingSubstring2() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<String, String>> ds = env.fromElements( - new Tuple2<String, String>("ABCD", "a"), - new Tuple2<String, String>("ABCD", "b")); - - ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b"); - - ExpressionOperation<JavaBatchTranslator> result = in - .select("a.substring(b, 15)"); - - DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class); - resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - - env.execute(); - - expected = ""; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java deleted file mode 100644 index b75a2ee..0000000 --- a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java +++ /dev/null @@ -1,100 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.flink.api.scala.expressions.test; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.examples.scala.PageRankExpression; -import org.apache.flink.test.testdata.PageRankData; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - -@RunWith(Parameterized.class) -public class PageRankExpressionITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 2; - - private int curProgId = config.getInteger("ProgramId", -1); - - private String verticesPath; - private String edgesPath; - private String resultPath; - private String expectedResult; - - public PageRankExpressionITCase(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); - edgesPath = createTempFile("edges.txt", PageRankData.EDGES); - } - - @Override - protected void testProgram() throws Exception { - expectedResult = runProgram(curProgId); - } - - @Override - protected void postSubmit() throws Exception { - compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01); - } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); - } - - - public String runProgram(int progId) throws Exception { - - switch(progId) { - case 1: { - PageRankExpression.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData - .NUM_VERTICES + "", "3"}); - return PageRankData.RANKS_AFTER_3_ITERATIONS; - } - case 2: { - // start with a very high number of iteration such that the dynamic convergence criterion must handle termination - PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"}); - return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE; - } - - default: - throw new IllegalArgumentException("Invalid program id"); - } - } -}