dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#discussion_r276213718
 
 

 ##########
 File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
 ##########
 @@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.StreamTableDescriptor;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.TableFunction;
+
+import java.lang.reflect.Constructor;
+
+/**
+ * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
+ * {@link DataStream}s.
+ *
+ * <p>A TableEnvironment can be used to:
+ * <ul>
+ *     <li>convert a {@link DataStream} to a {@link Table}</li>
+ *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
+ *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
+ *     <li>scan a registered table to obtain a {@link Table}</li>
+ *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
+ *     <li>convert a {@link Table} into a {@link DataStream}</li>
+ *     <li>explain the AST and execution plan of a {@link Table}</li>
+ * </ul>
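+ *
+ * <p>A minimal usage sketch (an illustration added for this doc, not part of this interface's
+ * contract; how the environment is obtained and the field names are assumptions, and {@code Row}
+ * refers to {@link org.apache.flink.types.Row}):
+ * <pre>
+ * {@code
+ *   StreamTableEnvironment tableEnv = ...
+ *
+ *   DataStream<Tuple2<String, Long>> stream = ...
+ *   Table table = tableEnv.fromDataStream(stream, "word, cnt");
+ *   DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
+ * }
+ * </pre>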
+ */
+@PublicEvolving
+public interface StreamTableEnvironment extends TableEnvironment {
+
+       /**
+        * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog.
+        * Registered functions can be referenced in Table API and SQL queries.
+        *
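+        * <p>For example (a sketch; {@code MySplit} stands for any user-defined {@link TableFunction},
+        * and the table and column names are assumptions):
+        * <pre>
+        * {@code
+        *   tableEnv.registerFunction("split", new MySplit());
+        *   Table result = tableEnv.sqlQuery(
+        *     "SELECT t.word FROM MyTable, LATERAL TABLE(split(line)) AS t(word)");
+        * }
+        * </pre>
+        *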
+        * @param name The name under which the function is registered.
+        * @param tableFunction The TableFunction to register.
+        * @param <T> The type of the output row.
+        */
+       <T> void registerFunction(String name, TableFunction<T> tableFunction);
+
+       /**
+        * Registers an {@link AggregateFunction} under a unique name in the TableEnvironment's catalog.
+        * Registered functions can be referenced in Table API and SQL queries.
+        *
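+        * <p>For example (a sketch; {@code MyWeightedAvg} stands for any user-defined
+        * {@link AggregateFunction}, and the table and column names are assumptions):
+        * <pre>
+        * {@code
+        *   tableEnv.registerFunction("wAvg", new MyWeightedAvg());
+        *   Table result = tableEnv.sqlQuery(
+        *     "SELECT name, wAvg(points, level) FROM UserScores GROUP BY name");
+        * }
+        * </pre>
+        *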
+        * @param name The name under which the function is registered.
+        * @param aggregateFunction The AggregateFunction to register.
+        * @param <T> The type of the output value.
+        * @param <ACC> The type of aggregate accumulator.
+        */
+       <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);
+
+       /**
+        * Converts the given {@link DataStream} into a {@link Table}.
+        *
+        * <p>The field names of the {@link Table} are automatically derived from the type of the
+        * {@link DataStream}.
+        *
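+        * <p>For example (an illustrative sketch; for {@link org.apache.flink.api.java.tuple.Tuple}
+        * types the derived field names are {@code f0}, {@code f1}, ...):
+        * <pre>
+        * {@code
+        *   DataStream<Tuple2<String, Long>> stream = ...
+        *   Table table = tableEnv.fromDataStream(stream); // columns: f0, f1
+        * }
+        * </pre>
+        *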
+        * @param dataStream The {@link DataStream} to be converted.
+        * @param <T> The type of the {@link DataStream}.
+        * @return The converted {@link Table}.
+        */
+       <T> Table fromDataStream(DataStream<T> dataStream);
+
+       /**
+        * Converts the given {@link DataStream} into a {@link Table} with specified field names.
+        *
+        * <p>Example:
+        *
+        * <pre>
+        * {@code
+        *   DataStream<Tuple2<String, Long>> stream = ...
+        *   Table tab = tableEnv.fromDataStream(stream, "a, b");
+        * }
+        * </pre>
+        *
+        * @param dataStream The {@link DataStream} to be converted.
+        * @param fields The field names of the resulting {@link Table}.
+        * @param <T> The type of the {@link DataStream}.
+        * @return The converted {@link Table}.
+        */
+       <T> Table fromDataStream(DataStream<T> dataStream, String fields);
+
+       /**
+        * Registers the given {@link DataStream} as table in the {@link TableEnvironment}'s catalog.
+        * Registered tables can be referenced in SQL queries.
+        *
+        * <p>The field names of the {@link Table} are automatically derived
+        * from the type of the {@link DataStream}.
+        *
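+        * <p>For example (an illustrative sketch):
+        * <pre>
+        * {@code
+        *   DataStream<Tuple2<String, Long>> stream = ...
+        *   tableEnv.registerDataStream("MyTable", stream);
+        *   Table table = tableEnv.sqlQuery("SELECT f0, f1 FROM MyTable");
+        * }
+        * </pre>
+        *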
+        * @param name The name under which the {@link DataStream} is registered in the catalog.
+        * @param dataStream The {@link DataStream} to register.
+        * @param <T> The type of the {@link DataStream} to register.
+        */
+       <T> void registerDataStream(String name, DataStream<T> dataStream);
+
+       /**
+        * Registers the given {@link DataStream} as table with specified field names in the
+        * {@link TableEnvironment}'s catalog.
+        * Registered tables can be referenced in SQL queries.
+        *
+        * <p>Example:
+        *
+        * <pre>
+        * {@code
+        *   DataStream<Tuple2<String, Long>> set = ...
+        *   tableEnv.registerDataStream("myTable", set, "a, b");
+        * }
+        * </pre>
+        *
+        * @param name The name under which the {@link DataStream} is registered in the catalog.
+        * @param dataStream The {@link DataStream} to register.
+        * @param fields The field names of the registered table.
+        * @param <T> The type of the {@link DataStream} to register.
+        */
+       <T> void registerDataStream(String name, DataStream<T> dataStream, String fields);
+
+       /**
+        * Converts the given {@link Table} into an append {@link DataStream} of a specified type.
+        *
+        * <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also modified
+        * by update or delete changes, the conversion will fail.
+        *
+        * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
+        * <ul>
+        *     <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple}
+        *     types: Fields are mapped by position, field types must match.</li>
+        *     <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.</li>
+        * </ul>
+        *
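+        * <p>For example (an illustrative sketch; mapping to {@link org.apache.flink.types.Row}
+        * works for any append-only {@link Table}):
+        * <pre>
+        * {@code
+        *   Table table = ...
+        *   DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
+        * }
+        * </pre>
+        *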
+        * @param table The {@link Table} to convert.
+        * @param clazz The class of the type of the resulting {@link DataStream}.
+        * @param <T> The type of the resulting {@link DataStream}.
+        * @return The converted {@link DataStream}.
+        */
+       <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
+
+       /**
+        * Converts the given {@link Table} into an append {@link DataStream} of a specified type.
+        *
+        * <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also modified
+        * by update or delete changes, the conversion will fail.
+        *
+        * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
+        * <ul>
+        *     <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple}
+        *     types: Fields are mapped by position, field types must match.</li>
+        *     <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.</li>
+        * </ul>
+        *
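+        * <p>For example (an illustrative sketch; {@code Types} refers to
+        * {@code org.apache.flink.api.common.typeinfo.Types}):
+        * <pre>
+        * {@code
+        *   Table table = ...
+        *   DataStream<Tuple2<String, Long>> stream =
+        *     tableEnv.toAppendStream(table, Types.TUPLE(Types.STRING, Types.LONG));
+        * }
+        * </pre>
+        *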
+        * @param table The {@link Table} to convert.
+        * @param typeInfo The {@link TypeInformation} that specifies the type of the {@link DataStream}.
+        * @param <T> The type of the resulting {@link DataStream}.
+        * @return The converted {@link DataStream}.
+        */
+       <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
+
+       /**
+        * Converts the given {@link Table} into an append {@link DataStream} of a specified type.
+        *
+        * <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also modified
+        * by update or delete changes, the conversion will fail.
+        *
+        * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
+        * <ul>
+        *     <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple}
+        *     types: Fields are mapped by position, field types must match.</li>
+        *     <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.</li>
+        * </ul>
+        *
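+        * <p>For example (an illustrative sketch; {@code Time} refers to
+        * {@code org.apache.flink.api.common.time.Time}, and how the config is obtained is left open):
+        * <pre>
+        * {@code
+        *   StreamQueryConfig qConfig = ...
+        *   qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
+        *   DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class, qConfig);
+        * }
+        * </pre>
+        *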
+        * @param table The {@link Table} to convert.
+        * @param clazz The class of the type of the resulting {@link DataStream}.
+        * @param queryConfig The configuration of the query to generate.
+        * @param <T> The type of the resulting {@link DataStream}.
+        * @return The converted {@link DataStream}.
+        */
+       <T> DataStream<T> toAppendStream(Table table, Class<T> clazz, StreamQueryConfig queryConfig);
+
+       /**
+        * Converts the given {@link Table} into an append {@link DataStream} of a specified type.
+        *
+        * <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also modified
+        * by update or delete changes, the conversion will fail.
+        *
+        * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
+        * <ul>
+        *     <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple}
+        *     types: Fields are mapped by position, field types must match.</li>
+        *     <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.</li>
+        * </ul>
+        *
+        * @param table The {@link Table} to convert.
+        * @param typeInfo The {@link TypeInformation} that specifies the type of the {@link DataStream}.
+        * @param queryConfig The configuration of the query to generate.
+        * @param <T> The type of the resulting {@link DataStream}.
+        * @return The converted {@link DataStream}.
+        */
+       <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo, StreamQueryConfig queryConfig);
+
+       /**
+        * Converts the given {@link Table} into a {@link DataStream} of add and retract messages.
+        * The message will be encoded as [[JTuple2]]. The first field is a {@link Boolean} flag,
 
 Review comment:
   `[[JTuple2]]` to `{@link JTuple2}`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
