WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284678079
 
 

 ##########
 File path: flink-python/pyflink/table/table.py
 ##########
 @@ -96,16 +105,413 @@ def where(self, predicate):
         """
         Filters out elements that don't pass the filter predicate. Similar to 
a SQL WHERE
         clause.
+
         Example:
         ::
-
             >>> tab.where("name = 'Fred'")
 
         :param predicate: Predicate expression string.
         :return: Result table.
         """
         return Table(self._j_table.where(predicate))
 
+    def group_by(self, fields):
+        """
+        Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+        to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+        Example:
+        ::
+            >>> tab.group_by("key").select("key, value.avg")
+
+        :param fields: Group keys.
+        :return: The grouped table.
+        """
+        return GroupedTable(self._j_table.groupBy(fields))
+
+    def distinct(self):
+        """
+        Removes duplicate values and returns only distinct (different) values.
+
+        Example:
+        ::
+            >>> tab.select("key, value").distinct()
+
+        :return: Result table.
+        """
+        return Table(self._j_table.distinct())
+
+    def join(self, right, join_predicate=None):
+        """
+        Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+        necessary. You can use where and select clauses after a join to 
further specify the
+        behaviour of the join.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` .
+
+        Example:
+        ::
+            >>> left.join(right).where("a = b && c > 3").select("a, b, d")
+            >>> left.join(right, "a = b")
+
+        :param right: Right table.
+        :param join_predicate: Optional, the join predicate expression string.
+        :return: Result table.
+        """
+        if join_predicate is not None:
+            return Table(self._j_table.join(right._j_table, join_predicate))
+        else:
+            return Table(self._j_table.join(right._j_table))
+
+    def left_outer_join(self, right, join_predicate=None):
+        """
+        Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+        necessary.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` 
and its
+            :class:`TableConfig` must have null check enabled (default).
+
+        Example:
+        ::
+            >>> left.left_outer_join(right).select("a, b, d")
+            >>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+        :param right: Right table.
+        :param join_predicate: Optional, the join predicate expression string.
+        :return: Result table.
+        """
+        if join_predicate is None:
+            return Table(self._j_table.leftOuterJoin(right._j_table))
+        else:
+            return Table(self._j_table.leftOuterJoin(right._j_table, 
join_predicate))
+
+    def right_outer_join(self, right, join_predicate):
+        """
+        Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+        necessary.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` 
and its
+            :class:`TableConfig` must have null check enabled (default).
+
+        Example:
+        ::
+            >>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+        :param right: Right table.
+        :param join_predicate: The join predicate expression string.
+        :return: Result table.
+        """
+        return Table(self._j_table.rightOuterJoin(right._j_table, 
join_predicate))
+
+    def full_outer_join(self, right, join_predicate):
+        """
+        Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+        necessary.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` 
and its
+            :class:`TableConfig` must have null check enabled (default).
+
+        Example:
+        ::
+            >>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+        :param right: Right table.
+        :param join_predicate: The join predicate expression string.
+        :return: Result table.
+        """
+        return Table(self._j_table.fullOuterJoin(right._j_table, 
join_predicate))
+
+    def minus(self, right):
+        """
+        Minus of two :class:`Table`s with duplicate records removed.
+        Similar to a SQL EXCEPT clause. Minus returns records from the left 
table that do not
+        exist in the right table. Duplicate records in the left table are 
returned
+        exactly once, i.e., duplicates are removed. Both tables must have 
identical field types.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.minus(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.minus(right._j_table))
+
+    def minus_all(self, right):
+        """
+        Minus of two :class:`Table`s. Similar to a SQL EXCEPT ALL.
+        Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that 
do not exist in
+        the right table. A record that is present n times in the left table 
and m times
+        in the right table is returned (n - m) times, i.e., as many duplicates 
as are present
+        in the right table are removed. Both tables must have identical field 
types.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.minus_all(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.minusAll(right._j_table))
+
+    def union(self, right):
+        """
+        Unions two :class:`Table`s with duplicate records removed.
+        Similar to a SQL UNION. The fields of the two union operations must 
fully overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.union(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.union(right._j_table))
+
+    def union_all(self, right):
+        """
+        Unions two :class:`Table`s. Similar to a SQL UNION ALL. The fields of 
the two union
+        operations must fully overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.union_all(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.unionAll(right._j_table))
+
+    def intersect(self, right):
+        """
+        Intersects two :class:`Table`s with duplicate records removed. 
Intersect returns records
+        that exist in both tables. If a record is present in one or both 
tables more than once,
+        it is returned just once, i.e., the resulting table has no duplicate 
records. Similar to a
+        SQL INTERSECT. The fields of the two intersect operations must fully 
overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.intersect(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.intersect(right._j_table))
+
+    def intersect_all(self, right):
+        """
+        Intersects two :class:`Table`s. IntersectAll returns records that 
exist in both tables.
+        If a record is present in both tables more than once, it is returned 
as many times as it
+        is present in both tables, i.e., the resulting table might have 
duplicate records. Similar
+        to an SQL INTERSECT ALL. The fields of the two intersect operations 
must fully overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.intersect_all(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.intersectAll(right._j_table))
+
+    def order_by(self, fields):
+        """
+        Sorts the given :class:`Table`. Similar to SQL ORDER BY.
+        The resulting Table is sorted globally sorted across all parallel 
partitions.
+
+        Example:
+        ::
+            >>> tab.order_by("name.desc")
+
+        :param fields: Order fields expression string,
+        :return: Result table.
+        """
+        return Table(self._j_table.orderBy(fields))
+
+    def offset(self, offset):
+        """
+        Limits a sorted result from an offset position.
+        Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+        thus must be preceded by it.
+        :func:`~pyflink.table.Table.offset` can be combined with a subsequent
+        :func:`~pyflink.table.Table.fetch` call to return n rows after 
skipping the first o rows.
+
+        Example:
+        ::
+            # skips the first 3 rows and returns all following rows.
+            >>> tab.order_by("name.desc").offset(3)
+            # skips the first 10 rows and returns the next 5 rows.
+            >>> tab.order_by("name.desc").offset(10).fetch(5)
+
+        :param offset: Number of records to skip.
+        :return: Result table.
+        """
+        return Table(self._j_table.offset(offset))
+
+    def fetch(self, fetch):
+        """
+        Limits a sorted result to the first n rows.
+        Similar to a SQL FETCH clause. Fetch is technically part of the Order 
By operator and
+        thus must be preceded by it.
+        :func:`~pyflink.table.Table.offset` can be combined with a preceding
+        :func:`~pyflink.table.Table.fetch` call to return n rows after 
skipping the first o rows.
+
+        Example:
+
+        Returns the first 3 records.
+        ::
+            >>> tab.order_by("name.desc").fetch(3)
+
+        Skips the first 10 rows and returns the next 5 rows.
+        ::
+            >>> tab.order_by("name.desc").offset(10).fetch(5)
+
+        :param fetch: The number of records to return. Fetch must be >= 0.
+        :return: Result table.
+        """
+        return Table(self._j_table.fetch(fetch))
+
+    def window(self, window):
+        """
+        Defines group window on the records of a table.
+
+        A group window groups the records of a table by assigning them to 
windows defined by a time
+        or row interval.
+
+        For streaming tables of infinite size, grouping into windows is 
required to define finite
+        groups on which group-based aggregates can be computed.
+
+        For batch tables of finite size, windowing essentially provides 
shortcuts for time-based
+        groupBy.
+
+        .. note::
+            Computing windowed aggregates on a streaming table is only a 
parallel operation
+            if additional grouping attributes are added to the
+            :func:`~pyflink.table.GroupWindowedTable.group_by` clause.
+            If the :func:`~pyflink.table.GroupWindowedTable.group_by` only 
references a GroupWindow
+            alias, the streamed table will be processed by a single task, 
i.e., with parallelism 1.
+
+        :param windows: A :class:`GroupWindow` created from :class:`Tumble`, 
:class:`Session` or
+                        :class:`Slide`.
+        :return: A :class:`GroupWindowedTable`.
+        """
+        # type: (GroupWindow) -> GroupWindowedTable
+        return GroupWindowedTable(self._j_table.window(window._java_window))
+
+    def over_window(self, *over_windows):
+        """
+        Defines over-windows on the records of a table.
+
+        An over-window defines for each record an interval of records over 
which aggregation
+        functions can be computed.
+
+        Example:
+        ::
+            >>> table.window(Over.partition_by("c").order_by("rowTime")\
+            ...     .preceding("10.seconds").alias("ow"))\
+            ...     .select("c, b.count over ow, e.sum over ow")
+
+        .. note::
+            Computing over window aggregates on a streaming table is only a 
parallel
+            operation if the window is partitioned. Otherwise, the whole 
stream will be processed
+            by a single task, i.e., with parallelism 1.
+
+        .. note::
+            Over-windows for batch tables are currently not supported.
+
+        :param windows: :class:`OverWindow`s created from :class:`Over`.
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to