alpinegizmo commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r535987359



##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink is able to process streaming data based on different notions of *time*. 
+Flink can process data based on different notions of time. 
 
-- *Processing time* refers to the system time of the machine (also known as 
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps 
which are attached to each row. The timestamps can encode when an event 
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is 
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as 
"wall-clock time") that is executing the respective operation.

Review comment:
       ```suggestion
  - *Processing time* refers to the system time of the machine that is executing the respective operation (also known as "wall-clock time").
   ```

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink is able to process streaming data based on different notions of *time*. 
+Flink can process data based on different notions of time. 
 
-- *Processing time* refers to the system time of the machine (also known as 
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps 
which are attached to each row. The timestamps can encode when an event 
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is 
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as 
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps 
that are attached to each row. The timestamps can encode when an event happened.
 
-For more information about time handling in Flink, see the introduction about 
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about 
[event time and watermarks]({% link dev/event_time.md %}).
 
-This page explains how time attributes can be defined for time-based 
operations in Flink's Table API & SQL.
 
 * This will be replaced by the TOC
 {:toc}
 
 Introduction to Time Attributes
 -------------------------------
 
-Time-based operations such as windows in both the [Table API]({% link 
dev/table/tableApi.md %}#group-windows) and [SQL]({% link 
dev/table/sql/queries.md %}#group-windows) require information about the notion 
of time and its origin. Therefore, tables can offer *logical time attributes* 
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a 
`DataStream`. 
+Once a time attribute is defined, it can be referenced as a field and used in 
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the 
query to another, it remains valid.

Review comment:
       ```suggestion
   As long as a time attribute is not modified, and is simply forwarded from 
one part of a query to another, it remains a valid time attribute. 
   ```
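
A hypothetical sketch of the forwarding rule (names echo the `user_actions` example used elsewhere on this page): projecting or filtering leaves the attribute untouched, so it can still drive a time-based operation downstream.

```sql
-- user_action_time is forwarded unmodified, so it remains a valid
-- time attribute inside the view
CREATE VIEW recent_actions AS
SELECT user_name, user_action_time
FROM user_actions
WHERE user_name <> 'system';

-- ...and can therefore still be used for a time-based operation
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(*)
FROM recent_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
```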

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink is able to process streaming data based on different notions of *time*. 
+Flink can process data based on different notions of time. 
 
-- *Processing time* refers to the system time of the machine (also known as 
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps 
which are attached to each row. The timestamps can encode when an event 
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is 
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as 
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps 
that are attached to each row. The timestamps can encode when an event happened.
 
-For more information about time handling in Flink, see the introduction about 
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about 
[event time and watermarks]({% link dev/event_time.md %}).
 
-This page explains how time attributes can be defined for time-based 
operations in Flink's Table API & SQL.
 
 * This will be replaced by the TOC
 {:toc}
 
 Introduction to Time Attributes
 -------------------------------
 
-Time-based operations such as windows in both the [Table API]({% link 
dev/table/tableApi.md %}#group-windows) and [SQL]({% link 
dev/table/sql/queries.md %}#group-windows) require information about the notion 
of time and its origin. Therefore, tables can offer *logical time attributes* 
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a 
`DataStream`. 
+Once a time attribute is defined, it can be referenced as a field and used in 
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the 
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.

Review comment:
       ```suggestion
   Time attributes behave like regular timestamps, and are accessible for 
calculations.
   ```

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink is able to process streaming data based on different notions of *time*. 
+Flink can process data based on different notions of time. 
 
-- *Processing time* refers to the system time of the machine (also known as 
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps 
which are attached to each row. The timestamps can encode when an event 
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is 
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as 
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps 
that are attached to each row. The timestamps can encode when an event happened.
 
-For more information about time handling in Flink, see the introduction about 
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about 
[event time and watermarks]({% link dev/event_time.md %}).
 
-This page explains how time attributes can be defined for time-based 
operations in Flink's Table API & SQL.
 
 * This will be replaced by the TOC
 {:toc}
 
 Introduction to Time Attributes
 -------------------------------
 
-Time-based operations such as windows in both the [Table API]({% link 
dev/table/tableApi.md %}#group-windows) and [SQL]({% link 
dev/table/sql/queries.md %}#group-windows) require information about the notion 
of time and its origin. Therefore, tables can offer *logical time attributes* 
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a 
`DataStream`. 
+Once a time attribute is defined, it can be referenced as a field and used in 
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the 
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
+When used in computation, time attributes are materialized and act as standard 
timestamps. 
+However, the opposite is not true; ordinary timestamp columns cannot be 
arbitrarily converted to time attributes in the middle of a query.

Review comment:
       ```suggestion
   However, ordinary timestamps cannot be used in place of, or be converted to, 
time attributes.
   ```
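
The distinction might be easier to see with a DDL sketch (hypothetical schema, modeled on the `user_actions` example elsewhere on this page): a `TIMESTAMP(3)` column only becomes an event-time attribute because it is declared as one in the schema, e.g. via a `WATERMARK` clause; an undeclared timestamp column stays an ordinary timestamp for the rest of the query.

```sql
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- this declaration is what turns user_action_time into an
  -- event-time attribute; without it, the column would remain an
  -- ordinary timestamp and could not become a time attribute later
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);
```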

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -318,98 +145,61 @@ val windowedTable = table.window(Tumble over 10.minutes 
on $"user_action_time" a
 </div>
 </div>
 
-### Using a TableSource
 
-The event time attribute is defined by a `TableSource` that implements the 
`DefinedRowtimeAttributes` interface. The `getRowtimeAttributeDescriptors()` 
method returns a list of `RowtimeAttributeDescriptor` for describing the final 
name of a time attribute, a timestamp extractor to derive the values of the 
attribute, and the watermark strategy associated with the attribute.
+Processing Time
+---------------
+
+Processing time allows a table program to produce results based on the time of 
the local machine. It is the simplest notion of time but does not provide 
determinism. It neither requires timestamp extraction nor watermark generation.

Review comment:
       ```suggestion
  Processing time allows a table program to produce results based on the time 
of the local machine. It is the simplest notion of time, but it yields 
non-deterministic results. Processing time requires neither timestamp 
extraction nor watermark generation.
   ```
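
The suggested wording could be backed by the `PROCTIME()` computed-column DDL that this PR already shows further down the page; the attribute is derived from the local wall clock, so no timestamp extraction or watermark declaration is involved:

```sql
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- computed column: the value is taken from the machine's wall clock
  -- at processing time, hence results are non-deterministic on replay
  user_action_time AS PROCTIME()
) WITH (
  ...
);
```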

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink is able to process streaming data based on different notions of *time*. 
+Flink can process data based on different notions of time. 
 
-- *Processing time* refers to the system time of the machine (also known as 
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps 
which are attached to each row. The timestamps can encode when an event 
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is 
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as 
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps 
that are attached to each row. The timestamps can encode when an event happened.
 
-For more information about time handling in Flink, see the introduction about 
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about 
[event time and watermarks]({% link dev/event_time.md %}).
 
-This page explains how time attributes can be defined for time-based 
operations in Flink's Table API & SQL.
 
 * This will be replaced by the TOC
 {:toc}
 
 Introduction to Time Attributes
 -------------------------------
 
-Time-based operations such as windows in both the [Table API]({% link 
dev/table/tableApi.md %}#group-windows) and [SQL]({% link 
dev/table/sql/queries.md %}#group-windows) require information about the notion 
of time and its origin. Therefore, tables can offer *logical time attributes* 
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a 
`DataStream`. 
+Once a time attribute is defined, it can be referenced as a field and used in 
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the 
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
+When used in computation, time attributes are materialized and act as standard 
timestamps. 
+However, the opposite is not true; ordinary timestamp columns cannot be 
arbitrarily converted to time attributes in the middle of a query.
 
-Time attributes can be part of every table schema. They are defined when 
creating a table from a CREATE TABLE DDL or a `DataStream` or are pre-defined 
when using a `TableSource`. Once a time attribute has been defined at the 
beginning, it can be referenced as a field and can be used in time-based 
operations.
-
-As long as a time attribute is not modified and is simply forwarded from one 
part of the query to another, it remains a valid time attribute. Time 
attributes behave like regular timestamps and can be accessed for calculations. 
If a time attribute is used in a calculation, it will be materialized and 
becomes a regular timestamp. Regular timestamps do not cooperate with Flink's 
time and watermarking system and thus can not be used for time-based operations 
anymore.
-
-Table programs require that the corresponding time characteristic has been 
specified for the streaming environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-
-env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)  # 
default
-
-# alternatively:
-# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
-# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-Processing time
----------------
-
-Processing time allows a table program to produce results based on the time of 
the local machine. It is the simplest notion of time but does not provide 
determinism. It neither requires timestamp extraction nor watermark generation.
-
-There are three ways to define a processing time attribute.
-
-### Defining in create table DDL
-
-The processing time attribute is defined as a computed column in create table 
DDL using the system `PROCTIME()` function. Please see [CREATE TABLE DDL]({% 
link dev/table/sql/create.md %}#create-table) for more information about 
computed column.
-
-{% highlight sql %}
-
-CREATE TABLE user_actions (
-  user_name STRING,
-  data STRING,
-  user_action_time AS PROCTIME() -- declare an additional field as a 
processing time attribute
-) WITH (
-  ...
-);
-
-SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT 
user_name)
-FROM user_actions
-GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
-
-{% endhighlight %}
-
-
-### During DataStream-to-Table Conversion
-
-The processing time attribute is defined with the `.proctime` property during 
schema definition. The time attribute must only extend the physical schema by 
an additional logical field. Thus, it can only be defined at the end of the 
schema definition.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String, String>> stream = ...;
-
-// declare an additional logical field as a processing time attribute
-Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), 
$("user_action_time").proctime());
-
-WindowedTable windowedTable = table.window(
-        Tumble.over(lit(10).minutes())
-            .on($("user_action_time"))
-            .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[(String, String)] = ...
-
-// declare an additional logical field as a processing time attribute
-val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", 
$"data", $"user_action_time".proctime)
-
-val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" 
as "userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-### Using a TableSource
-
-The processing time attribute is defined by a `TableSource` that implements 
the `DefinedProctimeAttribute` interface. The logical time attribute is 
appended to the physical schema defined by the return type of the `TableSource`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// define a table source with a processing attribute
-public class UserActionSource implements StreamTableSource<Row>, 
DefinedProctimeAttribute {
-
-       @Override
-       public TypeInformation<Row> getReturnType() {
-               String[] names = new String[] {"user_name" , "data"};
-               TypeInformation[] types = new TypeInformation[] 
{Types.STRING(), Types.STRING()};
-               return Types.ROW(names, types);
-       }
-
-       @Override
-       public DataStream<Row> getDataStream(StreamExecutionEnvironment 
execEnv) {
-               // create stream
-               DataStream<Row> stream = ...;
-               return stream;
-       }
-
-       @Override
-       public String getProctimeAttribute() {
-               // field with this name will be appended as a third field
-               return "user_action_time";
-       }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource());
-
-WindowedTable windowedTable = tEnv
-       .from("user_actions")
-       .window(Tumble
-           .over(lit(10).minutes())
-           .on($("user_action_time"))
-           .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// define a table source with a processing attribute
-class UserActionSource extends StreamTableSource[Row] with 
DefinedProctimeAttribute {
-
-       override def getReturnType = {
-               val names = Array[String]("user_name" , "data")
-               val types = Array[TypeInformation[_]](Types.STRING, 
Types.STRING)
-               Types.ROW(names, types)
-       }
-
-       override def getDataStream(execEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
-               // create stream
-               val stream = ...
-               stream
-       }
-
-       override def getProctimeAttribute = {
-               // field with this name will be appended as a third field
-               "user_action_time"
-       }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource)
-
-val windowedTable = tEnv
-       .from("user_actions")
-       .window(Tumble over 10.minutes on $"user_action_time" as 
"userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-Event time
+Event Time
 ----------
 
-Event time allows a table program to produce results based on the time that is 
contained in every record. This allows for consistent results even in case of 
out-of-order events or late events. It also ensures replayable results of the 
table program when reading records from persistent storage.
+Event time allows a table program to produce results based on timestamps in 
every record, allowing for consistent results even in case of out-of-order 
events or late events. It also ensures the replayability of the results of the 
table program when reading records from persistent storage.
 
-Additionally, event time allows for unified syntax for table programs in both 
batch and streaming environments. A time attribute in a streaming environment 
can be a regular field of a record in a batch environment.
+Additionally, event time allows for unified syntax for table programs in both 
batch and streaming environments. A time attribute in a streaming environment 
can be a regular column of a row in a batch environment.
 
-In order to handle out-of-order events and distinguish between on-time and 
late events in streaming, Flink needs to extract timestamps from events and 
make some kind of progress in time (so-called [watermarks]({% link 
dev/event_time.md %})).
+To handle out-of-order events and distinguish between on-time and late events 
in streaming, Flink needs to extract timestamps from events and make some 
progress in time (so-called [watermarks]({% link dev/event_time.md %})).

Review comment:
       ```suggestion
  To handle out-of-order events and to distinguish between on-time and late 
events in streaming, Flink needs to know the timestamp for each row, and it 
also needs regular indications of how far along in event time the processing 
has progressed (via so-called [watermarks]({% link dev/event_time.md %})).
   ```
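
As a possible illustration of why watermarks matter here (query repeated from the existing `user_actions` example on this page), a tumbling-window aggregation only emits a window's result once the watermark passes the window end:

```sql
-- rows are assigned to 10-minute windows by their event-time timestamps;
-- the watermark tells Flink when a window is complete and can be emitted
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE),
       COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
```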

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink is able to process streaming data based on different notions of *time*. 
+Flink can process data based on different notions of time. 
 
-- *Processing time* refers to the system time of the machine (also known as 
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps 
which are attached to each row. The timestamps can encode when an event 
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is 
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as 
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps 
that are attached to each row. The timestamps can encode when an event happened.
 
-For more information about time handling in Flink, see the introduction about 
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about 
[event time and watermarks]({% link dev/event_time.md %}).
 
-This page explains how time attributes can be defined for time-based 
operations in Flink's Table API & SQL.
 
 * This will be replaced by the TOC
 {:toc}
 
 Introduction to Time Attributes
 -------------------------------
 
-Time-based operations such as windows in both the [Table API]({% link 
dev/table/tableApi.md %}#group-windows) and [SQL]({% link 
dev/table/sql/queries.md %}#group-windows) require information about the notion 
of time and its origin. Therefore, tables can offer *logical time attributes* 
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a 
`DataStream`. 
+Once a time attribute is defined, it can be referenced as a field and used in 
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the 
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
+When used in computation, time attributes are materialized and act as standard 
timestamps. 
+However, the opposite is not true; ordinary timestamp columns cannot be 
arbitrarily converted to time attributes in the middle of a query.
 
-Time attributes can be part of every table schema. They are defined when 
creating a table from a CREATE TABLE DDL or a `DataStream` or are pre-defined 
when using a `TableSource`. Once a time attribute has been defined at the 
beginning, it can be referenced as a field and can be used in time-based 
operations.
-
-As long as a time attribute is not modified and is simply forwarded from one 
part of the query to another, it remains a valid time attribute. Time 
attributes behave like regular timestamps and can be accessed for calculations. 
If a time attribute is used in a calculation, it will be materialized and 
becomes a regular timestamp. Regular timestamps do not cooperate with Flink's 
time and watermarking system and thus can not be used for time-based operations 
anymore.
-
-Table programs require that the corresponding time characteristic has been 
specified for the streaming environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-
-env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)  # 
default
-
-# alternatively:
-# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
-# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-Processing time
----------------
-
-Processing time allows a table program to produce results based on the time of 
the local machine. It is the simplest notion of time but does not provide 
determinism. It neither requires timestamp extraction nor watermark generation.
-
-There are three ways to define a processing time attribute.
-
-### Defining in create table DDL
-
-The processing time attribute is defined as a computed column in create table 
DDL using the system `PROCTIME()` function. Please see [CREATE TABLE DDL]({% 
link dev/table/sql/create.md %}#create-table) for more information about 
computed column.
-
-{% highlight sql %}
-
-CREATE TABLE user_actions (
-  user_name STRING,
-  data STRING,
-  user_action_time AS PROCTIME() -- declare an additional field as a 
processing time attribute
-) WITH (
-  ...
-);
-
-SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT 
user_name)
-FROM user_actions
-GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
-
-{% endhighlight %}
-
-
-### During DataStream-to-Table Conversion
-
-The processing time attribute is defined with the `.proctime` property during 
schema definition. The time attribute must only extend the physical schema by 
an additional logical field. Thus, it can only be defined at the end of the 
schema definition.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String, String>> stream = ...;
-
-// declare an additional logical field as a processing time attribute
-Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), 
$("user_action_time").proctime());
-
-WindowedTable windowedTable = table.window(
-        Tumble.over(lit(10).minutes())
-            .on($("user_action_time"))
-            .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[(String, String)] = ...
-
-// declare an additional logical field as a processing time attribute
-val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", 
$"data", $"user_action_time".proctime)
-
-val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" 
as "userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-### Using a TableSource
-
-The processing time attribute is defined by a `TableSource` that implements 
the `DefinedProctimeAttribute` interface. The logical time attribute is 
appended to the physical schema defined by the return type of the `TableSource`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// define a table source with a processing attribute
-public class UserActionSource implements StreamTableSource<Row>, 
DefinedProctimeAttribute {
-
-       @Override
-       public TypeInformation<Row> getReturnType() {
-               String[] names = new String[] {"user_name" , "data"};
-               TypeInformation[] types = new TypeInformation[] 
{Types.STRING(), Types.STRING()};
-               return Types.ROW(names, types);
-       }
-
-       @Override
-       public DataStream<Row> getDataStream(StreamExecutionEnvironment 
execEnv) {
-               // create stream
-               DataStream<Row> stream = ...;
-               return stream;
-       }
-
-       @Override
-       public String getProctimeAttribute() {
-               // field with this name will be appended as a third field
-               return "user_action_time";
-       }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource());
-
-WindowedTable windowedTable = tEnv
-       .from("user_actions")
-       .window(Tumble
-           .over(lit(10).minutes())
-           .on($("user_action_time"))
-           .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// define a table source with a processing attribute
-class UserActionSource extends StreamTableSource[Row] with 
DefinedProctimeAttribute {
-
-       override def getReturnType = {
-               val names = Array[String]("user_name" , "data")
-               val types = Array[TypeInformation[_]](Types.STRING, 
Types.STRING)
-               Types.ROW(names, types)
-       }
-
-       override def getDataStream(execEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
-               // create stream
-               val stream = ...
-               stream
-       }
-
-       override def getProctimeAttribute = {
-               // field with this name will be appended as a third field
-               "user_action_time"
-       }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource)
-
-val windowedTable = tEnv
-       .from("user_actions")
-       .window(Tumble over 10.minutes on $"user_action_time" as 
"userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-Event time
+Event Time
 ----------
 
-Event time allows a table program to produce results based on the time that is 
contained in every record. This allows for consistent results even in case of 
out-of-order events or late events. It also ensures replayable results of the 
table program when reading records from persistent storage.
+Event time allows a table program to produce results based on timestamps in 
every record, allowing for consistent results even in case of out-of-order 
events or late events. It also ensures the replayability of the results of the 
table program when reading records from persistent storage.

Review comment:
       Can we really argue that results are consistent with late events?
   
   ```suggestion
   Event time allows a table program to produce results based on timestamps in every record, allowing for consistent results despite out-of-order or late events. It also ensures the replayability of the results of the table program when reading records from persistent storage.
   ```
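
   The event-time behavior discussed here is usually wired up by declaring a watermark on the timestamp column. A minimal sketch in Flink SQL DDL (the table and column names are illustrative, borrowed from the examples elsewhere in this page):

   ```sql
   -- Hypothetical table; `user_action_time` carries the event timestamp.
   CREATE TABLE user_actions (
     user_name STRING,
     data STRING,
     user_action_time TIMESTAMP(3),
     -- Declare user_action_time as the event time attribute and tolerate
     -- events arriving up to 5 seconds out of order.
     WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
   ) WITH (
     ...
   );
   ```

   The watermark expression is what makes out-of-order and late events tractable: results become final once the watermark passes the window end.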

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink is able to process streaming data based on different notions of *time*. 
+Flink can process data based on different notions of time. 
 
-- *Processing time* refers to the system time of the machine (also known as "wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps which are attached to each row. The timestamps can encode when an event happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as "wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps that are attached to each row. The timestamps can encode when an event happened.
 
-For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about [event time and watermarks]({% link dev/event_time.md %}).
 
-This page explains how time attributes can be defined for time-based operations in Flink's Table API & SQL.
 
 * This will be replaced by the TOC
 {:toc}
 
 Introduction to Time Attributes
 -------------------------------
 
-Time-based operations such as windows in both the [Table API]({% link dev/table/tableApi.md %}#group-windows) and [SQL]({% link dev/table/sql/queries.md %}#group-windows) require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a `DataStream`. 
+Once a time attribute is defined, it can be referenced as a field and used in time-based operations.
+As long as a time attribute is not modified, and is simply forwarded from one part of the query to another, it remains valid.
+Time attributes behave like regular timestamps and are accessible for calculations.
+When used in computation, time attributes are materialized and act as standard timestamps. 

Review comment:
       ```suggestion
   When used in calculations, time attributes are materialized and act as standard timestamps. 
   ```
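
   The two declaration paths named in the hunk above can be sketched in Flink SQL. This is a hypothetical schema (names are illustrative); it shows a processing-time attribute declared as a computed column and then referenced like any other field in a windowed query, matching the `TUMBLE` usage quoted later in this review:

   ```sql
   -- Hypothetical table; the processing-time attribute is a computed
   -- column built from the built-in PROCTIME() function.
   CREATE TABLE user_actions (
     user_name STRING,
     data STRING,
     user_action_time AS PROCTIME()   -- processing-time attribute
   ) WITH (
     ...
   );

   -- The attribute is referenced as a regular field in a time-based
   -- operation, here a 10-minute tumbling window:
   SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
   FROM user_actions
   GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
   ```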

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -251,14 +80,12 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
 
 ### During DataStream-to-Table Conversion
 
-The event time attribute is defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({% link dev/event_time.md %}) must have been assigned in the `DataStream` that is converted.
+When converting a `DataStream` to a table, an event time attribute can be defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({% link dev/event_time.md %}) must have been already assigned in the `DataStream` being converted.
 
-There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream` or not, the timestamp field is either
+There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream`, the timestamp is either (1) appended as a new column to the schema or
+(2) replaces an existing column.

Review comment:
       ```suggestion
   There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream`, the timestamp is either (1) appended as a new column, or it
   (2) replaces an existing column.
   ```
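
   The two `.rowtime` variants can be sketched in the Scala Table API, assuming the expression-based `fromDataStream` overload and illustrative stream/field names (not taken verbatim from the patch):

   ```scala
   // Assumes `stream: DataStream[(Long, String, String)]` whose first field
   // is the event timestamp, with timestamps and watermarks already assigned.

   // (1) Append: "user_action_time" does not exist in the stream's schema,
   // so a new column holding the record timestamp is added.
   val appended = tEnv.fromDataStream(
     stream.map(t => (t._2, t._3)),
     $"user_name", $"data", $"user_action_time".rowtime)

   // (2) Replace: the first tuple field is re-declared as the rowtime
   // attribute, replacing the existing column.
   val replaced = tEnv.fromDataStream(
     stream,
     $"user_action_time".rowtime, $"user_name", $"data")
   ```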

##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -251,14 +80,12 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
 
 ### During DataStream-to-Table Conversion
 
-The event time attribute is defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({% link dev/event_time.md %}) must have been assigned in the `DataStream` that is converted.
+When converting a `DataStream` to a table, an event time attribute can be defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({% link dev/event_time.md %}) must have been already assigned in the `DataStream` being converted.

Review comment:
       ```suggestion
   When converting a `DataStream` to a table, an event time attribute can be defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({% link dev/event_time.md %}) must have already been assigned in the `DataStream` being converted.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

