http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala new file mode 100644 index 0000000..062f510 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.regression + +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.DenseVector + +object RegressionData { + + val expectedWeights = Array[Double](3.0094) + val expectedWeight0: Double = 9.8158 + val expectedSquaredResidualSum: Double = 49.7596/2 + + val data: Seq[LabeledVector] = Seq( + LabeledVector(10.7949, DenseVector(0.2714)), + LabeledVector(10.6426, DenseVector(0.1008)), + LabeledVector(10.5603, DenseVector(0.5078)), + LabeledVector(12.8707, DenseVector(0.5856)), + LabeledVector(10.7026, DenseVector(0.7629)), + LabeledVector(9.8571, DenseVector(0.0830)), + LabeledVector(10.5001, DenseVector(0.6616)), + LabeledVector(11.2063, DenseVector(0.5170)), + LabeledVector(9.1892, DenseVector(0.1710)), + LabeledVector(12.2408, DenseVector(0.9386)), + LabeledVector(11.0307, DenseVector(0.5905)), + LabeledVector(10.1369, DenseVector(0.4406)), + LabeledVector(10.7609, DenseVector(0.9419)), + LabeledVector(12.5328, DenseVector(0.6559)), + LabeledVector(13.3560, DenseVector(0.4519)), + LabeledVector(14.7424, DenseVector(0.8397)), + LabeledVector(11.1057, DenseVector(0.5326)), + LabeledVector(11.6157, DenseVector(0.5539)), + LabeledVector(11.5744, DenseVector(0.6801)), + LabeledVector(11.1775, DenseVector(0.3672)), + LabeledVector(9.7991, DenseVector(0.2393)), + LabeledVector(9.8173, DenseVector(0.5789)), + LabeledVector(12.5642, DenseVector(0.8669)), + LabeledVector(9.9952, DenseVector(0.4068)), + LabeledVector(8.4354, DenseVector(0.1126)), + LabeledVector(13.7058, DenseVector(0.4438)), + LabeledVector(10.6672, DenseVector(0.3002)), + LabeledVector(11.6080, DenseVector(0.4014)), + LabeledVector(13.6926, DenseVector(0.8334)), + LabeledVector(9.5261, DenseVector(0.4036)), + LabeledVector(11.5837, DenseVector(0.3902)), + LabeledVector(11.5831, DenseVector(0.3604)), + LabeledVector(10.5038, DenseVector(0.1403)), + LabeledVector(10.9382, DenseVector(0.2601)), + LabeledVector(9.7325, DenseVector(0.0868)), + LabeledVector(12.0113, DenseVector(0.4294)), + LabeledVector(9.9219, DenseVector(0.2573)), + LabeledVector(10.0963, DenseVector(0.2976)), + 
LabeledVector(11.9999, DenseVector(0.4249)), + LabeledVector(12.0442, DenseVector(0.1192)) + ) + + val expectedNoInterceptWeights = Array[Double](5.0) + val expectedNoInterceptWeight0: Double = 0.0 + + val noInterceptData: Seq[LabeledVector] = Seq( + LabeledVector(217.228709, DenseVector(43.4457419)), + LabeledVector(450.037048, DenseVector(90.0074095)), + LabeledVector( 67.553478, DenseVector(13.5106955)), + LabeledVector( 26.976958, DenseVector( 5.3953916)), + LabeledVector(403.808709, DenseVector(80.7617418)), + LabeledVector(203.932158, DenseVector(40.7864316)), + LabeledVector(146.974958, DenseVector(29.3949916)), + LabeledVector( 46.869291, DenseVector( 9.3738582)), + LabeledVector(450.780834, DenseVector(90.1561667)), + LabeledVector(386.535619, DenseVector(77.3071239)), + LabeledVector(202.644342, DenseVector(40.5288684)), + LabeledVector(227.586507, DenseVector(45.5173013)), + LabeledVector(408.801080, DenseVector(81.7602161)), + LabeledVector(146.118550, DenseVector(29.2237100)), + LabeledVector(156.475382, DenseVector(31.2950763)), + LabeledVector(291.822515, DenseVector(58.3645030)), + LabeledVector( 61.506887, DenseVector(12.3013775)), + LabeledVector(363.949913, DenseVector(72.7899827)), + LabeledVector(398.050744, DenseVector(79.6101487)), + LabeledVector(246.053111, DenseVector(49.2106221)), + LabeledVector(225.494661, DenseVector(45.0989323)), + LabeledVector(265.986844, DenseVector(53.1973689)), + LabeledVector(110.459912, DenseVector(22.0919823)), + LabeledVector(122.716974, DenseVector(24.5433947)), + LabeledVector(128.014314, DenseVector(25.6028628)), + LabeledVector(252.538913, DenseVector(50.5077825)), + LabeledVector(393.632082, DenseVector(78.7264163)), + LabeledVector( 77.698941, DenseVector(15.5397881)), + LabeledVector(206.187568, DenseVector(41.2375135)), + LabeledVector(244.073426, DenseVector(48.8146851)), + LabeledVector(364.946890, DenseVector(72.9893780)), + LabeledVector( 4.627494, DenseVector( 0.9254987)), + LabeledVector(485.359565, DenseVector(97.0719130)), + LabeledVector(347.359190, DenseVector(69.4718380)), + LabeledVector(419.663211, DenseVector(83.9326422)), + LabeledVector(488.518318, DenseVector(97.7036635)), + LabeledVector( 28.082962, DenseVector( 5.6165925)), + LabeledVector(211.002441, DenseVector(42.2004881)), + LabeledVector(250.624124, DenseVector(50.1248248)), + LabeledVector(489.776669, DenseVector(97.9553337)) + ) + + + val expectedPolynomialWeights = Seq(0.2375, -0.3493, -0.1674) + val expectedPolynomialWeight0 = 0.0233 + val expectedPolynomialSquaredResidualSum = 1.5389e+03/2 + + val polynomialData: Seq[LabeledVector] = Seq( + LabeledVector(2.1415, DenseVector(3.6663)), + LabeledVector(10.9835, DenseVector(4.0761)), + LabeledVector(7.2507, DenseVector(0.5714)), + LabeledVector(11.9274, DenseVector(4.1102)), + LabeledVector(-4.2798, DenseVector(2.8456)), + LabeledVector(7.1929, DenseVector(0.4389)), + LabeledVector(4.5097, DenseVector(1.2532)), + LabeledVector(-3.6059, DenseVector(2.4610)), + LabeledVector(18.1132, DenseVector(4.3088)), + LabeledVector(19.2674, DenseVector(4.3420)), + LabeledVector(7.0664, DenseVector(0.7093)), + LabeledVector(20.1836, DenseVector(4.3677)), + LabeledVector(18.0609, DenseVector(4.3073)), + LabeledVector(-2.2090, DenseVector(2.1842)), + LabeledVector(1.1306, DenseVector(3.6013)), + LabeledVector(7.1903, DenseVector(0.6385)), + LabeledVector(-0.2668, DenseVector(1.8979)), + LabeledVector(12.2281, DenseVector(4.1208)), + LabeledVector(0.6086, DenseVector(3.5649)), + LabeledVector(18.4202, 
DenseVector(4.3177)), + LabeledVector(-4.1284, DenseVector(2.9508)), + LabeledVector(6.1964, DenseVector(0.1607)), + LabeledVector(4.9638, DenseVector(3.8211)), + LabeledVector(14.6677, DenseVector(4.2030)), + LabeledVector(-3.8132, DenseVector(3.0543)), + LabeledVector(-1.2891, DenseVector(3.4098)), + LabeledVector(-1.9390, DenseVector(3.3441)), + LabeledVector(0.7293, DenseVector(1.7650)), + LabeledVector(-4.1310, DenseVector(2.9497)), + LabeledVector(6.9131, DenseVector(0.7703)), + LabeledVector(-3.2060, DenseVector(3.1772)), + LabeledVector(6.0899, DenseVector(0.1432)), + LabeledVector(4.5567, DenseVector(1.2462)), + LabeledVector(6.4562, DenseVector(0.2078)), + LabeledVector(7.1903, DenseVector(0.4371)), + LabeledVector(2.8017, DenseVector(3.7056)), + LabeledVector(-3.4873, DenseVector(3.1267)), + LabeledVector(3.2918, DenseVector(1.4269)), + LabeledVector(17.0085, DenseVector(4.2760)), + LabeledVector(6.1622, DenseVector(0.1550)), + LabeledVector(-0.8192, DenseVector(1.9743)), + LabeledVector(1.0957, DenseVector(1.7170)), + LabeledVector(-0.9065, DenseVector(3.4448)), + LabeledVector(0.7986, DenseVector(3.5784)), + LabeledVector(6.6861, DenseVector(0.8409)), + LabeledVector(-2.3274, DenseVector(2.2039)), + LabeledVector(-1.0359, DenseVector(2.0051)), + LabeledVector(-4.2092, DenseVector(2.9084)), + LabeledVector(-3.1140, DenseVector(3.1921)), + LabeledVector(-1.4323, DenseVector(3.3961)) + ) + + val expectedRegWeights = Array[Double](0.0, 0.0, 0.0, 0.18, 0.2, 0.24) + val expectedRegWeight0 = 0.74 + + // Example values from scikit-learn L1 test: http://git.io/vf4V2 + val regularizationData: Seq[LabeledVector] = Seq( + LabeledVector(1.0, DenseVector(1.0,0.9 ,0.8 ,0.0 ,0.0 ,0.0)), + LabeledVector(1.0, DenseVector(1.0,0.84,0.98,0.0 ,0.0 ,0.0)), + LabeledVector(1.0, DenseVector(1.0,0.96,0.88,0.0 ,0.0 ,0.0)), + LabeledVector(1.0, DenseVector(1.0,0.91,0.99,0.0 ,0.0 ,0.0)), + LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.89,0.91,1.0)), + LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.79,0.84,1.0)), + LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.91,0.95,1.0)), + LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.93,1.0 ,1.0)) + ) +}
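A note on the fixture above: each data set is paired with precomputed expected results (weights, intercept, squared residual sum). For orientation, here is a minimal sketch of how a test could consume it with flink-ml's MultipleLinearRegression; the iteration count, step size, and tolerances are illustrative assumptions, not values from the actual test suite:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.WeightVector
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val env = ExecutionEnvironment.getExecutionEnvironment
    val training = env.fromCollection(RegressionData.data)

    // Illustrative hyperparameters; the real tests choose their own.
    val mlr = MultipleLinearRegression()
      .setIterations(100)
      .setStepsize(1.0)

    mlr.fit(training)

    // After fit(), the learned model is exposed as Option[DataSet[WeightVector]].
    val WeightVector(weights, intercept) = mlr.weightsOption.get.collect().head

    // Compare against the fixture's expected values with a loose tolerance.
    assert(math.abs(weights(0) - RegressionData.expectedWeights(0)) < 0.1)
    assert(math.abs(intercept - RegressionData.expectedWeight0) < 0.4)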
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml new file mode 100644 index 0000000..19951cb --- /dev/null +++ b/flink-libraries/flink-table/pom.xml @@ -0,0 +1,258 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-libraries</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-table</artifactId> + <name>flink-table</name> + + <packaging>jar</packaging> + + <dependencies> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-examples-batch</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.5</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run 
scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins combine.children="append"> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + 
</configuration>
+			</plugin>
+
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<id>scala-2.10</id>
+			<activation>
+				<property>
+					<!-- this is the default scala profile -->
+					<name>!scala-2.11</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.scalamacros</groupId>
+					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
+					<version>${scala.macros.version}</version>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
new file mode 100644
index 0000000..97113bb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <strong>Table API (Java)</strong><br>
+ *
+ * {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a
+ * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet}
+ * or {@link org.apache.flink.streaming.api.datastream.DataStream}.
+ *
+ * <p>
+ * This can be used to perform SQL-like queries on data. Please have
+ * a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and
+ * how query strings are written.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ * ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+ * TableEnvironment tableEnv = new TableEnvironment();
+ *
+ * DataSet<WC> input = env.fromElements(
+ *   new WC("Hello", 1),
+ *   new WC("Ciao", 1),
+ *   new WC("Hello", 1));
+ *
+ * Table table = tableEnv.fromDataSet(input);
+ *
+ * Table filtered = table
+ *   .groupBy("word")
+ *   .select("word.count as count, word")
+ *   .filter("count = 2");
+ *
+ * DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
+ *
+ * result.print();
+ * }</pre>
+ *
+ * <p>
+ * As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the
+ * underlying API representation using {@link org.apache.flink.api.java.table.TableEnvironment#toDataSet(org.apache.flink.api.table.Table, java.lang.Class)}
+ * or {@link org.apache.flink.api.java.table.TableEnvironment#toDataStream(org.apache.flink.api.table.Table, java.lang.Class)}.
+ */
+package org.apache.flink.api.java.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
new file mode 100644
index 0000000..d7fbc8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <strong>Table API</strong><br>
+ *
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
+ * and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
+ * a DataSet or DataStream. On this, relational operations can be performed. A table can also
+ * be converted back to a DataSet or DataStream.
+ *
+ * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
+ * the language specific part of the API. Refer to these packages for documentation on how
+ * the Table API can be used in Java and Scala.
+ */
+package org.apache.flink.api.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
new file mode 100644
index 0000000..c043508
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.java; + + +import org.apache.flink.api.table.Table; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; + +/** + * Very simple example that shows how the Java Table API can be used. + */ +public class JavaTableExample { + + public static class WC { + public String word; + public int count; + + // Public constructor to make it a Flink POJO + public WC() { + + } + + public WC(String word, int count) { + this.word = word; + this.count = count; + } + + @Override + public String toString() { + return "WC " + word + " " + count; + } + } + + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<WC> input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + + Table table = tableEnv.fromDataSet(input); + + Table filtered = table + .groupBy("word") + .select("word.count as count, word") + .filter("count = 2"); + + DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class); + + result.print(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala new file mode 100644 index 0000000..9dc9297 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table + +import java.lang.reflect.Modifier + +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.aggregation.AggregationFunction +import org.apache.flink.api.java.operators.JoinOperator.EquiJoin +import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping} +import org.apache.flink.api.java.{DataSet => JavaDataSet} +import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields +import org.apache.flink.api.table.plan._ +import org.apache.flink.api.table.runtime._ +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo} +import org.apache.flink.api.table.{ExpressionException, Row, Table} + +/** + * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and + * translating them back to Java [[org.apache.flink.api.java.DataSet]]s. + */ +class JavaBatchTranslator extends PlanTranslator { + + type Representation[A] = JavaDataSet[A] + + override def createTable[A]( + repr: Representation[A], + inputType: CompositeType[A], + expressions: Array[Expression], + resultFields: Seq[(String, TypeInformation[_])]): Table = { + + val rowDataSet = createSelect(expressions, repr, inputType) + + Table(Root(rowDataSet, resultFields)) + } + + override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { + + if (tpe.getTypeClass == classOf[Row]) { + // shortcut for DataSet[Row] + return translateInternal(op).asInstanceOf[JavaDataSet[A]] + } + + val clazz = tpe.getTypeClass + if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + throw new ExpressionException("Cannot create DataSet of type " + + clazz.getName + ". Only top-level classes or static member classes are supported.") + } + + + if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { + throw new ExpressionException( + "A Table can only be converted to composite types, type is: " + + implicitly[TypeInformation[A]] + + ". 
Composite types would be tuples, case classes and POJOs.") + } + + val resultSet = translateInternal(op) + + val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] + + val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] + + val resultNames = resultType.getFieldNames + val outputNames = outputType.getFieldNames.toSeq + + if (resultNames.toSet != outputNames.toSet) { + throw new ExpressionException(s"Expression result type $resultType does not have the same " + + s"fields as output type $outputType") + } + + for (f <- outputNames) { + val in = resultType.getTypeAt(resultType.getFieldIndex(f)) + val out = outputType.getTypeAt(outputType.getFieldIndex(f)) + if (!in.equals(out)) { + throw new ExpressionException(s"Types for field $f differ on input $resultType and " + + s"output $outputType.") + } + } + + val outputFields = outputNames map { + f => ResolvedFieldReference(f, resultType.getTypeAt(f)) + } + + val function = new ExpressionSelectFunction( + resultSet.getType.asInstanceOf[RowTypeInfo], + outputType, + outputFields) + + val opName = s"select(${outputFields.mkString(",")})" + val operator = new MapOperator(resultSet, outputType, function, opName) + + operator + } + + private def translateInternal(op: PlanNode): JavaDataSet[Row] = { + op match { + case Root(dataSet: JavaDataSet[Row], resultFields) => + dataSet + + case Root(_, _) => + throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op + ". " + + "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?") + + case GroupBy(_, fields) => + throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + + "SELECT statement?") + + case As(input, newNames) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray) + new RenameOperator(translatedInput, proxyType) + + case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + selection, + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + } else { + translateInternal(expandedInput) + } + + case Filter(Join(leftInput, rightInput), predicate) => + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ + rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + + case Join(leftInput, rightInput) => + throw new ExpressionException("Join without filter condition encountered. " + + "Did you forget to add .where(...) 
?") + + case sel@Select(input, selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + val translatedInput = input match { + case GroupBy(groupByInput, groupExpressions) => + val translatedGroupByInput = translateInternal(groupByInput) + val inType = translatedGroupByInput.getType.asInstanceOf[CompositeType[Row]] + + val keyIndices = groupExpressions map { + case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name) + case e => + throw new ExpressionException(s"Expression $e is not a valid key expression.") + } + + val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false) + val grouping = new UnsortedGrouping(translatedGroupByInput, keys) + + new GroupReduceOperator( + grouping, + inType, + new NoExpressionAggregateFunction(), + "Nop Expression Aggregation") + + case _ => translateInternal(input) + } + + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val inputFields = inType.getFieldNames + createSelect( + selection, + translatedInput, + inType) + } else { + translateInternal(expandedInput) + } + + case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + + val keyIndices = groupExpressions map { + case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name) + case e => throw new ExpressionException(s"Expression $e is not a valid key expression.") + } + + val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false) + + val grouping = new UnsortedGrouping(translatedInput, keys) + + val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { + case (fieldName, fun) => + fun.getFactory.createAggregationFunction[Any]( + inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) + } + + val aggIndices = aggregations map { + case (fieldName, _) => + inType.getFieldIndex(fieldName) + } + + val result = new GroupReduceOperator( + grouping, + inType, + new ExpressionAggregateFunction(aggIndices, aggFunctions), + "Expression Aggregation: " + agg) + + result + + case agg@Aggregate(input, aggregations) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + + val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { + case (fieldName, fun) => + fun.getFactory.createAggregationFunction[Any]( + inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) + } + + val aggIndices = aggregations map { + case (fieldName, _) => + inType.getFieldIndex(fieldName) + } + + val result = new GroupReduceOperator( + translatedInput, + inType, + new ExpressionAggregateFunction(aggIndices, aggFunctions), + "Expression Aggregation: " + agg) + + result + + + case Filter(input, predicate) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val filter = new ExpressionFilterFunction[Row](predicate, inType) + translatedInput.filter(filter).name(predicate.toString) + + case uni@UnionAll(left, right) => + val translatedLeft = translateInternal(left) + val translatedRight = translateInternal(right) + translatedLeft.union(translatedRight).name("Union: " + uni) + } + } + + private def createSelect[I]( + fields: Seq[Expression], + input: JavaDataSet[I], + inputType: CompositeType[I]): JavaDataSet[Row] = { + + fields foreach { + f => + if (f.exists(_.isInstanceOf[Aggregation])) { + throw new ExpressionException("Found 
aggregate in " + fields.mkString(", ") + ".") + } + + } + + val resultType = new RowTypeInfo(fields) + + val function = new ExpressionSelectFunction(inputType, resultType, fields) + + val opName = s"select(${fields.mkString(",")})" + val operator = new MapOperator(input, resultType, function, opName) + + operator + } + + private def createJoin[L, R]( + predicate: Expression, + fields: Seq[Expression], + leftInput: JavaDataSet[L], + rightInput: JavaDataSet[R], + leftType: CompositeType[L], + rightType: CompositeType[R], + joinHint: JoinHint): JavaDataSet[Row] = { + + val resultType = new RowTypeInfo(fields) + + val (reducedPredicate, leftFields, rightFields) = + ExtractEquiJoinFields(leftType, rightType, predicate) + + if (leftFields.isEmpty || rightFields.isEmpty) { + throw new ExpressionException("Could not derive equi-join predicates " + + "for predicate " + predicate + ".") + } + + val leftKey = new ExpressionKeys[L](leftFields, leftType) + val rightKey = new ExpressionKeys[R](rightFields, rightType) + + val joiner = new ExpressionJoinFunction[L, R, Row]( + reducedPredicate, + leftType, + rightType, + resultType, + fields) + + new EquiJoin[L, R, Row]( + leftInput, + rightInput, + leftKey, + rightKey, + joiner, + resultType, + joinHint, + predicate.toString) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala new file mode 100644 index 0000000..a37c892 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.table
+
+import java.lang.reflect.Modifier
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{ExpressionException, Row, Table}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.operators.StreamMap
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
+ * translating them back to Java [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. Also, the expression
+ * operations must be extended to allow windowing operations.
+ */
+class JavaStreamingTranslator extends PlanTranslator {
+
+  type Representation[A] = DataStream[A]
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
+
+    val rowDataStream = createSelect(expressions, repr, inputType)
+
+    new Table(Root(rowDataStream, resultFields))
+  }
+
+  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+    if (tpe.getTypeClass == classOf[Row]) {
+      // shortcut for DataStream[Row]
+      return translateInternal(op).asInstanceOf[DataStream[A]]
+    }
+
+    val clazz = tpe.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create DataStream of type " +
+        clazz.getName + ". Only top-level classes or static member classes are supported.")
+    }
+
+    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
+      throw new ExpressionException(
+        "A Table can only be converted to composite types, type is: " +
+          implicitly[TypeInformation[A]] +
+          ". Composite types would be tuples, case classes and POJOs.")
+    }
+
+    val resultSet = translateInternal(op)
+
+    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+
+    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+
+    val resultNames = resultType.getFieldNames
+    val outputNames = outputType.getFieldNames.toSeq
+
+    if (resultNames.toSet != outputNames.toSet) {
+      throw new ExpressionException(s"Expression result type $resultType does not have the same " +
+        s"fields as output type $outputType")
+    }
+
+    for (f <- outputNames) {
+      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
+      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
+      if (!in.equals(out)) {
+        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
+          s"output $outputType.")
+      }
+    }
+
+    val outputFields = outputNames map {
+      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
+    }
+
+    val function = new ExpressionSelectFunction(
+      resultSet.getType.asInstanceOf[RowTypeInfo],
+      outputType,
+      outputFields)
+
+    val opName = s"select(${outputFields.mkString(",")})"
+
+    resultSet.transform(opName, outputType, new StreamMap[Row, A](function))
+  }
+
+  private def translateInternal(op: PlanNode): DataStream[Row] = {
+    op match {
+      case Root(dataSet: DataStream[Row], resultFields) =>
+        dataSet
+
+      case Root(_, _) =>
+        throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op + ". " +
+          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
+
+      case GroupBy(_, fields) =>
+        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
+          "SELECT statement?")
+
+      case As(input, newNames) =>
+        throw new ExpressionException("As operation for Streams not yet implemented.")
+
+      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedLeftInput = translateInternal(leftInput)
+          val translatedRightInput = translateInternal(rightInput)
+          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+          createJoin(
+            predicate,
+            selection,
+            translatedLeftInput,
+            translatedRightInput,
+            leftInType,
+            rightInType,
+            JoinHint.OPTIMIZER_CHOOSES)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case Filter(Join(leftInput, rightInput), predicate) =>
+        val translatedLeftInput = translateInternal(leftInput)
+        val translatedRightInput = translateInternal(rightInput)
+        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+        createJoin(
+          predicate,
+          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
+            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
+          translatedLeftInput,
+          translatedRightInput,
+          leftInType,
+          rightInType,
+          JoinHint.OPTIMIZER_CHOOSES)
+
+      case Join(leftInput, rightInput) =>
+        throw new ExpressionException("Join without filter condition encountered. " +
+          "Did you forget to add .where(...) 
?") + + case sel@Select(input, selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + // no expansions took place + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val inputFields = inType.getFieldNames + createSelect( + selection, + translatedInput, + inType) + } else { + translateInternal(expandedInput) + } + + case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => + throw new ExpressionException("Aggregate operation for Streams not yet implemented.") + + case agg@Aggregate(input, aggregations) => + throw new ExpressionException("Aggregate operation for Streams not yet implemented.") + + case Filter(input, predicate) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val filter = new ExpressionFilterFunction[Row](predicate, inType) + translatedInput.filter(filter) + + case UnionAll(left, right) => + val translatedLeft = translateInternal(left) + val translatedRight = translateInternal(right) + translatedLeft.union(translatedRight) + } + } + + private def createSelect[I]( + fields: Seq[Expression], + input: DataStream[I], + inputType: CompositeType[I]): DataStream[Row] = { + + fields foreach { + f => + if (f.exists(_.isInstanceOf[Aggregation])) { + throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") + } + + } + + val resultType = new RowTypeInfo(fields) + + val function = new ExpressionSelectFunction(inputType, resultType, fields) + + val opName = s"select(${fields.mkString(",")})" + + input.transform(opName, resultType, new StreamMap[I, Row](function)) + } + + private def createJoin[L, R]( + predicate: Expression, + fields: Seq[Expression], + leftInput: DataStream[L], + rightInput: DataStream[R], + leftType: CompositeType[L], + rightType: CompositeType[R], + joinHint: JoinHint): DataStream[Row] = { + + throw new ExpressionException("Join operation for Streams not yet implemented.") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala new file mode 100644 index 0000000..5614031 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.table + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.Table +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Environment for working with the Table API. + * + * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] and back again. You + * can also use the provided methods to create a [[Table]] directly from a data source. + */ +class TableEnvironment { + + /** + * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]]. + * The fields of the DataSet type are renamed to the given set of fields: + * + * Example: + * + * {{{ + * tableEnv.fromDataSet(set, "a, b") + * }}} + * + * This will transform the set containing elements of two fields to a table where the fields + * are named a and b. + */ + def fromDataSet[T](set: DataSet[T], fields: String): Table = { + new JavaBatchTranslator().createTable(set, fields) + } + + /** + * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]]. + * The fields of the DataSet type are used to name the + * [[org.apache.flink.api.table.Table]] fields. + */ + def fromDataSet[T](set: DataSet[T]): Table = { + new JavaBatchTranslator().createTable(set) + } + + /** + * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. + * The fields of the DataStream type are renamed to the given set of fields: + * + * Example: + * + * {{{ + * tableEnv.fromDataStream(set, "a, b") + * }}} + * + * This will transform the set containing elements of two fields to a table where the fields + * are named a and b. + */ + def fromDataStream[T](set: DataStream[T], fields: String): Table = { + new JavaStreamingTranslator().createTable(set, fields) + } + + /** + * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. + * The fields of the DataStream type are used to name the + * [[org.apache.flink.api.table.Table]] fields. + */ + def fromDataStream[T](set: DataStream[T]): Table = { + new JavaStreamingTranslator().createTable(set) + } + + /** + * Converts the given [[org.apache.flink.api.table.Table]] to + * a DataSet. The given type must have exactly the same fields as the + * [[org.apache.flink.api.table.Table]]. That is, the names of the + * fields and the types must match. + */ + @SuppressWarnings(Array("unchecked")) + def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = { + new JavaBatchTranslator().translate[T](table.operation)( + TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]]) + } + + /** + * Converts the given [[org.apache.flink.api.table.Table]] to + * a DataStream. The given type must have exactly the same fields as the + * [[org.apache.flink.api.table.Table]]. That is, the names of the + * fields and the types must match. 
+ */ + @SuppressWarnings(Array("unchecked")) + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { + new JavaStreamingTranslator().translate[T](table.operation)( + TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]]) + + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala new file mode 100644 index 0000000..2508a3d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.table + +import org.apache.flink.api.table._ +import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression} +import org.apache.flink.api.common.typeutils.CompositeType + +import org.apache.flink.api.scala._ + +/** + * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is + * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]]. + */ +class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) { + + /** + * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this: + * + * {{{ + * val in: DataSet[(String, Int)] = ... + * val table = in.as('a, 'b) + * }}} + * + * This results in a [[Table]] that has field `a` of type `String` and field `b` + * of type `Int`. + */ + def as(fields: Expression*): Table = { + new ScalaBatchTranslator().createTable(set, fields.toArray) + } + + /** + * Converts the [[DataSet]] to a [[Table]]. The field names will be taken from the field names + * of the input type. + * + * Example: + * + * {{{ + * val in: DataSet[(String, Int)] = ... + * val table = in.toTable + * }}} + * + * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2` + * of type `Int`. 
+ */
+  def toTable: Table = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
new file mode 100644
index 0000000..47bd100
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference}
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * Methods for converting a [[DataStream]] to a [[Table]]. A [[DataStream]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */
+class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * This results in a [[Table]] that has field `a` of type `String` and field `b`
+   * of type `Int`.
+   */
+  def as(fields: Expression*): Table = {
+    new ScalaStreamingTranslator().createTable(
+      stream,
+      fields.toArray,
+      checkDeterministicFields = true)
+  }
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from the field
+   * names of the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.toTable
+   * }}}
+   *
+   * This results in a [[Table]] that has field `_1` of type `String` and field `_2`
+   * of type `Int`.
+ */ + + def toTable: Table = { + val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) + as(resultFields: _*) + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala new file mode 100644 index 0000000..cdcf53e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table + + +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.table.JavaBatchTranslator +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.scala.wrap +import org.apache.flink.api.table.plan._ +import org.apache.flink.api.table.Table +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.DataSet + +import scala.reflect.ClassTag + + +/** + * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and + * translating them back to Scala [[DataSet]]s. + */ +class ScalaBatchTranslator extends PlanTranslator { + + private val javaTranslator = new JavaBatchTranslator + + type Representation[A] = DataSet[A] + + def createTable[A]( + repr: DataSet[A], + fields: Array[Expression]): Table = { + + val result = javaTranslator.createTable(repr.javaSet, fields) + + new Table(result.operation) + } + + override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataSet[O] = { + // fake it till you make it ... 
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
new file mode 100644
index 0000000..88f1b83
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.table.JavaStreamingTranslator
+import org.apache.flink.api.table.Table
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
+ * translating them back to Scala [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. Also, the expression
+ * operations must be extended to allow windowing operations.
+ */
+class ScalaStreamingTranslator extends PlanTranslator {
+
+  private val javaTranslator = new JavaStreamingTranslator
+
+  override type Representation[A] = DataStream[A]
+
+  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
+    // fake it till you make it ...
+    // translation happens in the Java translator; this only re-wraps the resulting
+    // Java DataStream in the Scala API
+    javaToScalaStream(javaTranslator.translate(op))
+  }
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
+
+    val result =
+      javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields)
+
+    new Table(result.operation)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
new file mode 100644
index 0000000..4f2172e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.table._
+
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * Methods for converting a [[Table]] to a [[DataSet]] or [[DataStream]]. A [[Table]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */
+class TableConversions(table: Table) {
+
+  /**
+   * Converts the [[Table]] to a [[DataSet]].
+   */
+  def toDataSet[T: TypeInformation]: DataSet[T] = {
+    new ScalaBatchTranslator().translate[T](table.operation)
+  }
+
+  /**
+   * Converts the [[Table]] to a [[DataStream]].
+   */
+  def toDataStream[T: TypeInformation]: DataStream[T] = {
+    new ScalaStreamingTranslator().translate[T](table.operation)
+  }
+}
+
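[Editor's note] A short round trip through the conversions defined so far, as a sketch; the tuple input and field names are illustrative:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    val env   = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))

    // DataSetConversions supplies `as`; TableConversions supplies toDataSet / toDataStream
    val table = input.as('word, 'count)
    val back: DataSet[(String, Int)] = table.toDataSet[(String, Int)]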
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
new file mode 100644
index 0000000..0be6be2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.implicitConversions
+
+/**
+ * These are all the operations that can be used to construct an [[Expression]] AST for expression
+ * operations.
+ *
+ * These operations must be kept in sync with the parser in
+ * [[org.apache.flink.api.table.parser.ExpressionParser]].
+ */
+trait ImplicitExpressionOperations {
+  def expr: Expression
+
+  def && (other: Expression) = And(expr, other)
+  def || (other: Expression) = Or(expr, other)
+
+  def > (other: Expression) = GreaterThan(expr, other)
+  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+  def < (other: Expression) = LessThan(expr, other)
+  def <= (other: Expression) = LessThanOrEqual(expr, other)
+
+  def === (other: Expression) = EqualTo(expr, other)
+  def !== (other: Expression) = NotEqualTo(expr, other)
+
+  def unary_! = Not(expr)
+  def unary_- = UnaryMinus(expr)
+
+  def isNull = IsNull(expr)
+  def isNotNull = IsNotNull(expr)
+
+  def + (other: Expression) = Plus(expr, other)
+  def - (other: Expression) = Minus(expr, other)
+  def / (other: Expression) = Div(expr, other)
+  def * (other: Expression) = Mul(expr, other)
+  def % (other: Expression) = Mod(expr, other)
+
+  def & (other: Expression) = BitwiseAnd(expr, other)
+  def | (other: Expression) = BitwiseOr(expr, other)
+  def ^ (other: Expression) = BitwiseXor(expr, other)
+  def unary_~ = BitwiseNot(expr)
+
+  def abs = Abs(expr)
+
+  def sum = Sum(expr)
+  def min = Min(expr)
+  def max = Max(expr)
+  def count = Count(expr)
+  def avg = Avg(expr)
+
+  def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = {
+    Substring(expr, beginIndex, endIndex)
+  }
+
+  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+  def as(name: Symbol) = Naming(expr, name.name)
+}
+
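[Editor's note] A sketch of how these operators build an Expression tree once the implicit conversions defined just below are in scope; the field names are illustrative:

    // builds And(GreaterThan('count, Literal(2)), NotEqualTo('word, Literal("Ciao")))
    val predicate = 'count > 2 && 'word !== "Ciao"

    // aggregates and renaming compose the same way, e.g. 'count.avg as 'avgCount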
+/**
+ * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]]
+ * to [[ImplicitExpressionOperations]].
+ */
+trait ImplicitExpressionConversions {
+  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
+    def expr = e
+  }
+
+  implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations {
+    def expr = UnresolvedFieldReference(s.name)
+  }
+
+  implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
+    def expr = Literal(l)
+  }
+
+  implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
+    def expr = Literal(i)
+  }
+
+  implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
+    def expr = Literal(f)
+  }
+
+  implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
+    def expr = Literal(d)
+  }
+
+  implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
+    def expr = Literal(str)
+  }
+
+  implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
+    def expr = Literal(bool)
+  }
+
+  implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
+  implicit def int2Literal(i: Int): Expression = Literal(i)
+  implicit def long2Literal(l: Long): Expression = Literal(l)
+  implicit def double2Literal(d: Double): Expression = Literal(d)
+  implicit def float2Literal(d: Float): Expression = Literal(d)
+  implicit def string2Literal(str: String): Expression = Literal(str)
+  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
new file mode 100644
index 0000000..e74651b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{Row, Table}
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.language.implicitConversions
+
+/**
+ * == Table API (Scala) ==
+ *
+ * Importing this package with:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala.table._
+ * }}}
+ *
+ * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to a
+ * [[Table]]. This can be used to perform SQL-like queries on data. Please have
+ * a look at [[Table]] to see which operations are supported and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
+ * expression can be specified.
+ *
+ * When writing a query you can use Scala Symbols to refer to field names. One would
+ * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
+ * Scala literal to an expression [[Literal]]; in those cases use `Literal`, as in `Literal(3)`.
+ *
+ * Example:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala._
+ *   import org.apache.flink.api.scala.table._
+ *
+ *   val env = ExecutionEnvironment.getExecutionEnvironment
+ *   val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
+ *   val result = input.as('word, 'count).groupBy('word).select('word, 'count.avg)
+ *   result.print()
+ *
+ *   env.execute()
+ * }}}
+ *
+ * A [[Table]] can be converted back to the underlying API
+ * representation using `toDataSet` or `toDataStream`:
+ *
+ * {{{
+ *   case class Word(word: String, count: Int)
+ *
+ *   val result = in.select(...).as('word, 'count)
+ *   val set = result.toDataSet[Word]
+ * }}}
+ */
+package object table extends ImplicitExpressionConversions {
+
+  implicit def table2TableConversions(table: Table): TableConversions = {
+    new TableConversions(table)
+  }
+
+  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataSet(
+      table: Table): DataSet[Row] = {
+    new ScalaBatchTranslator().translate[Row](table.operation)
+  }
+
+  implicit def rowDataSet2Table(
+      rowDataSet: DataSet[Row]): Table = {
+    rowDataSet.toTable
+  }
+
+  implicit def dataStream2DataStreamConversions[T](
+      stream: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](
+      stream,
+      stream.getJavaStream.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataStream(
+      table: Table): DataStream[Row] = {
+    new ScalaStreamingTranslator().translate[Row](table.operation)
+  }
+
+  implicit def rowDataStream2Table(
+      rowDataStream: DataStream[Row]): Table = {
+    rowDataStream.toTable
+  }
+}
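[Editor's note] The Row-based implicits above let a Table and a DataSet[Row] (or DataStream[Row]) stand in for one another; a sketch, assuming the same imports as above and a `table` built as in the scaladoc example:

    // via table2RowDataSet
    val rows: DataSet[Row] = table

    // via rowDataSet2Table
    val again: Table = rows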
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
new file mode 100644
index 0000000..51c0a4d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * Exception for all errors occurring during expression evaluation.
+ */
+class ExpressionException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
new file mode 100644
index 0000000..e3baab3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This is used for executing Table API operations. We use manually generated
+ * TypeInfo to check the field types and create serializers and comparators.
+ */
+class Row(arity: Int) extends Product {
+
+  private val fields = new Array[Any](arity)
+
+  def productArity = fields.length
+
+  def productElement(i: Int): Any = fields(i)
+
+  def setField(i: Int, value: Any): Unit = fields(i) = value
+
+  def canEqual(that: Any) = false
+
+  override def toString = fields.mkString(",")
+
+}
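[Editor's note] Row stores untyped fields; type checking happens through the generated TypeInfo, not in this class. A small sketch of its Product behaviour, using only the methods defined above:

    val row = new Row(2)
    row.setField(0, "Hello")
    row.setField(1, 42)

    row.productArity        // 2
    row.productElement(1)   // 42
    println(row)            // prints "Hello,42"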