Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4256#discussion_r125538537
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -351,13 +351,109 @@ val windowedTable = tEnv
     Query Configuration
     -------------------
     
    -In stream processing, compuations are constantly happening and there are 
many use cases that require to update previously emitted results. There are 
many ways in which a query can compute and emit updates. These do not affect 
the semantics of the query but might lead to approximated results. 
    +Table API and SQL queries have the same semantics regardless whether their 
input is bounded batch input or unbounded stream input. In many cases, 
continuous queries on streaming input are capable of computing accurate results 
that are identical to offline computed results. However, this is not possible 
in general case because continuous queries have to restrict the size of state 
they maintain in order to avoid to run out of storage and to be able to process 
unbounded streaming data over a long period of time. Consequently, a continuous 
query might only be able to provide approximated results depending on the 
characteristics of the input data and the query itself.
     
    -Flink's Table API and SQL interface use a `QueryConfig` to control the 
computation and emission of results and updates.
    +Flink's Table API and SQL interface provide parameters to tune the 
accuracy and resource consumption of continuous queries. The parameters are 
specified via a `QueryConfig` object. The `QueryConfig` can be obtained from 
the `TableEnvironment` and is passed back when a `Table` is translated, i.e., 
when it is [transformed into a 
DataStream](common.html#convert-a-table-into-a-datastream-or-dataset) or 
[emitted via a TableSink](common.html#emit-a-table).
     
    -### State Retention
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
    +
    +// obtain query configuration from TableEnvironment
    +StreamQueryConfig qConfig = tableEnv.queryConfig();
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12));
    +...
    +
    +// define query
    +Table result = ...
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig);
    +
    +// convert result Table into a DataStream<Row>
    +DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, 
qConfig);
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +val tableEnv = TableEnvironment.getTableEnvironment(env)
    +
    +// obtain query configuration from TableEnvironment
    +val qConfig: StreamQueryConfig = tableEnv.queryConfig
    +// set query parameters
    +qConfig.withIdleStateRetentionTime(Time.hours(12))
    +...
    +
    +// define query
    +val result: Table = ???
    +
    +// emit result Table via a TableSink
    +result.writeToSink(sink, qConfig)
    +
    +// convert result Table into a DataStream
    --- End diff --
    
    DataStream -> DataStream[Row] ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to