[GitHub] [flink] zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r283635811 ## File path: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import java.io.IOException; +import java.util.Collection; + +/** + * The provider used for requesting and releasing batch of memory segments. + */ +public interface MemorySegmentProvider { + /** Requests default number of memory segments. */ + Collection requestMemorySegments() throws IOException; + + Collection requestMemorySegments(int numRequiredBuffers) throws IOException; Review comment: We did indeed lack the semantics of requesting/releasing a batch of buffers before, and I think it is reasonable to extend these methods. - `SegmentProvider` is not really needed because `RemoteInputChannel` still needs to construct the `Buffer` internally after requesting a `MemorySegment`.
- We could extend the general `BufferProvider` interface to provide a `requestBuffers(int numRequiredBuffers, BufferRecycler)` method, and `RemoteInputChannel` already implements the `BufferRecycler` interface. - We could also extend the general `BufferRecycler` interface to provide a `recycle(Collection)` method. This way we would reuse the existing semantics and architecture. The only concern is that it would affect the previous implementations and bring more changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
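The batch request/recycle semantics being proposed can be sketched in Python for illustration (a hypothetical analogue of the Java `BufferProvider`/`BufferRecycler` interfaces; the names, signatures, and segment representation here are assumptions, not Flink's actual API):

```python
class BufferRecycler:
    """Hypothetical analogue of BufferRecycler, extended with batch recycling."""

    def __init__(self):
        self.free_segments = []

    def recycle(self, segments):
        # Batch variant: return a whole collection of segments to the pool at once.
        self.free_segments.extend(segments)


class BufferProvider:
    """Hypothetical analogue of a BufferProvider with a batch request method."""

    def __init__(self, pool_size, segment_size=32 * 1024):
        self.pool = [bytearray(segment_size) for _ in range(pool_size)]

    def request_buffers(self, num_required_buffers, recycler):
        # Batch variant: hand out several segments in one call; the caller
        # (which implements the recycler interface, like RemoteInputChannel)
        # is responsible for recycling them later.
        if num_required_buffers > len(self.pool):
            raise IOError("not enough free segments")
        out = self.pool[:num_required_buffers]
        self.pool = self.pool[num_required_buffers:]
        return out
```

The point of passing the recycler into the batch request is that the caller can hand itself in, so every buffer handed out already knows where to return its segment.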
[GitHub] [flink] wuchong commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
wuchong commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r283635271 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java ## @@ -62,7 +62,7 @@ /** * All types of the aggregate buffer. */ - public abstract InternalType[] getAggBufferTypes(); + public abstract LogicalType[] getAggBufferTypes(); Review comment: Regarding `DeclarativeAggregateFunction`, I think it is an API exposed to "users", i.e., outside the table system. Maybe we should use `DataType` for `getAggBufferTypes` and `getResultType`.
[GitHub] [flink] yuyang08 edited a comment on issue #8067: [FLINK-11746][formats] Add thrift format support to Flink
yuyang08 edited a comment on issue #8067: [FLINK-11746][formats] Add thrift format support to Flink URL: https://github.com/apache/flink/pull/8067#issuecomment-492088145 @twalthr, @rmetzger, @fhueske could you help review this PR, or point us to the right person for a review? We have tested this change internally and it works for our use cases. We would love to get feedback from the community and see how we can merge it upstream. Thanks!
[GitHub] [flink] yuyang08 commented on issue #8067: [FLINK-11746][formats] Add thrift format support to Flink
yuyang08 commented on issue #8067: [FLINK-11746][formats] Add thrift format support to Flink URL: https://github.com/apache/flink/pull/8067#issuecomment-492088145 @twalthr, @rmetzger could you help review this PR, or point us to the right person for a review? We have tested this change internally and it works for our use cases. We would love to get feedback from the community and see how we can merge it upstream. Thanks!
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283633644 ## File path: flink-python/pyflink/table/table_config.py ## @@ -44,18 +47,55 @@ def parallelism(self): def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +return self._j_table_config.getTimeZone().getID() Review comment: The reason for keeping the current implementation is the same as above.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283633442 ## File path: flink-python/pyflink/table/table.py ## @@ -118,3 +463,112 @@ def insert_into(self, table_name): :param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written. """ self._j_table.insertInto(table_name) + +def __str__(self): +return self._j_table.toString() + + +class GroupedTable(object): +""" +A table that has been grouped on a set of grouping keys. +""" + +def __init__(self, java_table): +self._j_table = java_table + +def select(self, fields): +""" +Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. +The field expressions can contain complex expressions and aggregations. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg + ' The average' as average") + + +:param fields: Expression string that contains group keys and aggregate function calls. +:return: Result table. +""" +return Table(self._j_table.select(fields)) + + +class GroupWindowedTable(object): +""" +A table that has been windowed for :class:`GroupWindow`s. +""" + +def __init__(self, java_group_windowed_table): +self._java_table = java_group_windowed_table + +def group_by(self, fields): +""" +Groups the elements by a mandatory window and one or more optional grouping attributes. +The window is specified by referring to its alias. + +If no additional grouping attribute is specified and if the input is a streaming table, +the aggregation will be performed by a single task, i.e., with parallelism 1. + +Aggregations are performed per group and defined by a subsequent +:func:`~pyflink.table.WindowGroupedTable.select` clause similar to SQL SELECT-GROUP-BY +query. + +Example: +:: +>>> tab.window(groupWindow.alias("w")).group_by("w, key").select("key, value.avg") + +:param fields: Group keys. +:return: A :class:`WindowGroupedTable`. 
+""" +return WindowGroupedTable(self._java_table.groupBy(fields)) + + +class WindowGroupedTable(object): +""" +A table that has been windowed and grouped for :class:`GroupWindow`s. +""" + +def __init__(self, java_window_grouped_table): +self._java_table = java_window_grouped_table + +def select(self, fields): +""" +Performs a selection operation on a window grouped table. Similar to an SQL SELECT +statement. +The field expressions can contain complex expressions and aggregations. + +Example: +:: +>>> window_grouped_table.select("key, window.start, value.avg as valavg") + +:param fields: Expression string. +:return: Result table. +""" +return Table(self._java_table.select(fields)) + + +class OverWindowedTable(object): +""" +A table that has been windowed for :class:`OverWindow`s. + +Unlike group windows, which are specified in the GROUP BY clause, over windows do not collapse +rows. Instead over window aggregates compute an aggregate for each input row over a range of +its neighboring rows. +""" + +def __init__(self, java_over_windowed_table): +self._java_table = java_over_windowed_table Review comment: According to the comments of sunjincheng121, I think it is better to change current name to _j_table. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283633425 ## File path: flink-python/pyflink/table/table.py ## @@ -118,3 +463,112 @@ def insert_into(self, table_name): :param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written. """ self._j_table.insertInto(table_name) + +def __str__(self): +return self._j_table.toString() + + +class GroupedTable(object): +""" +A table that has been grouped on a set of grouping keys. +""" + +def __init__(self, java_table): +self._j_table = java_table + +def select(self, fields): +""" +Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. +The field expressions can contain complex expressions and aggregations. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg + ' The average' as average") + + +:param fields: Expression string that contains group keys and aggregate function calls. +:return: Result table. +""" +return Table(self._j_table.select(fields)) + + +class GroupWindowedTable(object): +""" +A table that has been windowed for :class:`GroupWindow`s. +""" + +def __init__(self, java_group_windowed_table): +self._java_table = java_group_windowed_table + +def group_by(self, fields): +""" +Groups the elements by a mandatory window and one or more optional grouping attributes. +The window is specified by referring to its alias. + +If no additional grouping attribute is specified and if the input is a streaming table, +the aggregation will be performed by a single task, i.e., with parallelism 1. + +Aggregations are performed per group and defined by a subsequent +:func:`~pyflink.table.WindowGroupedTable.select` clause similar to SQL SELECT-GROUP-BY +query. + +Example: +:: +>>> tab.window(groupWindow.alias("w")).group_by("w, key").select("key, value.avg") + +:param fields: Group keys. +:return: A :class:`WindowGroupedTable`. 
+""" +return WindowGroupedTable(self._java_table.groupBy(fields)) + + +class WindowGroupedTable(object): +""" +A table that has been windowed and grouped for :class:`GroupWindow`s. +""" + +def __init__(self, java_window_grouped_table): +self._java_table = java_window_grouped_table Review comment: According to the comments of sunjincheng121, I think it is better to change current name to _j_table. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283633274 ## File path: flink-python/pyflink/table/table.py ## @@ -118,3 +463,112 @@ def insert_into(self, table_name): :param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written. """ self._j_table.insertInto(table_name) + +def __str__(self): +return self._j_table.toString() + + +class GroupedTable(object): +""" +A table that has been grouped on a set of grouping keys. +""" + +def __init__(self, java_table): +self._j_table = java_table Review comment: According to sunjincheng121's comments, I think it is better to keep the current name.
[jira] [Commented] (FLINK-5243) Implement an example for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839089#comment-16839089 ] Jasleen Kaur commented on FLINK-5243: - [~ivan.mushketyk] Can I take this issue? > Implement an example for BipartiteGraph > --- > > Key: FLINK-5243 > URL: https://issues.apache.org/jira/browse/FLINK-5243 > Project: Flink > Issue Type: Sub-task > Components: Library / Graph Processing (Gelly) >Reporter: Ivan Mushketyk >Priority: Major > Labels: beginner > > Should implement example for BipartiteGraph in gelly-examples project > similarly to examples for Graph class. > Depends on this: https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283632055 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if Review comment: Done.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283632066 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if Review comment: Done.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283631890 ## File path: flink-python/pyflink/table/tests/test_batch_table_api.py ## @@ -0,0 +1,418 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# # distributed under the License is distributed on an "AS IS" BASIS, +# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and +# # limitations under the License. + +import os + +from pyflink.table.types import DataTypes +from pyflink.table.window import Tumble, Slide, Session + +from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase + + +class BatchTableTests(PyFlinkBatchTableTestCase): + +def test_select_alias(self): Review comment: That makes sense. I have reworked all the tests in the new commit.
[GitHub] [flink] wuchong commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
wuchong commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283631948 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. 
+""" +return Table(self._j_table.fullOuterJoin( +right._j_table, join_predicate)) + +def minus(self, right): +""" +Minus of two :class:`Table`s with duplicate records removed. +Similar to a SQL EXCEPT clause. Minus returns records from the left table
[GitHub] [flink] wuchong opened a new pull request #8436: [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures
wuchong opened a new pull request #8436: [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures URL: https://github.com/apache/flink/pull/8436 ## What is the purpose of the change The `AsyncLookupJoin` doesn't close all the generated ResultFutures (which contain UDFs). This is because we currently iterate over the BlockingQueue to close them, but when the job is failing (we are using `failingSource` to trigger restore), some ResultFutures may not have been called back and are therefore not in the BlockingQueue. ## Brief change log - Put all the ResultFutures into an ArrayList, and iterate over the list to close the ResultFutures. ## Verifying this change ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
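The fix described above is a general resource-cleanup pattern: track every created resource in a list at creation time and close from that list, instead of iterating a queue that only contains resources whose callbacks completed. A minimal sketch of the pattern (hypothetical names, not the actual Flink operator code):

```python
import queue


class ResultFuture:
    def __init__(self):
        self.completed = False
        self.closed = False

    def complete(self):
        self.completed = True

    def close(self):
        self.closed = True


class AsyncOperator:
    def __init__(self):
        self.completed_queue = queue.Queue()  # only holds futures whose callback fired
        self.all_futures = []                 # the fix: track every future at creation

    def create_future(self):
        f = ResultFuture()
        self.all_futures.append(f)
        return f

    def on_callback(self, f):
        f.complete()
        self.completed_queue.put(f)

    def close(self):
        # Iterating completed_queue would miss futures whose callback never
        # fired (e.g. when the job fails mid-flight); iterate the full list.
        for f in self.all_futures:
            f.close()


op = AsyncOperator()
a, b = op.create_future(), op.create_future()
op.on_callback(a)  # b's callback never fires, simulating a failing job
op.close()
```

After `close()`, both futures are closed even though only one ever reached the queue, which is exactly the gap the PR addresses.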
[jira] [Updated] (FLINK-12507) Fix AsyncLookupJoin doesn't close all generated ResultFutures
[ https://issues.apache.org/jira/browse/FLINK-12507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12507: --- Labels: pull-request-available (was: ) > Fix AsyncLookupJoin doesn't close all generated ResultFutures > - > > Key: FLINK-12507 > URL: https://issues.apache.org/jira/browse/FLINK-12507 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > > There is a fragile test in AsyncLookupJoinITCase, that not all the udfs are > closed at the end. > {code:java} > 02:40:48.787 [ERROR] Tests run: 22, Failures: 2, Errors: 0, Skipped: 0, Time > elapsed: 47.098 s <<< FAILURE! - in > org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase > 02:40:48.791 [ERROR] > testAsyncJoinTemporalTableWithUdfFilter[StateBackend=HEAP](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase) > Time elapsed: 1.266 s <<< FAILURE! > java.lang.AssertionError: expected:<0> but was:<2> > at > org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268) > 02:40:48.794 [ERROR] > testAsyncJoinTemporalTableWithUdfFilter[StateBackend=ROCKSDB](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase) > Time elapsed: 1.033 s <<< FAILURE! > java.lang.AssertionError: expected:<0> but was:<2> > at > org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268) > {code}
[GitHub] [flink] flinkbot commented on issue #8436: [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures
flinkbot commented on issue #8436: [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures URL: https://github.com/apache/flink/pull/8436#issuecomment-492084529 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283631291 ## File path: flink-python/pyflink/table/window.py ## @@ -0,0 +1,482 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta + +from py4j.java_gateway import get_method +from pyflink.java_gateway import get_gateway + +__all__ = [ +'Tumble', +'Session', +'Slide', +'Over', +'GroupWindow', +'OverWindow' +] + + +class GroupWindow(object): Review comment: Done.
[jira] [Created] (FLINK-12507) Fix AsyncLookupJoin doesn't close all generated ResultFutures
Jark Wu created FLINK-12507: --- Summary: Fix AsyncLookupJoin doesn't close all generated ResultFutures Key: FLINK-12507 URL: https://issues.apache.org/jira/browse/FLINK-12507 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Reporter: Jark Wu Assignee: Jark Wu There is a fragile test in AsyncLookupJoinITCase: not all the UDFs are closed at the end. {code:java} 02:40:48.787 [ERROR] Tests run: 22, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 47.098 s <<< FAILURE! - in org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase 02:40:48.791 [ERROR] testAsyncJoinTemporalTableWithUdfFilter[StateBackend=HEAP](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase) Time elapsed: 1.266 s <<< FAILURE! java.lang.AssertionError: expected:<0> but was:<2> at org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268) 02:40:48.794 [ERROR] testAsyncJoinTemporalTableWithUdfFilter[StateBackend=ROCKSDB](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase) Time elapsed: 1.033 s <<< FAILURE! java.lang.AssertionError: expected:<0> but was:<2> at org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268) {code}
[GitHub] [flink] wuchong commented on issue #8384: [FLINK-11610][docs-zh] Translate the "Examples" page into Chinese
wuchong commented on issue #8384: [FLINK-11610][docs-zh] Translate the "Examples" page into Chinese URL: https://github.com/apache/flink/pull/8384#issuecomment-492083048 Thanks @YueYeShen , the translation looks good to me now. Will merge it.
[GitHub] [flink] bowenli86 removed a comment on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
bowenli86 removed a comment on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#issuecomment-492080766 @xuefuz @irui-apache @zjuwangg can you please take a look?
[GitHub] [flink] bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#issuecomment-492080788 @xuefuz @irui-apache @zjuwangg can you please take a look?
[GitHub] [flink] bowenli86 opened a new pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
bowenli86 opened a new pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434 ## What is the purpose of the change This PR supports view related operations in `HiveCatalog` and creates `HiveCatalogView`. ## Brief change log - added view related operations in `HiveCatalog` - created `HiveCatalogView` - enabled view related unit tests in `HiveCatalogView` ## Verifying this change This change is already covered by existing tests in `HiveCatalogTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs) Documentation will be added later along with doc of `HiveCatalog`
[GitHub] [flink] bowenli86 closed pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
bowenli86 closed pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434
[GitHub] [flink] bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#issuecomment-492080766 @xuefuz @irui-apache @zjuwangg can you please take a look?
[GitHub] [flink] bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses
bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses URL: https://github.com/apache/flink/pull/8433#issuecomment-492080742 @xuefuz @irui-apache @zjuwangg can you please take a look?
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283624288 ## File path: flink-python/pyflink/table/table_config.py ## @@ -44,18 +47,55 @@ def parallelism(self): def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +return self._j_table_config.getTimeZone().getID() + +@timezone.setter +def timezone(self, timezone_id): +if timezone_id is not None and isinstance(timezone_id, str): Review comment: The reason for using a string here is that the timezone class in Python, `tzinfo`, is an abstract class, which means users can implement a custom timezone class themselves. We can't translate a custom Python timezone to a Java timezone without information loss. So the easier way is to use a timezone ID from the [tz database](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones), which is supported in both Python and Java.
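The idea in the review comment above can be sketched with a hypothetical config class: accept only a tz-database ID string and validate it eagerly, so an unknown ID fails at assignment time rather than on the Java side. `TableConfigSketch` and its attribute names are illustrative, not the actual PyFlink API; the sketch assumes Python 3.9+ (`zoneinfo`).

```python
from zoneinfo import ZoneInfo  # stdlib since Python 3.9; resolves tz-database IDs


class TableConfigSketch:
    """Hypothetical stand-in for the PyFlink TableConfig timezone property."""

    def __init__(self):
        self._timezone_id = "UTC"

    @property
    def timezone(self):
        # The real implementation would delegate to the Java TableConfig
        # via py4j; here we just hold the tz-database ID locally.
        return self._timezone_id

    @timezone.setter
    def timezone(self, timezone_id):
        if not isinstance(timezone_id, str):
            raise TypeError("timezone must be a tz-database ID string, e.g. 'Asia/Shanghai'")
        ZoneInfo(timezone_id)  # raises if the ID is not in the tz database
        self._timezone_id = timezone_id


cfg = TableConfigSketch()
cfg.timezone = "Asia/Shanghai"
```

Because both `java.util.TimeZone.getTimeZone(id)` and `zoneinfo.ZoneInfo(id)` understand the same tz-database IDs, a plain string round-trips between the two runtimes without information loss, which a custom `tzinfo` subclass could not.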
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283622904 ## File path: flink-python/pyflink/table/table.py ## @@ -118,3 +463,112 @@ def insert_into(self, table_name): :param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written. """ self._j_table.insertInto(table_name) + +def __str__(self): +return self._j_table.toString() + + +class GroupedTable(object): +""" +A table that has been grouped on a set of grouping keys. +""" + +def __init__(self, java_table): +self._j_table = java_table + +def select(self, fields): +""" +Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. +The field expressions can contain complex expressions and aggregations. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg + ' The average' as average") + + +:param fields: Expression string that contains group keys and aggregate function calls. +:return: Result table. +""" +return Table(self._j_table.select(fields)) + + +class GroupWindowedTable(object): +""" +A table that has been windowed for :class:`GroupWindow`s. +""" + +def __init__(self, java_group_windowed_table): +self._java_table = java_group_windowed_table Review comment: That makes sense. I have renamed all the private Java table variables to `_j_table` in the new commit.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283622697 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. 
The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if Review comment: Done.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283622709 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. 
+""" +return Table(self._j_table.fullOuterJoin( +right._j_table, join_predicate)) + +def minus(self, right): +""" +Minus of two :class:`Table`s with duplicate records removed. +Similar to a SQL EXCEPT clause. Minus returns records from the left
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283622685 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl Review comment: Done.
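The wrapper pattern visible in the diffs above — a thin Python `Table` that delegates every operator to an underlying Java table via py4j and re-wraps the result, with an optional join predicate — can be exercised without a JVM using a test double. `FakeJavaTable` below is a hypothetical stand-in for the py4j proxy, not real PyFlink code; only the delegation shape mirrors the PR.

```python
class FakeJavaTable:
    """Test double for the Java-side Table reached through py4j."""

    def __init__(self, name):
        self.name = name

    def join(self, other, predicate=None):
        # Record the operation as a readable string instead of building a plan.
        suffix = f" ON {predicate}" if predicate else ""
        return FakeJavaTable(f"({self.name} JOIN {other.name}{suffix})")


class Table:
    """Python wrapper: delegate to the underlying Java table, re-wrap the result."""

    def __init__(self, j_table):
        self._j_table = j_table

    def join(self, right, join_predicate=None):
        # Same optional-predicate dispatch as the PR's Table.join.
        if join_predicate is not None:
            return Table(self._j_table.join(right._j_table, join_predicate))
        return Table(self._j_table.join(right._j_table))


left = Table(FakeJavaTable("left"))
right = Table(FakeJavaTable("right"))
joined = left.join(right, "a = b")  # -> "(left JOIN right ON a = b)"
```

The double makes the delegation contract easy to unit-test: every operator returns a new wrapper, and the optional predicate selects between the two Java overloads.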
[GitHub] [flink] zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#issuecomment-492074867 As for `ResultPartitionFactory`: The `ShuffleService` already takes the role of a factory for creating partitions/gates. Of course we could further extract a specific factory component from `ShuffleService`. But I think the factory might differ between `ShuffleService` implementations, and it should be regarded as a private tool inside a specific `ShuffleService` implementation. E.g. for `FSShuffleService#FSResultPartition`, we might not need the `subpartition` instance, only the logical `subpartition` index. Also for interactive queries, if they want to reuse the `ResultPartitionWriter`, they also do not need to construct a `subpartition` inside the specific instance.
[GitHub] [flink] zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#issuecomment-492072639 Commits for `PartitionBufferPoolFactory`: In my previous review I saw it was not in the constructor of `NetworkEnvironment` in that commit, and it was then moved into the constructor in the following commit. So I meant the commit that first introduced it should be squashed with the following commit, to make sure it is in the constructor when introduced. Now I see it is already in the right state; maybe some UI issue with the commit sequence you confirmed before. :)
[GitHub] [flink] zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r283619819 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionBufferPoolFactory.java ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; + +import java.io.IOException; + +/** + * Factory of {@link BufferPool} for network partitions or channels. + */ +public interface PartitionBufferPoolFactory { + BufferPool create(int size, ResultPartitionType type, BufferPoolOwner owner) throws IOException; Review comment: empty line before `create` method?
[GitHub] [flink] zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r283619661 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -151,7 +159,7 @@ public static NetworkEnvironment create( checkNotNull(taskEventPublisher), resultPartitionFactory, singleInputGateFactory, - resultPartitionBufferPoolFactory); + resultPartitionBufferPoolFactory, inputGateBufferPoolFactory); Review comment: inputGateBufferPoolFactory should be on a separate line, for the commit `[hotfix][network] Introduce InputGateBufferPoolFactory`
[GitHub] [flink] zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#issuecomment-492070855 @azagrebin thanks for the confirmation. As for `PartitionBufferPoolFactory`: - The current `BufferPoolFactory` exists only for partition/gate, and no other parts of the Flink stack need a `BufferPool` at the moment. The newly introduced `PartitionBufferPoolFactory` is also for partition/gate, but has no architectural relationship with `BufferPoolFactory`. So I think having two parallel, unrelated interfaces around `BufferPool` is not very clean. - We could also pass the `ResultPartitionType` in the constructor of `AbstractPartitionBufferPoolFactory`; the newly proposed interface method then becomes `create(int size, BufferPoolOwner owner)`, so the only difference from the previous methods in `BufferPoolFactory` is that we do not pass `maxUsedBuffers`, which is calculated based on `ResultPartitionType`. - The key motivation for proposing the new interface is that we could pass only one factory parameter in the constructor of `ResultPartition`; otherwise we would need two parameters, `NetworkEnvironmentConfiguration` and `BufferPoolFactory`. For the constructor of `SingleInputGate` it seems the same, because we could use `NetworkEnvironmentConfiguration` to replace the current `isCreditBased` parameter. - Another possible advantage of the new interface is that we extract the logic of creating the `BufferPool` to another place, which simplifies `SingleInputGate`, `ResultPartition` and `NetworkEnvironment`. - From the aspect of extending the architecture, it seems there would be no problem if we did nothing, because different `ShuffleService` implementations might need different `InputGate` and `ResultPartitionWriter` implementations. We might even extract the current `BlockingSubpartition` as a separate implementation of `ResultPartitionWriter` in the future, and it would not need a `BufferPoolFactory` at all.
In summary, I do not think it is really necessary, or worth it, to introduce a new interface here. If we want to keep the partition/gate constructors simple, we could just pass a `Supplier` as you suggested before, and it could be supplied by `NetworkEnvironment` itself. If the `Supplier` approach does not seem good, I have another option: - Add a new method `createBufferPool(int numRequiredBuffers, BufferPoolOwner owner)` to the existing `BufferPoolFactory`. - Make `AbstractPartitionBufferPoolFactory` implement `BufferPoolFactory`, and also pass the `ResultPartitionType` into the constructor of `AbstractPartitionBufferPoolFactory`. - `ResultPartitionBufferPoolFactory`/`InputGateBufferPoolFactory` extend `AbstractPartitionBufferPoolFactory` and only need to implement the new method `createBufferPool(int numRequiredBuffers, BufferPoolOwner owner)`. This way we extend and reuse the general `BufferPoolFactory` and only abstract out the one part that differs between partition and gate: the calculation of `maxPoolSize`. What do you think?
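The shape of the option proposed above — one factory entry point, an abstract partition/gate base class holding the `ResultPartitionType`, and subclasses supplying only the `maxPoolSize` calculation — can be sketched as follows. This is written in Python for brevity; the class and method names mirror the discussion but are illustrative, not Flink's actual code, and the bounded-pool formula is an assumed placeholder.

```python
from abc import ABC, abstractmethod


class AbstractPartitionBufferPoolFactory(ABC):
    """Holds the partition-type-dependent state; template method creates the pool."""

    def __init__(self, is_bounded_type, buffers_per_channel):
        # In the discussion this would be derived from ResultPartitionType.
        self._is_bounded_type = is_bounded_type
        self._buffers_per_channel = buffers_per_channel

    def create_buffer_pool(self, num_required_buffers, owner=None):
        # The shared part: callers pass only size and owner; the max pool
        # size is no longer a caller concern.
        max_pool_size = self._max_pool_size(num_required_buffers)
        return {"required": num_required_buffers, "max": max_pool_size, "owner": owner}

    @abstractmethod
    def _max_pool_size(self, num_required_buffers):
        """The only part that differs between partition and gate."""


class ResultPartitionBufferPoolFactory(AbstractPartitionBufferPoolFactory):
    def _max_pool_size(self, num_required_buffers):
        # Assumed rule: bounded (pipelined) types cap the pool; blocking
        # types leave it unbounded.
        if self._is_bounded_type:
            return num_required_buffers + self._buffers_per_channel
        return float("inf")


pool = ResultPartitionBufferPoolFactory(True, 2).create_buffer_pool(8)
```

The point of the template-method split is that `ResultPartition` and `SingleInputGate` constructors each take a single factory object, and only the `_max_pool_size` override encodes their difference.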
[GitHub] [flink] godfreyhe commented on issue #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate
godfreyhe commented on issue #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate URL: https://github.com/apache/flink/pull/8389#issuecomment-492069318 @walterddr thanks for the fix. It's a little difficult for users to understand `explainSource` and implement it, even if we rename `explainSource` to `getSourceDigest` (or some other name). Could we keep the logic of re-computing the digest after push-down in the rules?
[GitHub] [flink] godfreyhe commented on a change in pull request #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate
godfreyhe commented on a change in pull request #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate URL: https://github.com/apache/flink/pull/8389#discussion_r283616783 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ## @@ -81,7 +81,13 @@ class FlinkLogicalTableSourceScan( val terms = super.explainTerms(pw) .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", ")) -val sourceDesc = tableSource.explainSource() +val auxiliarySourceDesc = tableSource match { + case fts: FilterableTableSource[_] => +s"FilterPushDown=${fts.isFilterPushedDown.toString}" Review comment: `s"FilterPushDown=${fts.isFilterPushedDown.toString}"` is not the digest of a `FilterableTableSource`. Consider the case where several different filters may be pushed down. So, I think the digest should be the string value of the pushed filters.
[GitHub] [flink] WeiZhong94 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.
WeiZhong94 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API. URL: https://github.com/apache/flink/pull/8392#issuecomment-492064023 @sunjincheng121 Thanks for your review! Sorry for these mistakes. I have fixed them in the new commit and made sure Travis works correctly.
[GitHub] [flink] wuchong commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
wuchong commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283609527 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ## @@ -0,0 +1,1025 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.sql + +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeProcessOperator +import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{CountNullNonNull, CountPairs, LargerThanCount} +import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase, TestingAppendSink} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.mutable + +@RunWith(classOf[Parameterized]) +class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { Review comment: Proctime over aggregates can't be covered by IT cases, because the result will be non-deterministic. `ProcTimeBoundedRangeOver` and `ProcTimeBoundedRowsOver` have been covered by `OverWindowHarnessTest`.
[GitHub] [flink] Mr-Nineteen closed pull request #8229: [FLINK-12273]The default value of CheckpointRetentionPolicy should be …
Mr-Nineteen closed pull request #8229: [FLINK-12273]The default value of CheckpointRetentionPolicy should be … URL: https://github.com/apache/flink/pull/8229
[GitHub] [flink] Mr-Nineteen commented on issue #8229: [FLINK-12273]The default value of CheckpointRetentionPolicy should be …
Mr-Nineteen commented on issue #8229: [FLINK-12273]The default value of CheckpointRetentionPolicy should be … URL: https://github.com/apache/flink/pull/8229#issuecomment-492060134 @StephanEwen Ok. I think it's important to change this default. If some users don't know much about Flink, it's easy to suffer huge losses in production. Therefore, Flink needs to consider this aspect and also bring a better experience to users.
[jira] [Updated] (FLINK-12443) Replace InternalType with LogicalType in blink runner
[ https://issues.apache.org/jira/browse/FLINK-12443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12443: --- Labels: pull-request-available (was: ) > Replace InternalType with LogicalType in blink runner > - > > Key: FLINK-12443 > URL: https://issues.apache.org/jira/browse/FLINK-12443 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: Jingsong Lee > Assignee: Jingsong Lee > Priority: Major > Labels: pull-request-available > > LogicalType can cover all of InternalType, and its functionality is more complete. > With isNullable, we can do null-check optimization in the code generator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
flinkbot commented on issue #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#issuecomment-492059404 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] JingsongLi opened a new pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
JingsongLi opened a new pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435 ## What is the purpose of the change Use the new LogicalType in the blink planner. NOTE: This only uses LogicalType to cover the previous InternalType functionality; it does not implement the new functionality of LogicalType. Differences between LogicalType and InternalType: 1. LogicalType has nullability, so it has no singleton objects. 2. YearMonthIntervalType and DayTimeIntervalType are independent types. 3. MultisetType is not a MapType. ## Verifying this change Unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no)
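The first difference listed above (nullability precludes singleton type objects and enables null-check elision in generated code) can be sketched with a small hypothetical example; the class and method names below are invented for illustration and are not Flink's real `LogicalType` API.

```java
// Hypothetical sketch (not Flink's real classes): a logical type that carries
// nullability cannot be a shared singleton, because INT and INT NOT NULL are
// distinct instances. Knowing a field is NOT NULL lets a code generator skip
// the null check entirely.
public class NullableTypeDemo {

    static final class IntType {
        private final boolean nullable;
        IntType(boolean nullable) { this.nullable = nullable; }
        boolean isNullable() { return nullable; }

        // The "generator" emits a direct read when the type is NOT NULL.
        String genRead(String fieldIndex) {
            return nullable
                ? "row.isNullAt(" + fieldIndex + ") ? null : row.getInt(" + fieldIndex + ")"
                : "row.getInt(" + fieldIndex + ")";
        }
    }

    public static void main(String[] args) {
        IntType nullableInt = new IntType(true);   // INT
        IntType notNullInt = new IntType(false);   // INT NOT NULL
        System.out.println(nullableInt.genRead("0")); // guarded read
        System.out.println(notNullInt.genRead("0"));  // direct read, no null check
    }
}
```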
[jira] [Updated] (FLINK-12405) Introduce BLOCKING_PERSISTENT result partition type
[ https://issues.apache.org/jira/browse/FLINK-12405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-12405: --- Description: The new {{ResultPartitionType}} : {{BLOCKING_PERSISTENT}} is similar to {{BLOCKING}} except that it might be consumed several times and will be released after TM shutdown or a {{ResultPartition}} removal request. This is the basis for Interactive Programming. Here are the brief changes: * Introduce a {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}} * Introduce {{BlockingShuffleOutputFormat}} and {{BlockingShuffleDataSink}}, which are used to generate a {{GenericDataSinkBase}} with a user-specified {{IntermediateDataSetID}} (passed from the Table API in a later PR) * When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with an {{IntermediateDataSetID}}, it creates an {{IntermediateDataSet}} with this id; the {{OutputFormatVertex}} for this {{GenericDataSinkBase}} is then excluded from the {{JobGraph}} * So the JobGraph may contain some JobVertex which has more {{IntermediateDataSet}}s than its downstream consumers. Here are some design notes: * Why modify {{DataSet}} and {{JobGraphGenerator}}? Since the Blink Planner is not ready yet, the Batch Table API is running on the Flink Planner (based on DataSet). There will be another implementation once the Blink Planner is ready. * Why use a special {{OutputFormat}} as a placeholder? We could add a {{cache()}} method to DataSet, but we do not want to change the DataSet API any more, so a special {{OutputFormat}} as a placeholder seems reasonable. was: The new {{ResultPartitionType}} : {{BLOCKING_PERSISTENT}} is similar to {{BLOCKING}} except it might be consumed for several times and will be released after TM shutdown or {{ResultPartition}} removal request. This is the basis for Interactive Programming.
> Introduce BLOCKING_PERSISTENT result partition type > --- > > Key: FLINK-12405 > URL: https://issues.apache.org/jira/browse/FLINK-12405 > Project: Flink > Issue Type: Sub-task > Components: API / DataSet >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The new {{ResultPartitionType}} : {{BLOCKING_PERSISTENT}} is similar to > {{BLOCKING}} except it might be consumed for several times and will be > released after TM shutdown or {{ResultPartition}} removal request. > This is the basis for Interactive Programming. > Here is the brief changes: > * Introduce {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}} > * Introduce {{BlockingShuffleOutputFormat}} and {{BlockingShuffleDataSink}} > which are used to generate {{GenericDataSinkBase}} with user specified > {{IntermediateDataSetID}} (passed from TableAPI in later PR) > * when {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with > {{IntermediateDataSetID}}, it creates a {{IntermediateDataSet}} with this id, > then {{OutputFormatVertex}} for this {{GenericDataSinkBase}} will be excluded > in {{JobGraph}} > * So the JobGraph may contains some JobVertex which has more > {{IntermediateDataSet}} than its downstream consumers. > Here are some design notes: > * Why modify {{DataSet}} and {{JobGraphGenerator}} > Since Blink Planner is not ready yet, and Batch Table is running on Flink > Planner(based on DataSet). > There will be another implementation once Blink Planner is ready. > * Why use a special {{OutputFormat}} as placeholder > We could add a {{cache()}} method for DataSet, but we do not want to change > DataSet API any more. so a special {{OutputFormat}} as placeholder seems > reasonable.
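The placeholder-sink idea in the design notes above can be sketched with a tiny hypothetical example (these are invented classes, not the real `BlockingShuffleOutputFormat` or `JobGraphGenerator`): a no-op sink format merely transports the user-chosen dataset id, and a generator that recognizes the marker can create the persistent intermediate result with that id and drop the sink vertex.

```java
import java.util.UUID;

// Hypothetical sketch of the "special OutputFormat as placeholder" design:
// the marker writes nothing, it only carries the user-specified id so the
// job-graph generator can identify and handle it specially.
public class CacheSinkDemo {

    // Marker "output format": a no-op that transports the dataset id.
    static final class BlockingShuffleMarker {
        final UUID intermediateDataSetId;
        BlockingShuffleMarker(UUID id) { this.intermediateDataSetId = id; }
    }

    // Generator step: marker sinks are recognized and excluded from the
    // job graph; their id becomes a persistent intermediate result instead.
    static boolean isPlaceholder(Object sinkFormat) {
        return sinkFormat instanceof BlockingShuffleMarker;
    }

    public static void main(String[] args) {
        BlockingShuffleMarker marker = new BlockingShuffleMarker(UUID.randomUUID());
        System.out.println(isPlaceholder(marker));     // true -> vertex excluded
        System.out.println(isPlaceholder("realSink")); // false -> vertex kept
    }
}
```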
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283604284 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.java ## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.generated.AggsHandleFunction; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; + +/** + * A basic implementation to support unbounded event-time over-window. + */ +public abstract class RowTimeUnboundedOver extends KeyedProcessFunctionWithCleanupState { Review comment: AbstractRowTimeUnboundedPrecedingOver
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283603446 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.generated.AggsHandleFunction; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Process Function for ROW clause processing-time bounded OVER window. + */ +public class ProcTimeBoundedRowsOver extends KeyedProcessFunctionWithCleanupState { Review comment: ProcTimeRowsBoundedPrecedingFunction, and update comment
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283603987 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.java ## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.generated.AggsHandleFunction; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Process Function used for the aggregate in bounded proc-time OVER window. + */ +public class ProcTimeBoundedRangeOver extends KeyedProcessFunctionWithCleanupState { Review comment: and also update the comment of this class to show this class is responsible for range bounded preceding to current row, maybe also give some query examples. You can refer to the comment of the sub classes of `OverWindowFrame`
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283603486 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.generated.AggsHandleFunction; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; + +/** + * Process Function for processing-time unbounded OVER window. + */ +public class ProcTimeUnboundedOver extends KeyedProcessFunctionWithCleanupState { Review comment: ProcTimeUnboundedPrecedingFunction, and update comment
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283598406 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala ## @@ -134,6 +144,325 @@ class StreamExecOverAggregate( override protected def translateToPlanInternal( tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = { -throw new TableException("Implements this") +val tableConfig = tableEnv.getConfig + +if (logicWindow.groups.size > 1) { + throw new TableException( + "All aggregates must be computed on the same window.") +} + +val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0) Review comment: why use full package name here?
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283607890 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ## @@ -0,0 +1,1025 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.sql + +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeProcessOperator +import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{CountNullNonNull, CountPairs, LargerThanCount} +import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase, TestingAppendSink} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.mutable + +@RunWith(classOf[Parameterized]) +class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { Review comment: Please add some more tests to cover the logic of `ProcTimeBoundedRangeOver`
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283603324 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.java ## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.generated.AggsHandleFunction; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Process Function used for the aggregate in bounded proc-time OVER window. + */ +public class ProcTimeBoundedRangeOver extends KeyedProcessFunctionWithCleanupState { Review comment: ProcTimeRangeBoundedPrecedingFunction will be more accurate
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283602389 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.java ## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate; Review comment: Could you move all over-related classes to package `over`?
[jira] [Closed] (FLINK-12380) Add thread name in the log4j.properties
[ https://issues.apache.org/jira/browse/FLINK-12380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-12380. Resolution: Information Provided > Add thread name in the log4j.properties > --- > > Key: FLINK-12380 > URL: https://issues.apache.org/jira/browse/FLINK-12380 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > This is inspired by FLINK-12368, where users wanted to add sub-task index information in the source code. We could add the thread name, which already contains the sub-task index information, to the logs so that no source-code change is needed. > Moreover, I found that the existing {{logback.xml}} in Flink already contains the {{thread name}} information. We should also add this to {{log4j.properties}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
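The change the issue describes can be illustrated with a hedged sketch. The appender names and the rest of the pattern below are illustrative, not the exact Flink defaults; the relevant piece is log4j's `%t` conversion character, which inserts the logging thread's name (for Flink task threads this name already carries the sub-task index, e.g. "Map (2/4)"):

```properties
# Illustrative log4j.properties fragment (not the exact Flink default):
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
# [%t] adds the thread name, mirroring logback's [%thread] in logback.xml
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %-60c - %m%n
```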
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283604242 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.java ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.generated.AggsHandleFunction; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Process Function for ROWS clause event-time bounded OVER window. + */ +public class RowTimeBoundedRowsOver extends KeyedProcessFunctionWithCleanupState { Review comment: RowTimeRowsBoundedPrecedingFunction, and update comment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283604194 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.java ## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.generated.AggsHandleFunction; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Process Function for RANGE clause event-time bounded OVER window. + */ +public class RowTimeBoundedRangeOver extends KeyedProcessFunctionWithCleanupState { Review comment: RowTimeRangeBoundedPrecedingFunction, and update comment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283604565 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedRowsOver.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.util.Collector; + +import java.util.List; + +/** + * A ProcessFunction to support unbounded ROWS window. + * The ROWS clause defines on a physical level how many rows are included in a window frame. + */ +public class RowTimeUnboundedRowsOver extends RowTimeUnboundedOver { Review comment: RowTimeRowsUnboundedPrecedingFunction, and update comment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283604420 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedRangeOver.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.util.Collector; + +import java.util.List; + +/** + * A ProcessFunction to support unbounded RANGE window. + * The RANGE option includes all the rows within the window frame + * that have the same ORDER BY values as the current row. + */ +public class RowTimeUnboundedRangeOver extends RowTimeUnboundedOver { Review comment: RowTimeRangeUnboundedPrecedingFunction, and update comment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283600244 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala ## @@ -134,6 +144,325 @@ class StreamExecOverAggregate( override protected def translateToPlanInternal( tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = { -throw new TableException("Implements this") +val tableConfig = tableEnv.getConfig + +if (logicWindow.groups.size > 1) { + throw new TableException( + "All aggregates must be computed on the same window.") +} + +val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0) + +val orderKeys = overWindow.orderKeys.getFieldCollations + +if (orderKeys.size() != 1) { + throw new TableException( + "The window can only be ordered by a single time column.") +} +val orderKey = orderKeys.get(0) + +if (!orderKey.direction.equals(ASCENDING)) { + throw new TableException( + "The window can only be ordered in ASCENDING mode.") +} + +val inputDS = getInputNodes.get(0).translateToPlan(tableEnv) + .asInstanceOf[StreamTransformation[BaseRow]] + +val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input) + +if (inputIsAccRetract) { + throw new TableException( + "Retraction on Over window aggregation is not supported yet. " + +"Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") +} + +if (!logicWindow.groups.get(0).keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( +"No state retention interval configured for a query which accumulates state. " + + "Please provide a query configuration with valid retention interval to prevent " + + "excessive state size. 
You may specify a retention time of 0 to not clean up the state.") +} + +val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType + +// check time field +if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType) + && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) { + throw new TableException( +"OVER windows' ordering in stream mode must be defined on a time attribute.") +} + +// identify window rowtime attribute +val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) { + Some(orderKey.getFieldIndex) +} else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) { + None +} else { + throw new TableException( + "OVER windows can only be applied on time attributes.") +} + +val config = tableEnv.getConfig Review comment: already got table config in line 147 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283598342 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala ## @@ -17,25 +17,35 @@ */ package org.apache.flink.table.plan.nodes.physical.stream -import org.apache.flink.streaming.api.transformations.StreamTransformation -import org.apache.flink.table.CalcitePair -import org.apache.flink.table.api.{StreamTableEnvironment, TableException} -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.dataformat.BaseRow -import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode} -import org.apache.flink.table.plan.util.RelExplainUtil - import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Window.Group import org.apache.calcite.rel.core.{AggregateCall, Window} import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING Review comment: Please keep the import order aligned with other files; you can find some explanation here: https://flink.apache.org/contribute-code.html#imports
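The linked code-style page prescribes a grouped import order. As a rough illustration of what the reviewer is asking for (grouping sketched from the Flink contribution guide; the exact checkstyle rules may differ, and this is not a compilable file on its own):

```scala
// Illustrative grouping only, using imports from the diff above.

// 1. Flink's own classes come first
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.{StreamTableEnvironment, TableException}

// 2. then other third-party imports such as Calcite, kept together
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
import org.apache.calcite.rex.RexLiteral

// 3. finally javax/java/scala imports
import java.util.List
```

In the diff above, the new `ASCENDING` import was appended after `RexLiteral` instead of being sorted into the Calcite group, which is what triggered the comment.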
[jira] [Created] (FLINK-12506) Add more over window unit tests
Kurt Young created FLINK-12506: -- Summary: Add more over window unit tests Key: FLINK-12506 URL: https://issues.apache.org/jira/browse/FLINK-12506 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime, Tests Reporter: Kurt Young We only have an ITCase for streaming over windows; we need to add more unit tests for the various process functions.
[GitHub] [flink] Armstrongya commented on a change in pull request #8366: [FLINK-12415][doc-zh]Translate HistoryServer page into Chinese
Armstrongya commented on a change in pull request #8366: [FLINK-12415][doc-zh]Translate HistoryServer page into Chinese URL: https://github.com/apache/flink/pull/8366#discussion_r283605925 ## File path: docs/monitoring/historyserver.zh.md ## @@ -22,62 +22,62 @@ specific language governing permissions and limitations under the License. --> -Flink has a history server that can be used to query the statistics of completed jobs after the corresponding Flink cluster has been shut down. +Flink 自带的 history server 可以在已执行完作业对应的 Flink 集群关闭之后查询该作业的统计信息。 -Furthermore, it exposes a REST API that accepts HTTP requests and responds with JSON data. +除此之外,它还提供了一个 REST API,可以通过 HTTP 以 JSON 格式发送和接收数据。 * This will be replaced by the TOC {:toc} -## Overview +## 概览 -The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager. +History server 可以查询被 JobManager 存档的已执行完作业的状态和统计信息。 -After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script: +在配置完 History server 和 JobManager 之后,用户可以通过下面的启动脚本来开启和关停 History server: {% highlight shell %} -# Start or stop the HistoryServer +# 开启或关停 History Server bin/historyserver.sh (start|start-foreground|stop) {% endhighlight %} -By default, this server binds to `localhost` and listens at port `8082`. +默认情况下,History server 绑定到本机 `localhost` 的 `8082` 端口。 -Currently, you can only run it as a standalone process. +目前你可以把它当做单独的进程来运行。 -## Configuration +## 配置 -The configuration keys `jobmanager.archive.fs.dir` and `historyserver.archive.fs.refresh-interval` need to be adjusted for archiving and displaying archived jobs. +存档和展示已完成的作业 需要调配 `jobmanager.archive.fs.dir` 和 `historyserver.archive.fs.refresh-interval` 这俩配置项。 **JobManager** -The archiving of completed jobs happens on the JobManager, which uploads the archived job information to a file system directory. 
You can configure the directory to archive completed jobs in `flink-conf.yaml` by setting a directory via `jobmanager.archive.fs.dir`. +JobManager 会进行已完成作业的存档,把已存档作业的信息上传到一个文件系统目录上。用户可以通过设置 `jobmanager.archive.fs.dir` 来配置这个存档目录,将 `flink-conf.yaml` 中已完成的作业都存档下来。 {% highlight yaml %} -# Directory to upload completed job information +# 已完成作业信息的上传目录 jobmanager.archive.fs.dir: hdfs:///completed-jobs {% endhighlight %} **HistoryServer** -The HistoryServer can be configured to monitor a comma-separated list of directories in via `historyserver.archive.fs.dir`. The configured directories are regularly polled for new archives; the polling interval can be configured via `historyserver.archive.fs.refresh-interval`. +History Server 可以监控 `historyserver.archive.fs.dir` 配置的用逗号分隔的文件目录列表。并且会为新存档定期轮询已配置的目录,轮询的间隔可以通过 `historyserver.archive.fs.refresh-interval` 来配置。 Review comment: @wuchong Thanks for your comments. It's more reasonable and fluent. I have optimized the translation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283605276 ## File path: flink-python/pyflink/table/tests/test_batch_table_api.py ## @@ -0,0 +1,418 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# # distributed under the License is distributed on an "AS IS" BASIS, +# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and +# # limitations under the License. + +import os + +from pyflink.table.types import DataTypes +from pyflink.table.window import Tumble, Slide, Session + +from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase + + +class BatchTableTests(PyFlinkBatchTableTestCase): Review comment: Good idea! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283604935 ## File path: flink-python/pyflink/table/table_config.py ## @@ -44,18 +47,55 @@ def parallelism(self): def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +return self._j_table_config.getTimeZone().getID() + +@timezone.setter +def timezone(self, timezone_id): +if timezone_id is not None and isinstance(timezone_id, str): +j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id) +self._j_table_config.setTimeZone(j_timezone) +else: +raise Exception("TableConfig.timezone should be a string!") + +@property +def null_check(self): +return self._j_table_config.getNullCheck() + +@null_check.setter +def null_check(self, null_check): +if null_check is not None and isinstance(null_check, bool): +self._j_table_config.setNullCheck(null_check) +else: +raise Exception("TableConfig.null_check should be a bool value!") + +@property +def max_generated_code_length(self): +return self._j_table_config.getMaxGeneratedCodeLength() Review comment: I think this method is needed as Python API program is just an API and the Java code-gen logic is shared by both Java/Scala/Python API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
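The property-with-validation pattern under review in this PR can be sketched in plain Python. The class and attribute names below are illustrative stand-ins, not the actual pyflink `TableConfig` API; raising `TypeError` instead of a bare `Exception` is a small idiomatic improvement over the diff, and `isinstance(value, bool)` already rejects `None`, so the separate `is not None` check in the diff is redundant:

```python
class TableConfigSketch:
    """Illustrative stand-in for a config object with validated properties."""

    def __init__(self):
        self._null_check = True

    @property
    def null_check(self):
        return self._null_check

    @null_check.setter
    def null_check(self, value):
        # isinstance(value, bool) already excludes None, so no extra
        # None check is needed (unlike the double check in the diff)
        if not isinstance(value, bool):
            raise TypeError("null_check should be a bool value!")
        self._null_check = value


config = TableConfigSketch()
config.null_check = False
print(config.null_check)  # → False
```

A setter like this fails fast with a precise error type, which callers can catch selectively instead of catching all `Exception`s.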
[GitHub] [flink] kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/7757#issuecomment-492054384 Highly appreciate your review :) @azagrebin I have addressed the comments.
[GitHub] [flink] flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define the data types in Python
flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define the data types in Python URL: https://github.com/apache/flink/pull/8420#issuecomment-491604302 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❗ 3. Needs [attention] from. - Needs attention by @twalthr [PMC] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define the data types in Python
sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define the data types in Python URL: https://github.com/apache/flink/pull/8420#issuecomment-492054007 @flinkbot attention @twalthr
[jira] [Assigned] (FLINK-12494) JDBCOutputFormat support reconnect when link failure and flush by timeInterval
[ https://issues.apache.org/jira/browse/FLINK-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhaoshijie reassigned FLINK-12494: -- Assignee: zhaoshijie > JDBCOutputFormat support reconnect when link failure and flush by timeInterval > -- > > Key: FLINK-12494 > URL: https://issues.apache.org/jira/browse/FLINK-12494 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.8.0 >Reporter: zhaoshijie >Assignee: zhaoshijie >Priority: Major > > When I use JDBCSink (flink-1.4.2) to write records to MySQL, I see the following exception: > > {code:java} > java.util.concurrent.ExecutionException: > com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link > failure > The last packet successfully received from the server was 265,251 > milliseconds ago. The last packet sent successfully to the server was 265,252 > milliseconds ago. > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68) > at > org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129) > ... 2 more > Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: > Communications link failure > The last packet successfully received from the server was 265,251 > milliseconds ago. The last packet sent successfully to the server was 265,252 > milliseconds ago. 
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at com.mysql.jdbc.Util.handleNewInstance(Util.java:411) > at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116) > at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364) > at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983) > at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163) > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624) > at > com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127) > at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293) > at > org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199) > at > org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139) > at > org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83) > at > org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73) > at > org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105) > at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: java.net.SocketException: Connection reset > at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) > at java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) > at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) > at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345) > ... 16 more > {code} > I think the connection stayed idle for too long without writing any record, so the server closed it on its own. Sparse data is quite common in practice, so I think we should reconnect when the connection becomes invalid. > Besides, I find that JDBCOutputFormat.flush is only called from the snapshotState method and when "batchCount >= batchInterval". If our sink's records are sparse, the actual write happens with a very large time delay, so should we add a flush condition: currentTime - lastFlushTime > timeInterval?
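The time-interval flush the reporter proposes can be sketched independently of JDBC. The class, method names, and thresholds below are hypothetical, not the actual `JDBCOutputFormat` API; the point is the combined "flush on count or on elapsed time" condition:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffering sink: flushes when the batch is full OR when
// too much time has passed since the last flush (the proposed condition).
class TimedBatchBuffer {
    private final int batchInterval;     // flush after this many records
    private final long flushIntervalMs;  // ...or after this much elapsed time
    private final List<String> batch = new ArrayList<>();
    private long lastFlushTime;
    private int flushCount = 0;

    TimedBatchBuffer(int batchInterval, long flushIntervalMs, long now) {
        this.batchInterval = batchInterval;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushTime = now;
    }

    void add(String record, long now) {
        batch.add(record);
        // the second clause is what FLINK-12494 asks to add on top of
        // the existing "batchCount >= batchInterval" check
        if (batch.size() >= batchInterval || now - lastFlushTime >= flushIntervalMs) {
            flush(now);
        }
    }

    void flush(long now) {
        // a real sink would execute the batched statements here,
        // reconnecting first if the connection has gone stale
        batch.clear();
        lastFlushTime = now;
        flushCount++;
    }

    int pending() { return batch.size(); }

    int getFlushCount() { return flushCount; }
}
```

With sparse input, the time-based clause bounds the write delay to roughly `flushIntervalMs` even when the batch never fills up.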
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283585908 ## File path: flink-python/pyflink/table/tests/test_batch_table_api.py ## @@ -0,0 +1,418 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# # distributed under the License is distributed on an "AS IS" BASIS, +# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and +# # limitations under the License. + +import os + +from pyflink.table.types import DataTypes +from pyflink.table.window import Tumble, Slide, Session + +from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase + + +class BatchTableTests(PyFlinkBatchTableTestCase): + +def test_select_alias(self): Review comment: I think we should align the test structure with the Java/Scala tests, as I mentioned in the test framework PR. ![image](https://user-images.githubusercontent.com/22488084/57662378-d7523c80-7621-11e9-92f2-59b58f9aafdb.png) So it's better to classify the tests. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283601896 ## File path: flink-python/pyflink/table/table_config.py ## @@ -44,18 +47,55 @@ def parallelism(self): def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +return self._j_table_config.getTimeZone().getID() + +@timezone.setter +def timezone(self, timezone_id): Review comment: Add some Doc, such as: `the timezone_id for a timezone, either an abbreviation such as "PST", a full name such as "America/Los_Angeles", or a custom timezone_id such as "GMT-8:00". ` What do you think?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283588400 ## File path: flink-python/pyflink/table/query_config.py ## @@ -0,0 +1,108 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta +from datetime import timedelta +from py4j.compat import long + +from pyflink.java_gateway import get_gateway + + +class QueryConfig(object): +""" +The :class:`QueryConfig` holds parameters to configure the behavior of queries. +""" + +__metaclass__ = ABCMeta + +def __init__(self, j_query_config): +self._j_query_config = j_query_config + + +class StreamQueryConfig(QueryConfig): +""" +The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries. 
+""" + +def __init__(self, j_stream_query_config=None): +self._jvm = get_gateway().jvm +if j_stream_query_config is not None: +self._j_stream_query_config = j_stream_query_config +else: +self._j_stream_query_config = self._jvm.StreamQueryConfig() +super(StreamQueryConfig, self).__init__(self._j_stream_query_config) + +def with_idle_state_retention_time(self, min_time, max_time): +""" +Specifies a minimum and a maximum time interval for how long idle state, i.e., state which +was not updated, will be retained. + +State will never be cleared until it was idle for less than the minimum time and will never +be kept if it was idle for more than the maximum time. + +When new data arrives for previously cleaned-up state, the new data will be handled as if it +was the first data. This can result in previous results being overwritten. + +Set to 0 (zero) to never clean-up the state. + +.. note:: +Cleaning up state requires additional bookkeeping which becomes less expensive for +larger differences of minTime and maxTime. The difference between minTime and maxTime +must be at least 5 minutes. + +:param min_time: The minimum time interval for which idle state is retained. Set to 0(zero) + to never clean-up the state. +:param max_time: The maximum time interval for which idle state is retained. Must be at + least 5 minutes greater than minTime. Set to 0 (zero) to never clean-up + the state. +:return: :class:`StreamQueryConfig` +""" +# type: (timedelta, timedelta) -> StreamQueryConfig +j_time_class = self._jvm.org.apache.flink.api.common.time.Time +j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000))) +j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000))) +self._j_stream_query_config = \ +self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time) +return self + +def get_min_idle_state_retention_time(self): +""" +:return: The minimum time until state which was not updated will be retained. 
Review comment: Add `State might be cleared and removed if it was not updated for the defined period of time.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283589060 ## File path: flink-python/pyflink/table/query_config.py ## @@ -0,0 +1,108 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta +from datetime import timedelta +from py4j.compat import long + +from pyflink.java_gateway import get_gateway + + +class QueryConfig(object): +""" +The :class:`QueryConfig` holds parameters to configure the behavior of queries. +""" + +__metaclass__ = ABCMeta + +def __init__(self, j_query_config): +self._j_query_config = j_query_config + + +class StreamQueryConfig(QueryConfig): +""" +The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries. 
+""" + +def __init__(self, j_stream_query_config=None): +self._jvm = get_gateway().jvm +if j_stream_query_config is not None: +self._j_stream_query_config = j_stream_query_config +else: +self._j_stream_query_config = self._jvm.StreamQueryConfig() +super(StreamQueryConfig, self).__init__(self._j_stream_query_config) + +def with_idle_state_retention_time(self, min_time, max_time): Review comment: Python Doc needs to add time units, which is different from Java. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283588406 ## File path: flink-python/pyflink/table/query_config.py ## @@ -0,0 +1,108 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta +from datetime import timedelta +from py4j.compat import long + +from pyflink.java_gateway import get_gateway + + +class QueryConfig(object): +""" +The :class:`QueryConfig` holds parameters to configure the behavior of queries. +""" + +__metaclass__ = ABCMeta + +def __init__(self, j_query_config): +self._j_query_config = j_query_config + + +class StreamQueryConfig(QueryConfig): +""" +The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries. 
+""" + +def __init__(self, j_stream_query_config=None): +self._jvm = get_gateway().jvm +if j_stream_query_config is not None: +self._j_stream_query_config = j_stream_query_config +else: +self._j_stream_query_config = self._jvm.StreamQueryConfig() +super(StreamQueryConfig, self).__init__(self._j_stream_query_config) + +def with_idle_state_retention_time(self, min_time, max_time): +""" +Specifies a minimum and a maximum time interval for how long idle state, i.e., state which +was not updated, will be retained. + +State will never be cleared until it was idle for less than the minimum time and will never +be kept if it was idle for more than the maximum time. + +When new data arrives for previously cleaned-up state, the new data will be handled as if it +was the first data. This can result in previous results being overwritten. + +Set to 0 (zero) to never clean-up the state. + +.. note:: +Cleaning up state requires additional bookkeeping which becomes less expensive for +larger differences of minTime and maxTime. The difference between minTime and maxTime +must be at least 5 minutes. + +:param min_time: The minimum time interval for which idle state is retained. Set to 0(zero) + to never clean-up the state. +:param max_time: The maximum time interval for which idle state is retained. Must be at + least 5 minutes greater than minTime. Set to 0 (zero) to never clean-up + the state. +:return: :class:`StreamQueryConfig` +""" +# type: (timedelta, timedelta) -> StreamQueryConfig +j_time_class = self._jvm.org.apache.flink.api.common.time.Time +j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000))) +j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000))) +self._j_stream_query_config = \ +self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time) +return self + +def get_min_idle_state_retention_time(self): +""" +:return: The minimum time until state which was not updated will be retained. 
+""" +# type: () -> int +return self._j_stream_query_config.getMinIdleStateRetentionTime() + +def get_max_idle_state_retention_time(self): +""" +:return: The maximum time until state which was not updated will be retained. Review comment: Add `State will be cleared and removed if it was not updated for the defined period of time.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283589635 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. 
+ +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if Review comment: Same as above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283586736 ## File path: flink-python/pyflink/table/tests/test_batch_table_api.py ## @@ -0,0 +1,418 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# # distributed under the License is distributed on an "AS IS" BASIS, +# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and +# # limitations under the License. + +import os + +from pyflink.table.types import DataTypes +from pyflink.table.window import Tumble, Slide, Session + +from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase + + +class BatchTableTests(PyFlinkBatchTableTestCase): Review comment: And I think we also need a base test class which can run under both Batch and Stream; for very special tests we can then choose to execute with Stream or Batch only. What do you think? @WeiZhong94 @dianfu
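One way to get the shared base class asked about above is a mixin whose tests run under every mode that inherits them. This is a plain-`unittest` sketch under assumed names, not pyflink's actual test framework; the real subclasses would build a batch or stream `TableEnvironment` instead of the stand-ins used here.

```python
import unittest

class CommonTableTests:
    """Tests written once here run for every mode subclass below;
    only the environment-specific setup differs per mode."""

    def test_select_single_column(self):
        table = self.create_table([("a", 1), ("b", 2)])
        self.assertEqual(self.select(table, 0), ["a", "b"])

class BatchTableTests(CommonTableTests, unittest.TestCase):
    # a real subclass would create a batch TableEnvironment here
    def create_table(self, rows):
        return list(rows)

    def select(self, table, col):
        return [row[col] for row in table]

class StreamTableTests(CommonTableTests, unittest.TestCase):
    # a real subclass would create a stream TableEnvironment here
    def create_table(self, rows):
        return iter(rows)

    def select(self, table, col):
        return [row[col] for row in table]
```

Special-case tests that only make sense in one mode can then live directly on the Batch or Stream subclass.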
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283596541 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. 
+ +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. 
+""" +return Table(self._j_table.fullOuterJoin( +right._j_table, join_predicate)) + +def minus(self, right): +""" +Minus of two :class:`Table`s with duplicate records removed. +Similar to a SQL EXCEPT clause. Minus returns records from the left
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283589608 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. 
+ +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if Review comment: `~pyflink.table.Table.as_` -> `~pyflink.table.Table.alias` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283602349 ## File path: flink-python/pyflink/table/table_config.py ## @@ -44,18 +47,55 @@ def parallelism(self): def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +return self._j_table_config.getTimeZone().getID() + +@timezone.setter +def timezone(self, timezone_id): +if timezone_id is not None and isinstance(timezone_id, str): +j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id) +self._j_table_config.setTimeZone(j_timezone) +else: +raise Exception("TableConfig.timezone should be a string!") + +@property +def null_check(self): +return self._j_table_config.getNullCheck() + +@null_check.setter +def null_check(self, null_check): +if null_check is not None and isinstance(null_check, bool): +self._j_table_config.setNullCheck(null_check) +else: +raise Exception("TableConfig.null_check should be a bool value!") + +@property +def max_generated_code_length(self): +return self._j_table_config.getMaxGeneratedCodeLength() Review comment: Do we need this method in the Python API? (It is only used in Java code generation.)
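The quoted setters follow a validate-then-delegate property pattern. A minimal standalone version of that pattern (class and attribute names here are illustrative, not pyflink's TableConfig, and it raises the more idiomatic TypeError rather than a bare Exception):

```python
class Config:
    """Sketch of the validated-property pattern from the quoted diff."""

    def __init__(self):
        self._timezone = "UTC"

    @property
    def timezone(self):
        return self._timezone

    @timezone.setter
    def timezone(self, timezone_id):
        # accepts an abbreviation ("PST"), a full name
        # ("America/Los_Angeles"), or a custom id ("GMT-8:00")
        if not isinstance(timezone_id, str):
            raise TypeError("timezone should be a string")
        self._timezone = timezone_id
```

In the real wrapper the setter additionally forwards the validated value to the JVM via `TimeZone.getTimeZone(timezone_id)`; the validation shape is the same.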
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283590341 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. 
+ +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. 
+""" +return Table(self._j_table.fullOuterJoin( +right._j_table, join_predicate)) + +def minus(self, right): +""" +Minus of two :class:`Table`s with duplicate records removed. +Similar to a SQL EXCEPT clause. Minus returns records from the left
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283590748 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. 
+ +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin( +right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin( +right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.as_` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.fullOuterJoin( +right._j_table, join_predicate)) Review comment: Can we in one line. ? 
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283601943 ## File path: flink-python/pyflink/table/table_config.py ## @@ -44,18 +47,55 @@ def parallelism(self): def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +return self._j_table_config.getTimeZone().getID() + +@timezone.setter +def timezone(self, timezone_id): +if timezone_id is not None and isinstance(timezone_id, str): Review comment: Is there some simple way to translate the Python timezone to the Java timezone? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
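One simple answer to the question above: `java.util.TimeZone.getTimeZone()` accepts the same IANA IDs (e.g. "America/Los_Angeles", "UTC") that Python's standard `zoneinfo` module uses, so the ID string can usually be passed through unchanged; validating it on the Python side avoids Java's documented behaviour of silently falling back to GMT for unknown IDs. The helper below is a sketch of that idea, not part of the PR:

```python
from zoneinfo import ZoneInfo  # stdlib since Python 3.9


def to_java_timezone_id(tz_name):
    """Validate an IANA timezone ID before handing it to the Java side.

    Hypothetical helper: raises ZoneInfoNotFoundError for unknown IDs
    instead of letting Java silently substitute GMT.
    """
    ZoneInfo(tz_name)  # raises if the ID is not in the tz database
    return tz_name
```

With this, a setter such as the one in the diff could call `to_java_timezone_id(timezone_id)` and forward the validated string to the Java `TableConfig`.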
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283583541 ## File path: flink-python/pyflink/table/table.py ## @@ -106,6 +113,344 @@ def where(self, predicate): """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns onl Review comment: `Removes duplicate values and returns onl` -> `Removes duplicate values and returns only distinct (different) values.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r283599842 ## File path: flink-python/pyflink/table/table.py ## @@ -118,3 +463,112 @@ def insert_into(self, table_name): :param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written. """ self._j_table.insertInto(table_name) + +def __str__(self): +return self._j_table.toString() + + +class GroupedTable(object): +""" +A table that has been grouped on a set of grouping keys. +""" + +def __init__(self, java_table): +self._j_table = java_table + +def select(self, fields): +""" +Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. +The field expressions can contain complex expressions and aggregations. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg + ' The average' as average") + + +:param fields: Expression string that contains group keys and aggregate function calls. +:return: Result table. +""" +return Table(self._j_table.select(fields)) + + +class GroupWindowedTable(object): +""" +A table that has been windowed for :class:`GroupWindow`s. +""" + +def __init__(self, java_group_windowed_table): +self._java_table = java_group_windowed_table Review comment: Both `_j_table` and `_j_group_windowed_table` make sense to me. If we use `_j_table`, every wrapped JVM *Table is referenced as `_j_table`; if we use `_j_group_windowed_table`, we also need `_j_over_windowed_table`, `_j_windowed_grouped_table`, etc. For simplicity, I prefer using `_j_table` for all of the classes in `table.py`. What do you think?
[jira] [Commented] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
[ https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839011#comment-16839011 ] Bowen Li commented on FLINK-10837: -- Hi [~pnowojski] , do you know how to change the "Affected Version" to 1.9.0? Seems it doesn't allow me to do so > Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic? > -- > > Key: FLINK-10837 > URL: https://issues.apache.org/jira/browse/FLINK-10837 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.0 >Reporter: Piotr Nowojski >Priority: Major > Fix For: 1.7.0 > > > https://api.travis-ci.org/v3/job/452439034/log.txt > {noformat} > Tests in error: > > KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2 > » TestTimedOut > {noformat} > {noformat} > java.lang.InterruptedException > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158) > at > org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206) > at > org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875) > at > org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] yanghua edited a comment on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
yanghua edited a comment on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#issuecomment-492047904 It's very strange: Travis reported a compile error: ``` 16:40:05.120 [ERROR] /home/travis/build/apache/flink/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java:[343,95] constructor CheckpointCoordinatorConfiguration in class org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration cannot be applied to given types; 16:40:05.120 [ERROR] required: long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean,boolean 16:40:05.120 [ERROR] found: long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean ``` But the class `JobGraphTest` does not use `CheckpointCoordinatorConfiguration`. I will trigger a rebuild.
[GitHub] [flink] yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#issuecomment-492047904 It's very strange: Travis reported a compile error: ``` 16:40:05.120 [ERROR] /home/travis/build/apache/flink/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java:[343,95] constructor CheckpointCoordinatorConfiguration in class org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration cannot be applied to given types; 16:40:05.120 [ERROR] required: long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean,boolean 16:40:05.120 [ERROR] found: long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean ``` But the class `JobGraphTest` does not use `CheckpointCoordinatorConfiguration`.
[jira] [Commented] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
[ https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839008#comment-16839008 ] Bowen Li commented on FLINK-10837: -- Ran into this again at build https://travis-ci.org/apache/flink/builds/532040077 Logs: https://api.travis-ci.org/v3/job/532040083/log.txt {code:java} 00:34:35.293 [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 314.882 s - in org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase 00:34:35.650 [INFO] 00:34:35.650 [INFO] Results: 00:34:35.650 [INFO] 00:34:35.650 [ERROR] Errors: 00:34:35.650 [ERROR] KafkaITCase.testTimestamps:264->KafkaTestBase.deleteTestTopic:204->Object.wait:-2 » TestTimedOut 00:34:35.650 [INFO] 00:34:35.650 [ERROR] Tests run: 46, Failures: 0, Errors: 1, Skipped: 0 {code} > Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic? > -- > > Key: FLINK-10837 > URL: https://issues.apache.org/jira/browse/FLINK-10837 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.0 >Reporter: Piotr Nowojski >Priority: Major > Fix For: 1.7.0 > > > https://api.travis-ci.org/v3/job/452439034/log.txt > {noformat} > Tests in error: > > KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2 > » TestTimedOut > {noformat} > {noformat} > java.lang.InterruptedException > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158) > at > org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206) > at > 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875) > at > org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
[ https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reopened FLINK-10837: -- > Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic? > -- > > Key: FLINK-10837 > URL: https://issues.apache.org/jira/browse/FLINK-10837 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.0 >Reporter: Piotr Nowojski >Priority: Major > Fix For: 1.7.0 > > > https://api.travis-ci.org/v3/job/452439034/log.txt > {noformat} > Tests in error: > > KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2 > » TestTimedOut > {noformat} > {noformat} > java.lang.InterruptedException > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) > at > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158) > at > org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206) > at > org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875) > at > org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283595220 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala ## @@ -69,45 +66,19 @@ import _root_.scala.collection.mutable */ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment { - // the catalog to hold all registered and translated tables - // we disable caching here to prevent side effects - private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false) - private val rootSchema: SchemaPlus = internalSchema.plus() - // Table API/SQL function catalog private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog() - // the configuration to create a Calcite planner - private lazy val frameworkConfig: FrameworkConfig = Frameworks -.newConfigBuilder -.defaultSchema(rootSchema) -.parserConfig(getSqlParserConfig) -.costFactory(new DataSetCostFactory) -.typeSystem(new FlinkTypeSystem) -.operatorTable(getSqlOperatorTable) -.sqlToRelConverterConfig(getSqlToRelConverterConfig) -// the converter is needed when calling temporal table functions from SQL, because -// they reference a history table represented with a tree of table operations -.context(Contexts.of( - new TableOperationConverter.ToRelConverterSupplier(expressionBridge) -)) -// set the executor to evaluate constant expressions -.executor(new ExpressionReducer(config)) -.build + private val BUILTIN_CATALOG_NAME = "builtin" Review comment: does TableEnvironment need to know the default catalog's name in CatalogManager? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283595427 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerSchema.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Table; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Bridge between {@link CatalogManager} and {@link Schema}. This way we can query Flink's specific catalogs + * from Calcite. 
+ * + * The mapping for {@link Catalog}s is modeled as a strict two-level reference structure for Flink in Calcite, + * the full path of tables and views is of format [catalog_name].[db_name].[meta-object_name]. + * + * It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 to Calcite's schema. + */ +@Internal +public class CatalogManagerSchema implements Schema { Review comment: can we use a better name for this class? Not sure if we should support ExternalCatalog yet, but even if we do, its naming has been a bit confusing This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API. URL: https://github.com/apache/flink/pull/8392#issuecomment-492046091 > One thing we need to double-check is the cost of the testing; there may be a bug that needs fixing (I am not sure, I will check it): > ![image](https://user-images.githubusercontent.com/22488084/57661099-0bc2fa00-761c-11e9-8a11-cdd7e49c3fe6.png) > @WeiZhong94 it is a script bug, see the screenshot below: ![image](https://user-images.githubusercontent.com/22488084/57664749-05d51500-762c-11e9-8963-f0cf2f264ba7.png)
[GitHub] [flink] KurtYoung merged pull request #8407: [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.
KurtYoung merged pull request #8407: [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. URL: https://github.com/apache/flink/pull/8407
[jira] [Updated] (FLINK-12374) Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.
[ https://issues.apache.org/jira/browse/FLINK-12374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12374: --- Labels: pull-request-available (was: ) > Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan > to StreamTransformation. > > > Key: FLINK-12374 > URL: https://issues.apache.org/jira/browse/FLINK-12374 > Project: Flink > Issue Type: Task > Components: Table SQL / Planner >Reporter: Jing Zhang >Assignee: Jing Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12374) Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.
[ https://issues.apache.org/jira/browse/FLINK-12374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-12374. -- Resolution: Implemented Fix Version/s: 1.9.0 merge in 1.9.0: 2826ff80c4b056d7af589649238b3acabca43837 > Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan > to StreamTransformation. > > > Key: FLINK-12374 > URL: https://issues.apache.org/jira/browse/FLINK-12374 > Project: Flink > Issue Type: Task > Components: Table SQL / Planner >Reporter: Jing Zhang >Assignee: Jing Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12502) Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover
[ https://issues.apache.org/jira/browse/FLINK-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf reassigned FLINK-12502: - Assignee: leesf > Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover > > > Key: FLINK-12502 > URL: https://issues.apache.org/jira/browse/FLINK-12502 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: leesf >Priority: Major > > The {{JobMasterTest#testRequestNextInputSplitWithDataSourceFailover}} relies > on how many files you have in your working directory. This assumption is > quite brittle. Instead we should explicitly instantiate an > {{InputSplitAssigner}} with a defined number of input splits. > Moreover, we should make the assertions more explicit: Input split > comparisons should not rely solely on the length of the input split data. > Maybe it is also not necessary to capture the full > {{TaskDeploymentDescriptor}} because we could already know the producer's and > consumer's {{JobVertexID}} when we create the {{JobGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12504) NullPoint here NullPointException there.. It's every where
[ https://issues.apache.org/jira/browse/FLINK-12504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838959#comment-16838959 ] Ken Krugler commented on FLINK-12504: - Not sure why you say "It's not print". Your stacktrace ends with {{PrintSinkFunction}} calling {{PrintSinkOutputWriter}} and that's a sink that (from the JavaDocs) is an "Implementation of the SinkFunction writing every tuple to the standard output or standard error stream". Which you definitely don't want to do in a production workflow, that's only something you'd do for testing small workflows running locally. In any case, I'd suggest closing this issue and re-posting to the Flink user list, thanks. > NullPoint here NullPointException there.. It's every where > -- > > Key: FLINK-12504 > URL: https://issues.apache.org/jira/browse/FLINK-12504 > Project: Flink > Issue Type: Bug >Reporter: Chethan UK >Priority: Major > > I was trying to push data from Kafka to Cassandra, after around 220K > sometimes, 300K points are pushed into C*, this > java.lang.NullPointerException throws in.. 
> ```
> java.lang.NullPointerException
> 	at org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73)
> 	at org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81)
> 	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)
> ```
> How can normal Flink users understand these errors? The jobs keep failing and it's too unstable to be considered for production... > > In the roadmap, are there plans to make Kotlin a supported language as well? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
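The mechanism behind the reported NullPointerException can be illustrated without Flink. The sketch below is NOT Flink's actual `PrintSinkOutputWriter`; it is a hypothetical, simplified standalone mimic (class and method names `PrintWriterSketch`, `open`, `write` are invented for illustration) of the common pattern where a `PrintStream` field is only assigned in an `open()` lifecycle method, so any `write()` call that happens before `open()` runs dereferences a null field, just like the top frame of the trace above:

```java
import java.io.PrintStream;

// Hypothetical mimic of a print-style sink writer: the stream field is only
// assigned in open(), so calling write() before open() throws an NPE.
public class PrintWriterSketch {
    private PrintStream stream; // stays null until open() is called

    public void open(boolean toStdErr) {
        stream = toStdErr ? System.err : System.out;
    }

    public void write(Object record) {
        // If open() was never invoked, stream is null and this line throws
        // NullPointerException, analogous to PrintSinkOutputWriter.write.
        stream.println(record);
    }

    public static void main(String[] args) {
        PrintWriterSketch w = new PrintWriterSketch();
        try {
            w.write("record"); // stream is still null here
        } catch (NullPointerException e) {
            System.out.println("NPE before open(), as in the reported trace");
        }
        w.open(false);
        w.write("record after open()"); // now prints normally
    }
}
```

This also illustrates Ken's point: a sink that writes to a process-local stdout/stderr is a debugging aid, and lifecycle or restart edge cases around it surface as opaque NPEs rather than actionable errors in a long-running production job.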
[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API. URL: https://github.com/apache/flink/pull/8392#issuecomment-492027462 We have two options for integrating Python with Travis: 1. Add a separate python stage, which is what this PR currently does; instead of using MVN for testing, it runs a separate shell script. 2. Execute that shell via MVN, so that Python validation is also tested through MVN. I personally prefer the approach in the current PR, for these reasons: - The Python API is a different language from Java/Scala, and its code management and testing methods do not depend on MVN. Multi-version Python testing and Python API functional tests are combined using Conda and Tox, so we choose not to use the MVN solution; - The Python API is a new language API that has only just been incorporated into Flink. We would like an intuitive view of the Python details in the Travis process, and to directly observe the time spent testing the Python API, so we choose a separate stage with separate run output. Of course, there are 2 dimensions here: 1. whether to use an independent stage, and 2. whether to integrate into MVN, i.e. 2x2=4 options in total, and I lean towards the current PR. What do you think? @aljoscha @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog
flinkbot commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8434#issuecomment-492026774 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The bot tracks review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API. URL: https://github.com/apache/flink/pull/8392#issuecomment-492026843 I found one thing we need to double-check: the cost of the testing. Maybe there is a bug that needs fixing (I am not sure; I will check it): ![image](https://user-images.githubusercontent.com/22488084/57661099-0bc2fa00-761c-11e9-8a11-cdd7e49c3fe6.png) @WeiZhong94
[jira] [Updated] (FLINK-12234) Support view related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12234: --- Labels: pull-request-available (was: ) > Support view related operations in HiveCatalog > -- > > Key: FLINK-12234 > URL: https://issues.apache.org/jira/browse/FLINK-12234 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > Support view related operations in HiveCatalog, which implements > ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)