http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala new file mode 100644 index 0000000..3b5459b --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.typeinfo + +import org.apache.flink.api.common.operators.Operator +import org.apache.flink.api.java.operators.SingleInputOperator +import org.apache.flink.api.java.{DataSet => JavaDataSet} + +/** + * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some + * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this + * disappears since the translation method simply returns the input. + */ +class RenameOperator[T]( + input: JavaDataSet[T], + renamingTypeInformation: RenamingProxyTypeInfo[T]) + extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) { + + override protected def translateToDataFlow( + input: Operator[T]): Operator[T] = input +}
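The interplay of the two classes above can be illustrated with a short sketch. This is not part of the commit; the helper renameFields is hypothetical and assumes it lives in org.apache.flink.api.table.typeinfo so that RenameOperator and RenamingProxyTypeInfo are visible:

    import org.apache.flink.api.common.typeutils.CompositeType
    import org.apache.flink.api.java.{DataSet => JavaDataSet}

    object RenameSketch {
      // Hypothetical helper: wrap the input in a RenameOperator that carries a
      // RenamingProxyTypeInfo. The RenameOperator is itself a DataSet[T], and since
      // translateToDataFlow returns the input unchanged, only the field names differ.
      def renameFields[T](input: JavaDataSet[T], newNames: Array[String]): JavaDataSet[T] = {
        val composite = input.getType match {
          case c: CompositeType[T] => c
          case other => throw new IllegalArgumentException(s"$other is not a composite type")
        }
        new RenameOperator[T](input, new RenamingProxyTypeInfo[T](composite, newNames))
      }
    }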
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala new file mode 100644 index 0000000..6a9cbfe --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.typeinfo + +import java.util + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor +import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer} + +/** + * A TypeInformation that is used to rename fields of an underlying CompositeType. This + * allows the system to translate "as" Table API operations to a [[RenameOperator]] + * that does not get translated to a runtime operator. + */ +class RenamingProxyTypeInfo[T]( + tpe: CompositeType[T], + fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) { + + def getUnderlyingType: CompositeType[T] = tpe + + if (tpe.getArity != fieldNames.length) { + throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " + + s"number of fields in underlying type $tpe do not match.") + } + + if (fieldNames.toSet.size != fieldNames.length) { + throw new IllegalArgumentException(s"New field names must be unique. 
" + + s"Names: ${fieldNames.mkString(",")}.") + } + + override def getFieldIndex(fieldName: String): Int = { + val result = fieldNames.indexOf(fieldName) + if (result != fieldNames.lastIndexOf(fieldName)) { + -2 + } else { + result + } + } + override def getFieldNames: Array[String] = fieldNames + + override def isBasicType: Boolean = tpe.isBasicType + + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = + tpe.createSerializer(executionConfig) + + override def getArity: Int = tpe.getArity + + override def isKeyType: Boolean = tpe.isKeyType + + override def getTypeClass: Class[T] = tpe.getTypeClass + + override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters + + override def isTupleType: Boolean = tpe.isTupleType + + override def toString = { + s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " + + s"fields: ${fieldNames.mkString(", ")})" + } + + override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos) + + override def getTotalFields: Int = tpe.getTotalFields + + override def createComparator( + logicalKeyFields: Array[Int], + orders: Array[Boolean], + logicalFieldOffset: Int, + executionConfig: ExecutionConfig) = + tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig) + + // These are never called since we override create comparator + override protected def initializeNewComparator(localKeyCount: Int): Unit = + throw new RuntimeException("Cannot happen.") + + override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] = + throw new RuntimeException("Cannot happen.") + + override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit = + throw new RuntimeException("Cannot happen.") + + override def getFlatFields( + fieldExpression: String, + offset: Int, + result: util.List[FlatFieldDescriptor]): Unit = { + tpe.getFlatFields(fieldExpression, offset, result) + } + + override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = { + tpe.getTypeAt(fieldExpression) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala new file mode 100644 index 0000000..8a8dc3d --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.typeinfo + +import org.apache.flink.api.table.Row +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.core.memory.{DataOutputView, DataInputView} + +/** + * Serializer for [[Row]]. + */ +class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]]) + extends TypeSerializer[Row] { + + override def isImmutableType: Boolean = false + + override def getLength: Int = -1 + + override def duplicate: RowSerializer = this + + override def createInstance: Row = { + new Row(fieldSerializers.length) + } + + override def copy(from: Row, reuse: Row): Row = { + val len = fieldSerializers.length + + if (from.productArity != len) { + throw new RuntimeException("Row arity of from does not match serializer arity.") + } + var i = 0 + while (i < len) { + val reuseField = reuse.productElement(i) + val fromField = from.productElement(i).asInstanceOf[AnyRef] + val copy = fieldSerializers(i).copy(fromField, reuseField) + reuse.setField(i, copy) + i += 1 + } + reuse + } + + override def copy(from: Row): Row = { + val len = fieldSerializers.length + + if (from.productArity != len) { + throw new RuntimeException("Row arity of from does not match serializer arity.") + } + val result = new Row(len) + var i = 0 + while (i < len) { + val fromField = from.productElement(i).asInstanceOf[AnyRef] + val copy = fieldSerializers(i).copy(fromField) + result.setField(i, copy) + i += 1 + } + result + } + + override def serialize(value: Row, target: DataOutputView): Unit = { + val len = fieldSerializers.length + var i = 0 + while (i < len) { + val serializer = fieldSerializers(i) + serializer.serialize(value.productElement(i), target) + i += 1 + } + } + + override def deserialize(reuse: Row, source: DataInputView): Row = { + val len = fieldSerializers.length + + if (reuse.productArity != len) { + throw new RuntimeException("Row arity of reuse does not match serializer arity.") + } + + var i = 0 + while (i < len) { + val field = reuse.productElement(i).asInstanceOf[AnyRef] + reuse.setField(i, fieldSerializers(i).deserialize(field, source)) + i += 1 + } + reuse + } + + override def deserialize(source: DataInputView): Row = { + val len = fieldSerializers.length + + val result = new Row(len) + var i = 0 + while (i < len) { + result.setField(i, fieldSerializers(i).deserialize(source)) + i += 1 + } + result + } + + override def copy(source: DataInputView, target: DataOutputView): Unit = { + val len = fieldSerializers.length + var i = 0 + while (i < len) { + fieldSerializers(i).copy(source, target) + i += 1 + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala new file mode 100644 index 0000000..7ffa91c --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.typeinfo + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.table.Row +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo + +/** + * TypeInformation for [[Row]]. + */ +class RowTypeInfo( + fieldTypes: Seq[TypeInformation[_]], + fieldNames: Seq[String]) + extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) { + + def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name)) + + if (fieldNames.toSet.size != fieldNames.size) { + throw new IllegalArgumentException("Field names must be unique.") + } + + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = { + val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity) + for (i <- 0 until getArity) { + fieldSerializers(i) = this.types(i).createSerializer(executionConfig) + .asInstanceOf[TypeSerializer[Any]] + } + + new RowSerializer(fieldSerializers) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala new file mode 100644 index 0000000..604bdcf --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.table.tree.Literal +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.examples.java.graph.util.PageRankData +import org.apache.flink.util.Collector + +import _root_.scala.collection.JavaConverters._ + +/** +* A basic implementation of the Page Rank algorithm using a bulk iteration. 
+* +* This implementation requires a set of pages and a set of directed links as input and works as +* follows. +* +* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each +* page collects the partial ranks of all pages that point to it, sums them up, and applies a +* dampening factor to the sum. The result is the new rank of the page. A new iteration is started +* with the new ranks of all pages. This implementation terminates after a fixed number of +* iterations. See the Wikipedia entry for the +* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]. +* +* Input files are plain text files and must be formatted as follows: +* +* - Pages are represented by a (long) ID, separated by new-line characters. +* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63. +* - Links are represented as pairs of page IDs which are separated by space characters. Links +* are separated by new-line characters. +* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), +* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has +* at least one incoming and one outgoing link (a page can point to itself). +* +* Usage: +* {{{ +* PageRankExpression <pages path> <links path> <output path> <num pages> <num iterations> +* }}} +* +* If no parameters are provided, the program is run with default data from +* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations. +* +* This example shows how to use: +* +* - Bulk Iterations +* - Table API expressions +*/ +object PageRankExpression { + + private final val DAMPENING_FACTOR: Double = 0.85 + private final val EPSILON: Double = 0.0001 + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + // read input data + val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) } + .as('pageId, 'rank) + + val links = getLinksDataSet(env) + + // build adjacency list from link input + val adjacencyLists = links + .groupBy("sourceId").reduceGroup(new GroupReduceFunction[Link, AdjacencyList] { + + override def reduce( + values: _root_.java.lang.Iterable[Link], + out: Collector[AdjacencyList]): Unit = { + var outputId = -1L + val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId } + out.collect(new AdjacencyList(outputId, outputList.toArray)) + } + + }).as('sourceId, 'targetIds) + + // start iteration + val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) { + currentRanks => + val newRanks = currentRanks.toTable + // distribute ranks to target pages + .join(adjacencyLists).where('pageId === 'sourceId) + .select('rank, 'targetIds).as[RankOutput] + .flatMap { + (in, out: Collector[(Long, Double)]) => + val targets = in.targetIds + val len = targets.length + targets foreach { t => out.collect((t, in.rank / len)) } + } + .as('pageId, 'rank) + // collect ranks and sum them up + .groupBy('pageId).select('pageId, 'rank.sum as 'rank) + // apply dampening factor + .select( + 'pageId, + ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank) + + val termination = currentRanks.toTable + .as('curId, 'curRank).join(newRanks.as('newId, 'newRank)) + .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON) + + (newRanks, termination) + } + + val result = finalRanks + + // emit result + if (fileOutput) { 
result.writeAsCsv(outputPath, "\n", " ") + } else { + result.print() + } + + // execute program + env.execute("Expression PageRank Example") + } + + // ************************************************************************* + // USER TYPES + // ************************************************************************* + + case class Link(sourceId: Long, targetId: Long) + + case class Page(pageId: Long, rank: Double) + + case class AdjacencyList(sourceId: Long, targetIds: Array[Long]) + + case class RankOutput(rank: Double, targetIds: Array[Long]) + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 5) { + pagesInputPath = args(0) + linksInputPath = args(1) + outputPath = args(2) + numPages = args(3).toLong + maxIterations = args(4).toInt + } else { + System.err.println("Usage: PageRankExpression <pages path> <links path> <output path> <num " + + "pages> <num iterations>") + return false + } + } else { + System.out.println("Executing PageRank Expression example with default parameters and " + + "built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: PageRankExpression <pages path> <links path> <output path> <num " + + "pages> <num iterations>") + + numPages = PageRankData.getNumberOfPages + } + true + } + + private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { + if (fileOutput) { + env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n") + .map(x => x._1) + } else { + env.generateSequence(1, 15) + } + } + + private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = { + if (fileOutput) { + env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ", + includedFields = Array(0, 1)) + } else { + val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long], + v2.asInstanceOf[Long]) } + env.fromCollection(edges) + } + } + + private var fileOutput: Boolean = false + private var pagesInputPath: String = null + private var linksInputPath: String = null + private var outputPath: String = null + private var numPages: Double = 0 + private var maxIterations: Int = 10 + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala new file mode 100644 index 0000000..0ff97bf --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala + +import org.apache.flink.streaming.api.scala._ + +import org.apache.flink.api.scala.table._ + +import scala.Stream._ +import scala.math._ +import scala.language.postfixOps +import scala.util.Random + +/** + * Simple example for demonstrating the use of the Table API with Flink Streaming. + */ +object StreamingExpressionFilter { + + case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val cars = genCarStream().toTable + .filter('carId === 0) + .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time) + .as[CarEvent] + + cars.print() + + StreamExecutionEnvironment.getExecutionEnvironment.execute("StreamingExpressionFilter") + + } + + def genCarStream(): DataStream[CarEvent] = { + + def nextSpeed(carEvent: CarEvent): CarEvent = { + val next = + if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5) + CarEvent(carEvent.carId, next, carEvent.distance + next / 3.6d, System.currentTimeMillis) + } + def carStream(speeds: Stream[CarEvent]): Stream[CarEvent] = { + Thread.sleep(1000) + speeds.append(carStream(speeds.map(nextSpeed))) + } + carStream(range(0, numOfCars).map(CarEvent(_, 50, 0, System.currentTimeMillis()))) + } + + def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + if (args.length == 3) { + numOfCars = args(0).toInt + evictionSec = args(1).toInt + triggerMeters = args(2).toDouble + true + } else { + System.err.println("Usage: StreamingExpressionFilter <numCars> <evictSec> <triggerMeters>") + false + } + } else { + true + } + } + + var numOfCars = 2 + var evictionSec = 10 + var triggerMeters = 50d + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala new file mode 100644 index 0000000..96ec4ba --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.table.tree.Literal +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ + +/** + * This program implements a modified version of the TPC-H query 3. The + * example demonstrates how to assign names to the fields of a DataSet + * with the Table API's `as` operation. + * The original query can be found at + * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) + * (page 29). + * + * This program implements the following SQL equivalent: + * + * {{{ + * SELECT + * l_orderkey, + * SUM(l_extendedprice*(1-l_discount)) AS revenue, + * o_orderdate, + * o_shippriority + * FROM customer, + * orders, + * lineitem + * WHERE + * c_mktsegment = '[SEGMENT]' + * AND c_custkey = o_custkey + * AND l_orderkey = o_orderkey + * AND o_orderdate < date '[DATE]' + * AND l_shipdate > date '[DATE]' + * GROUP BY + * l_orderkey, + * o_orderdate, + * o_shippriority; + * }}} + * + * Compared to the original TPC-H query this version does not sort the result by revenue + * and orderdate. + * + * Input files are plain text CSV files using the pipe character ('|') as field separator + * as generated by the TPC-H data generator which is available at + * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/). + * + * Usage: + * {{{ + * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> + * }}} + * + * This example shows how to use: + * - Table API expressions + * + */ +object TPCHQuery3Expression { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // set filter date + val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd") + val date = dateFormat.parse("1995-03-12") + + // get execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + val lineitems = getLineitemDataSet(env) + .filter( l => dateFormat.parse(l.shipDate).after(date) ) + .as('id, 'extdPrice, 'discount, 'shipDate) + + val customers = getCustomerDataSet(env) + .as('id, 'mktSegment) + .filter( 'mktSegment === "AUTOMOBILE" ) + + val orders = getOrdersDataSet(env) + .filter( o => dateFormat.parse(o.orderDate).before(date) ) + .as('orderId, 'custId, 'orderDate, 'shipPrio) + + val items = + orders.join(customers) + .where('custId === 'id) + .select('orderId, 'orderDate, 'shipPrio) + .join(lineitems) + .where('orderId === 'id) + .select( + 'orderId, + 'extdPrice * (Literal(1.0f) - 'discount) as 'revenue, + 'orderDate, + 'shipPrio) + + val result = items + .groupBy('orderId, 'orderDate, 'shipPrio) + .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio) + + // emit result + result.writeAsCsv(outputPath, "\n", "|") + + // execute program + env.execute("Scala TPCH Query 3 (Expression) Example") + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String) + case class Customer(id: Long, mktSegment: String) + case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long) + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private var lineitemPath: String = null + private var 
customerPath: String = null + private var ordersPath: String = null + private var outputPath: String = null + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length == 4) { + lineitemPath = args(0) + customerPath = args(1) + ordersPath = args(2) + outputPath = args(3) + true + } else { + System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + + " Due to legal restrictions, we cannot ship generated data.\n" + + " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + + " Usage: TPCHQuery3Expression <lineitem-csv path> <customer-csv path> " + + "<orders-csv path> <result path>") + false + } + } + + private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { + env.readCsvFile[Lineitem]( + lineitemPath, + fieldDelimiter = "|", + includedFields = Array(0, 5, 6, 10) ) + } + + private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = { + env.readCsvFile[Customer]( + customerPath, + fieldDelimiter = "|", + includedFields = Array(0, 6) ) + } + + private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { + env.readCsvFile[Order]( + ordersPath, + fieldDelimiter = "|", + includedFields = Array(0, 1, 4, 7) ) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java new file mode 100644 index 0000000..0b2a5df --- /dev/null +++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.table.test; + 
+import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.table.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AggregationsITCase extends MultipleProgramsTestBase { + + + public AggregationsITCase(TestExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testAggregationTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env)); + + Table<JavaBatchTranslator> result = + table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "231,1,21,21,11"; + } + + @Test(expected = ExpressionException.class) + public void testAggregationOnNonExistingField() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env)); + + Table<JavaBatchTranslator> result = + table.select("foo.avg"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test + public void testWorkingAggregationDataTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements( + new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), + new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao")); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input); + + Table<JavaBatchTranslator> result = + table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); 
+ + expected = "1,1,1,1,1.5,1.5,2"; + } + + @Test + public void testAggregationWithArithmetic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, String>> input = + env.fromElements( + new Tuple2<Float, String>(1f, "Hello"), + new Tuple2<Float, String>(2f, "Ciao")); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input); + + Table<JavaBatchTranslator> result = + table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\""); + + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "5.5,2 THE COUNT"; + } + + @Test(expected = ExpressionException.class) + public void testNonWorkingDataTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, + "Hello")); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input); + + Table<JavaBatchTranslator> result = + table.select("f1.sum"); + + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testNoNestedAggregation() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, "Hello")); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input); + + Table<JavaBatchTranslator> result = + table.select("f0.sum.sum"); + + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java new file mode 100644 index 0000000..ee877e9 --- /dev/null +++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.table.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AsITCase extends MultipleProgramsTestBase { + + + public AsITCase(TestExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testAs() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); + + DataSet<Row> ds = tableEnv.toSet(table, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + } + + @Test(expected = ExpressionException.class) + public void testAsWithTooFewFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b"); + + DataSet<Row> ds = tableEnv.toSet(table, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithTooManyFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); + + DataSet<Row> ds = tableEnv.toSet(table, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithAmbiguousFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = + 
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); + + DataSet<Row> ds = tableEnv.toSet(table, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithNonFieldReference1() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); + + DataSet<Row> ds = tableEnv.toSet(table, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithNonFieldReference2() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," + + " c"); + + DataSet<Row> ds = tableEnv.toSet(table, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java new file mode 100644 index 0000000..57824eb --- /dev/null +++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.table.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class CastingITCase extends MultipleProgramsTestBase { + + + public CastingITCase(TestExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testAutoCastToString() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>( + (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input); + + Table<JavaBatchTranslator> result = table.select( + "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\""); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "1b,1s,1i,1L,1.0f,1.0d"; + } + + @Test + public void testNumericAutocastInArithmetic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>( + (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input); + + Table<JavaBatchTranslator> result = table.select("f0 + 1, f1 +" + + " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "2,2,2,2.0,2.0,2.0"; + } + + @Test + public void testNumericAutocastInComparison() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements( + new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), + new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello")); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input, "a,b,c,d,e,f,g"); + + Table<JavaBatchTranslator> result = table + 
.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "2,2,2,2,2.0,2.0,Hello"; + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java new file mode 100644 index 0000000..938af2d --- /dev/null +++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.table.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ExpressionsITCase extends MultipleProgramsTestBase { + + + public ExpressionsITCase(TestExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testArithmetic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Integer, Integer>> input = + env.fromElements(new Tuple2<Integer, Integer>(5, 10)); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input, "a, b"); + + Table<JavaBatchTranslator> result = table.select( + "a - 5, a + 5, 
a / 2, a * 2, a % 2, -a"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "0,10,2,10,1,-5"; + } + + @Test + public void testLogic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Integer, Boolean>> input = + env.fromElements(new Tuple2<Integer, Boolean>(5, true)); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input, "a, b"); + + Table<JavaBatchTranslator> result = table.select( + "b && true, b && false, b || false, !b"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "true,false,true,false"; + } + + @Test + public void testComparisons() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple3<Integer, Integer, Integer>> input = + env.fromElements(new Tuple3<Integer, Integer, Integer>(5, 5, 4)); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input, "a, b, c"); + + Table<JavaBatchTranslator> result = table.select( + "a > c, a >= b, a < c, a.isNull, a.isNotNull"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "true,true,false,false,true"; + } + + @Test + public void testBitwiseOperation() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Byte, Byte>> input = + env.fromElements(new Tuple2<Byte, Byte>((byte) 3, (byte) 5)); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input, "a, b"); + + Table<JavaBatchTranslator> result = table.select( + "a & b, a | b, a ^ b, ~a"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "1,7,6,-4"; + } + + @Test + public void testBitwiseWithAutocast() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Integer, Byte>> input = + env.fromElements(new Tuple2<Integer, Byte>(3, (byte) 5)); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input, "a, b"); + + Table<JavaBatchTranslator> result = table.select( + "a & b, a | b, a ^ b, ~a"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "1,7,6,-4"; + } + + @Test(expected = ExpressionException.class) + public void testBitwiseWithNonWorkingAutocast() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, Byte>> input = + env.fromElements(new Tuple2<Float, Byte>(3.0f, (byte) 5)); + + Table<JavaBatchTranslator> table = + tableEnv.toTable(input, "a, b"); + + Table<JavaBatchTranslator> result = + table.select("a & b, a | b, a ^ b, ~a"); + + DataSet<Row> ds = tableEnv.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } +} + 
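The string expressions exercised by ExpressionsITCase above have symbol-based counterparts in the Scala Table API used by the examples earlier in this commit. As a rough sketch (assuming the implicit conversions from org.apache.flink.api.scala.table._ and following the print-then-execute pattern of the Scala examples; the object and case class names are made up for the example), testArithmetic could look like this in the Scala DSL:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    object ArithmeticDslSketch {
      // one field per selected expression, so the Table converts back to a DataSet
      case class Result(diff: Int, sum: Int, div: Int, prod: Int, mod: Int)

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // same input and expressions as testArithmetic; expected row: 0,10,2,10,1
        val result = env.fromElements((5, 10))
          .as('a, 'b)
          .select('a - 5 as 'diff, 'a + 5 as 'sum, 'a / 2 as 'div, 'a * 2 as 'prod, 'a % 2 as 'mod)
          .as[Result]
        result.print()
        env.execute()
      }
    }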
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
new file mode 100644
index 0000000..e709079
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.table.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class FilterITCase extends MultipleProgramsTestBase {
+
+	public FilterITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected = "";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testAllRejectingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> table =
+				tableEnv.toTable(input, "a, b, c");
+
+		Table<JavaBatchTranslator> result = table
+				.filter("false");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "\n";
+	}
+
+	@Test
+	public void testAllPassingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> table =
+				tableEnv.toTable(input, "a, b, c");
+
+		Table<JavaBatchTranslator> result = table
+				.filter("true");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+	}
+
+	@Test
+	public void testFilterOnIntegerTupleField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> table =
+				tableEnv.toTable(input, "a, b, c");
+
+		Table<JavaBatchTranslator> result = table
+				.filter(" a % 2 === 0 ");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
new file mode 100644
index 0000000..e0dd28e
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.table.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
+
+	public GroupedAggregationsITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected = "";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testGroupingOnNonExistentField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> table =
+				tableEnv.toTable(input, "a, b, c");
+
+		Table<JavaBatchTranslator> result = table
+				.groupBy("foo").select("a.avg");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "";
+	}
+
+	@Test
+	public void testGroupedAggregate() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> table =
+				tableEnv.toTable(input, "a, b, c");
+
+		Table<JavaBatchTranslator> result = table
+				.groupBy("b").select("b, a.sum");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+	}
+
+	@Test
+	public void testGroupingKeyForwardIfNotUsed() throws Exception {
+
+		// the grouping key needs to be forwarded to the intermediate DataSet, even
+		// if we don't want the key in the output
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> table =
+				tableEnv.toTable(input, "a, b, c");
+
+		Table<JavaBatchTranslator> result = table
+				.groupBy("b").select("a.sum");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+	}
+}
+
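For readers comparing the two layers: testGroupedAggregate() computes the same result as a grouped aggregation written directly against the DataSet API. A hedged sketch, not part of this commit; field positions follow the Tuple3<Integer, Long, String> used throughout these tests, where field 0 is "a" and field 1 is "b", and the output path is illustrative only:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;

public class GroupedSumSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

		// Group on field 1 ("b") and SUM field 0 ("a"); per group this yields the
		// (b, a.sum) pairs the test expects: 1,1  2,5  3,15  4,34  5,65  6,111.
		// Fields that are neither grouped nor aggregated keep a value taken from
		// some element of the group.
		input.groupBy(1)
				.aggregate(Aggregations.SUM, 0)
				.writeAsText("/tmp/grouped-sums");

		env.execute();
	}
}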
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
new file mode 100644
index 0000000..9ed8f84
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.table.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinITCase extends MultipleProgramsTestBase {
+
+	public JoinITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected = "";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in1 = tableEnv.toTable(ds1, "a, b, c");
+		Table<JavaBatchTranslator> in2 = tableEnv.toTable(ds2, "d, e, f, g, h");
+
+		Table<JavaBatchTranslator> result = in1.join(in2).where("b === e").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+	}
+
+	@Test
+	public void testJoinWithFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in1 = tableEnv.toTable(ds1, "a, b, c");
+		Table<JavaBatchTranslator> in2 = tableEnv.toTable(ds2, "d, e, f, g, h");
+
+		Table<JavaBatchTranslator> result = in1.join(in2).where("b === e && b < 2").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "Hi,Hallo\n";
+	}
+
+	@Test
+	public void testJoinWithMultipleKeys() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in1 = tableEnv.toTable(ds1, "a, b, c");
+		Table<JavaBatchTranslator> in2 = tableEnv.toTable(ds2, "d, e, f, g, h");
+
+		Table<JavaBatchTranslator> result = in1.join(in2).where("a === d && b === h").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+				"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testJoinNonExistingKey() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in1 = tableEnv.toTable(ds1, "a, b, c");
+		Table<JavaBatchTranslator> in2 = tableEnv.toTable(ds2, "d, e, f, g, h");
+
+		Table<JavaBatchTranslator> result = in1.join(in2).where("foo === e").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testJoinWithNonMatchingKeyTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in1 = tableEnv.toTable(ds1, "a, b, c");
+		Table<JavaBatchTranslator> in2 = tableEnv.toTable(ds2, "d, e, f, g, h");
+
+		Table<JavaBatchTranslator> result = in1
+				.join(in2).where("a === g").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testJoinWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in1 = tableEnv.toTable(ds1, "a, b, c");
+		Table<JavaBatchTranslator> in2 = tableEnv.toTable(ds2, "d, e, f, g, c");
+
+		Table<JavaBatchTranslator> result = in1
+				.join(in2).where("a === d").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "";
+	}
+
+	@Test
+	public void testJoinWithAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in1 = tableEnv.toTable(ds1, "a, b, c");
+		Table<JavaBatchTranslator> in2 = tableEnv.toTable(ds2, "d, e, f, g, h");
+
+		Table<JavaBatchTranslator> result = in1
+				.join(in2).where("a === d").select("g.count");
+
+		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
+		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "6";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
new file mode 100644
index 0000000..2f18ce6
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.table.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class SelectITCase extends MultipleProgramsTestBase {
+
+	public SelectITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected = "";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSimpleSelectAllWithAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in = tableEnv.toTable(ds, "a,b,c");
+
+		Table<JavaBatchTranslator> result = in
+				.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+
+	}
+
+	@Test
+	public void testSimpleSelectWithNaming() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in = tableEnv.toTable(ds);
+
+		Table<JavaBatchTranslator> result = in
+				.select("f0 as a, f1 as b")
+				.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithTooFewFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in = tableEnv.toTable(ds, "a, b");
+
+		DataSet<Row> resultSet = tableEnv.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = " sorry dude ";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithTooManyFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in = tableEnv.toTable(ds, "a, b, c, d");
+
+		DataSet<Row> resultSet = tableEnv.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = " sorry dude ";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in = tableEnv.toTable(ds, "a, b, c, b");
+
+		DataSet<Row> resultSet = tableEnv.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = " today's not your day ";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testOnlyFieldRefInAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		Table<JavaBatchTranslator> in = tableEnv.toTable(ds, "a, b as c, d");
+
+		DataSet<Row> resultSet = tableEnv.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "sorry bro";
+	}
+}
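Taken together, the failing cases above pin down the field-naming contract: the name list passed to toTable() must match the arity of the input type exactly, the names must be unique, and an "as" expression may wrap nothing but a plain field reference. A minimal passing sketch using only the API these tests already exercise (the output path is illustrative only):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();

DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

// three unique names for a Tuple3 -- arity and uniqueness rules satisfied
Table<JavaBatchTranslator> in = tableEnv.toTable(ds, "a, b, c");

// renaming via select: each alias wraps exactly one field reference
Table<JavaBatchTranslator> renamed = in.select("a as x, b as y, c as z");

DataSet<Row> out = tableEnv.toSet(renamed, Row.class);
out.writeAsText("/tmp/select-as-sketch", FileSystem.WriteMode.OVERWRITE);
env.execute();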