[GitHub] [flink] zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-13 Thread GitBox
zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r283635811
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The provider used for requesting and releasing batches of memory segments.
+ */
+public interface MemorySegmentProvider {
+   /** Requests the default number of memory segments. */
+   Collection<MemorySegment> requestMemorySegments() throws IOException;
+
+   Collection<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException;
 
 Review comment:
   We indeed lacked the semantics of requesting/releasing batches of buffers 
before, and I think it is reasonable to extend these methods.
   
   - A separate `SegmentProvider` is not really needed, because 
`RemoteInputChannel` still needs to construct the `Buffer` internally after 
requesting the `MemorySegment`.
   
   - We could extend the general `BufferProvider` interface with a method 
`requestBuffers(int numRequiredBuffers, BufferRecycler)`; `RemoteInputChannel` 
already implements the `BufferRecycler` interface.
   
   - We could also extend the general `BufferRecycler` interface with a method 
`recycle(Collection<MemorySegment>)`.
   
   This way we would reuse the existing semantics and architecture. The only 
concern is that it would touch the previous implementations and bring more 
changes. A rough sketch of the proposed extensions is below.
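   A minimal sketch of what these extensions might look like (hypothetical 
shapes for illustration only, not the actual Flink interfaces; `Buffer` and 
`MemorySegment` are stand-ins for the real types):
   
   ```java
   import java.io.IOException;
   import java.util.Collection;
   import java.util.List;
   
   // Stand-ins for the real Flink types, to keep the sketch self-contained.
   interface MemorySegment {}
   interface Buffer {}
   
   interface BufferRecycler {
       /** Existing single-segment method. */
       void recycle(MemorySegment segment);
   
       /** Proposed batch variant. */
       void recycle(Collection<MemorySegment> segments);
   }
   
   interface BufferProvider {
       /** Proposed batch request; the recycler (e.g. a RemoteInputChannel) takes the buffers back. */
       List<Buffer> requestBuffers(int numRequiredBuffers, BufferRecycler recycler) throws IOException;
   }
   ```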


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-13 Thread GitBox
wuchong commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r283635271
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
 ##
 @@ -62,7 +62,7 @@
/**
 * All types of the aggregate buffer.
 */
-   public abstract InternalType[] getAggBufferTypes();
+   public abstract LogicalType[] getAggBufferTypes();
 
 Review comment:
   Regarding `DeclarativeAggregateFunction`, I think it is an API exposed to 
"users", i.e. outside of the table system. Maybe we should use `DataType` for 
`getAggBufferTypes` and `getResultType` (see the sketch below).
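   A hypothetical sketch of the suggested signatures (assuming the user-facing 
`DataType`; the class name here is illustrative only):
   
   ```java
   import org.apache.flink.table.types.DataType;
   
   // Sketch only: the user-facing abstract methods return DataType rather
   // than the internal LogicalType.
   public abstract class UserFacingDeclarativeAggregateFunction {
       /** All types of the aggregate buffer, as user-facing data types. */
       public abstract DataType[] getAggBufferTypes();
   
       /** The result type, as a user-facing data type. */
       public abstract DataType getResultType();
   }
   ```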




[GitHub] [flink] yuyang08 edited a comment on issue #8067: [FLINK-11746][formats] Add thrift format support to Flink

2019-05-13 Thread GitBox
yuyang08 edited a comment on issue #8067: [FLINK-11746][formats] Add thrift 
format support to Flink
URL: https://github.com/apache/flink/pull/8067#issuecomment-492088145
 
 
   @twalthr, @rmetzger, @fhueske, could you help review this PR, or point us 
to the right person for review? We have tested this change internally and it 
works for our use cases. We would love to get feedback from the community and 
see how we can merge it into the upstream. Thanks!




[GitHub] [flink] yuyang08 commented on issue #8067: [FLINK-11746][formats] Add thrift format support to Flink

2019-05-13 Thread GitBox
yuyang08 commented on issue #8067: [FLINK-11746][formats] Add thrift format 
support to Flink
URL: https://github.com/apache/flink/pull/8067#issuecomment-492088145
 
 
   @twalthr, @rmetzger, could you help review this PR, or point us to the 
right person for review? We have tested this change internally and it works 
for our use cases. We would love to get feedback from the community and see 
how we can merge it into the upstream. Thanks!




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283633644
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -44,18 +47,55 @@ def parallelism(self):
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+return self._j_table_config.getTimeZone().getID()
 
 Review comment:
   The reason for keeping the current implementation is the same as above.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283633442
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -118,3 +463,112 @@ def insert_into(self, table_name):
 :param table_name: Name of the :class:`TableSink` to which the 
:class:`Table` is written.
 """
 self._j_table.insertInto(table_name)
+
+def __str__(self):
+return self._j_table.toString()
+
+
+class GroupedTable(object):
+"""
+A table that has been grouped on a set of grouping keys.
+"""
+
+def __init__(self, java_table):
+self._j_table = java_table
+
+def select(self, fields):
+"""
+Performs a selection operation on a grouped table. Similar to an SQL 
SELECT statement.
+The field expressions can contain complex expressions and aggregations.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg + ' The average' as 
average")
+
+
+:param fields: Expression string that contains group keys and 
aggregate function calls.
+:return: Result table.
+"""
+return Table(self._j_table.select(fields))
+
+
+class GroupWindowedTable(object):
+"""
+A table that has been windowed for :class:`GroupWindow`s.
+"""
+
+def __init__(self, java_group_windowed_table):
+self._java_table = java_group_windowed_table
+
+def group_by(self, fields):
+"""
+Groups the elements by a mandatory window and one or more optional 
grouping attributes.
+The window is specified by referring to its alias.
+
+If no additional grouping attribute is specified and if the input is a 
streaming table,
+the aggregation will be performed by a single task, i.e., with 
parallelism 1.
+
+Aggregations are performed per group and defined by a subsequent
+:func:`~pyflink.table.WindowGroupedTable.select` clause similar to SQL 
SELECT-GROUP-BY
+query.
+
+Example:
+::
+>>> tab.window(groupWindow.alias("w")).group_by("w, 
key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: A :class:`WindowGroupedTable`.
+"""
+return WindowGroupedTable(self._java_table.groupBy(fields))
+
+
+class WindowGroupedTable(object):
+"""
+A table that has been windowed and grouped for :class:`GroupWindow`s.
+"""
+
+def __init__(self, java_window_grouped_table):
+self._java_table = java_window_grouped_table
+
+def select(self, fields):
+"""
+Performs a selection operation on a window grouped table. Similar to 
an SQL SELECT
+statement.
+The field expressions can contain complex expressions and aggregations.
+
+Example:
+::
+>>> window_grouped_table.select("key, window.start, value.avg as 
valavg")
+
+:param fields: Expression string.
+:return: Result table.
+"""
+return Table(self._java_table.select(fields))
+
+
+class OverWindowedTable(object):
+"""
+A table that has been windowed for :class:`OverWindow`s.
+
+Unlike group windows, which are specified in the GROUP BY clause, over 
windows do not collapse
+rows. Instead over window aggregates compute an aggregate for each input 
row over a range of
+its neighboring rows.
+"""
+
+def __init__(self, java_over_windowed_table):
+self._java_table = java_over_windowed_table
 
 Review comment:
   According to sunjincheng121's comments, I think it is better to change the 
current name to `_j_table`.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283633425
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -118,3 +463,112 @@ def insert_into(self, table_name):
 :param table_name: Name of the :class:`TableSink` to which the 
:class:`Table` is written.
 """
 self._j_table.insertInto(table_name)
+
+def __str__(self):
+return self._j_table.toString()
+
+
+class GroupedTable(object):
+"""
+A table that has been grouped on a set of grouping keys.
+"""
+
+def __init__(self, java_table):
+self._j_table = java_table
+
+def select(self, fields):
+"""
+Performs a selection operation on a grouped table. Similar to an SQL 
SELECT statement.
+The field expressions can contain complex expressions and aggregations.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg + ' The average' as 
average")
+
+
+:param fields: Expression string that contains group keys and 
aggregate function calls.
+:return: Result table.
+"""
+return Table(self._j_table.select(fields))
+
+
+class GroupWindowedTable(object):
+"""
+A table that has been windowed for :class:`GroupWindow`s.
+"""
+
+def __init__(self, java_group_windowed_table):
+self._java_table = java_group_windowed_table
+
+def group_by(self, fields):
+"""
+Groups the elements by a mandatory window and one or more optional 
grouping attributes.
+The window is specified by referring to its alias.
+
+If no additional grouping attribute is specified and if the input is a 
streaming table,
+the aggregation will be performed by a single task, i.e., with 
parallelism 1.
+
+Aggregations are performed per group and defined by a subsequent
+:func:`~pyflink.table.WindowGroupedTable.select` clause similar to SQL 
SELECT-GROUP-BY
+query.
+
+Example:
+::
+>>> tab.window(groupWindow.alias("w")).group_by("w, 
key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: A :class:`WindowGroupedTable`.
+"""
+return WindowGroupedTable(self._java_table.groupBy(fields))
+
+
+class WindowGroupedTable(object):
+"""
+A table that has been windowed and grouped for :class:`GroupWindow`s.
+"""
+
+def __init__(self, java_window_grouped_table):
+self._java_table = java_window_grouped_table
 
 Review comment:
   According to sunjincheng121's comments, I think it is better to change the 
current name to `_j_table`.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283633274
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -118,3 +463,112 @@ def insert_into(self, table_name):
 :param table_name: Name of the :class:`TableSink` to which the 
:class:`Table` is written.
 """
 self._j_table.insertInto(table_name)
+
+def __str__(self):
+return self._j_table.toString()
+
+
+class GroupedTable(object):
+"""
+A table that has been grouped on a set of grouping keys.
+"""
+
+def __init__(self, java_table):
+self._j_table = java_table
 
 Review comment:
   According to sunjincheng121's comments, I think it is better to keep the 
current name.




[jira] [Commented] (FLINK-5243) Implement an example for BipartiteGraph

2019-05-13 Thread Jasleen Kaur (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839089#comment-16839089
 ] 

Jasleen Kaur commented on FLINK-5243:
-

[~ivan.mushketyk] Can I take this issue?

> Implement an example for BipartiteGraph
> ---
>
> Key: FLINK-5243
> URL: https://issues.apache.org/jira/browse/FLINK-5243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Graph Processing (Gelly)
>Reporter: Ivan Mushketyk
>Priority: Major
>  Labels: beginner
>
> Should implement example for BipartiteGraph in gelly-examples project 
> similarly to examples for Graph class.
> Depends on this: https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283632055
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
 
 Review comment:
   Done.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283632066
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
 
 Review comment:
   Done.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283631890
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_batch_table_api.py
 ##
 @@ -0,0 +1,418 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.table.window import Tumble, Slide, Session
+
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase
+
+
+class BatchTableTests(PyFlinkBatchTableTestCase):
+
+def test_select_alias(self):
 
 Review comment:
   That makes sense. I have reworked all the tests in the new commit.




[GitHub] [flink] wuchong commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
wuchong commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283631948
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.fullOuterJoin(
+right._j_table, join_predicate))
+
+def minus(self, right):
+"""
+Minus of two :class:`Table`s with duplicate records removed.
+Similar to a SQL EXCEPT clause. Minus returns records from the left 
table 

[GitHub] [flink] wuchong opened a new pull request #8436: [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures

2019-05-13 Thread GitBox
wuchong opened a new pull request #8436: [FLINK-12507][table-runtime-blink] Fix 
AsyncLookupJoin doesn't close all generated ResultFutures
URL: https://github.com/apache/flink/pull/8436
 
 
   
   
   
   ## What is the purpose of the change
   
   The `AsyncLookupJoin` doesn't close all the generated ResultFutures (which 
contain UDFs). Currently we iterate over the BlockingQueue to close them, but 
when the job is failing (we are using `failingSource` to trigger restore), 
some ResultFutures may never be called back and thus are not in the 
BlockingQueue.
   
   ## Brief change log
   
 - Put all the ResultFutures into an ArrayList, and iterate over the list to 
close them (sketched below).
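   A self-contained sketch of the idea (hypothetical names, not the actual 
Flink code):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   
   // Sketch of the fix: track every created future in a list so that close()
   // also reaches futures whose callback never fired during a failover.
   class ResultFutureTracker {
       // Futures re-enter this queue only after their async callback completes.
       private final BlockingQueue<AutoCloseable> reusableFutures = new ArrayBlockingQueue<>(16);
       // Every future ever created, whether or not it was called back.
       private final List<AutoCloseable> allFutures = new ArrayList<>();
   
       AutoCloseable track(AutoCloseable future) {
           allFutures.add(future);
           return future;
       }
   
       void close() throws Exception {
           // Iterating only over reusableFutures would miss futures that never
           // completed; iterate over the full list instead.
           for (AutoCloseable future : allFutures) {
               future.close();
           }
       }
   }
   ```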
   
   ## Verifying this change
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Updated] (FLINK-12507) Fix AsyncLookupJoin doesn't close all generated ResultFutures

2019-05-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12507:
---
Labels: pull-request-available  (was: )

> Fix AsyncLookupJoin doesn't close all generated ResultFutures
> -
>
> Key: FLINK-12507
> URL: https://issues.apache.org/jira/browse/FLINK-12507
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>
> There is a fragile test in AsyncLookupJoinITCase: not all the UDFs are
> closed at the end.
> {code:java}
> 02:40:48.787 [ERROR] Tests run: 22, Failures: 2, Errors: 0, Skipped: 0, Time 
> elapsed: 47.098 s <<< FAILURE! - in 
> org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase
> 02:40:48.791 [ERROR] 
> testAsyncJoinTemporalTableWithUdfFilter[StateBackend=HEAP](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase)
>   Time elapsed: 1.266 s  <<< FAILURE!
> java.lang.AssertionError: expected:<0> but was:<2>
>   at 
> org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268)
> 02:40:48.794 [ERROR] 
> testAsyncJoinTemporalTableWithUdfFilter[StateBackend=ROCKSDB](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase)
>   Time elapsed: 1.033 s  <<< FAILURE!
> java.lang.AssertionError: expected:<0> but was:<2>
>   at 
> org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268)
> {code}





[GitHub] [flink] flinkbot commented on issue #8436: [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures

2019-05-13 Thread GitBox
flinkbot commented on issue #8436: [FLINK-12507][table-runtime-blink] Fix 
AsyncLookupJoin doesn't close all generated ResultFutures
URL: https://github.com/apache/flink/pull/8436#issuecomment-492084529
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283631291
 
 

 ##
 File path: flink-python/pyflink/table/window.py
 ##
 @@ -0,0 +1,482 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_method
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+'Tumble',
+'Session',
+'Slide',
+'Over',
+'GroupWindow',
+'OverWindow'
+]
+
+
+class GroupWindow(object):
 
 Review comment:
   Done.




[jira] [Created] (FLINK-12507) Fix AsyncLookupJoin doesn't close all generated ResultFutures

2019-05-13 Thread Jark Wu (JIRA)
Jark Wu created FLINK-12507:
---

 Summary: Fix AsyncLookupJoin doesn't close all generated 
ResultFutures
 Key: FLINK-12507
 URL: https://issues.apache.org/jira/browse/FLINK-12507
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Jark Wu
Assignee: Jark Wu


There is a fragile test in AsyncLookupJoinITCase: not all the UDFs are closed 
at the end.

{code:java}
02:40:48.787 [ERROR] Tests run: 22, Failures: 2, Errors: 0, Skipped: 0, Time 
elapsed: 47.098 s <<< FAILURE! - in 
org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase
02:40:48.791 [ERROR] 
testAsyncJoinTemporalTableWithUdfFilter[StateBackend=HEAP](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase)
  Time elapsed: 1.266 s  <<< FAILURE!
java.lang.AssertionError: expected:<0> but was:<2>
at 
org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268)

02:40:48.794 [ERROR] 
testAsyncJoinTemporalTableWithUdfFilter[StateBackend=ROCKSDB](org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase)
  Time elapsed: 1.033 s  <<< FAILURE!
java.lang.AssertionError: expected:<0> but was:<2>
at 
org.apache.flink.table.runtime.stream.sql.AsyncLookupJoinITCase.testAsyncJoinTemporalTableWithUdfFilter(AsyncLookupJoinITCase.scala:268)
{code}






[GitHub] [flink] wuchong commented on issue #8384: [FLINK-11610][docs-zh] Translate the "Examples" page into Chinese

2019-05-13 Thread GitBox
wuchong commented on issue #8384: [FLINK-11610][docs-zh] Translate the 
"Examples" page into Chinese
URL: https://github.com/apache/flink/pull/8384#issuecomment-492083048
 
 
   Thanks @YueYeShen , the translation looks good to me now. 
   
   Will merge it.




[GitHub] [flink] bowenli86 removed a comment on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-13 Thread GitBox
bowenli86 removed a comment on issue #8434: [FLINK-12234][hive] Support view 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#issuecomment-492080766
 
 
   @xuefuz @irui-apache @zjuwangg can you please take a look?




[GitHub] [flink] bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-13 Thread GitBox
bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#issuecomment-492080788
 
 
   @xuefuz @irui-apache @zjuwangg can you please take a look?




[GitHub] [flink] bowenli86 opened a new pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-13 Thread GitBox
bowenli86 opened a new pull request #8434: [FLINK-12234][hive] Support view 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434
 
 
   ## What is the purpose of the change
   
   This PR supports view related operations in `HiveCatalog` and creates 
`HiveCatalogView`.
   
   ## Brief change log
   
   - added view related operations in `HiveCatalog`
   - created `HiveCatalogView`
   - enabled view related unit tests in `HiveCatalogView`
   
   ## Verifying this change
   
   This change is already covered by existing tests in `HiveCatalogTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs) Documentation will be 
added later along with doc of `HiveCatalog`
   




[GitHub] [flink] bowenli86 closed pull request #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-13 Thread GitBox
bowenli86 closed pull request #8434: [FLINK-12234][hive] Support view related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434
 
 
   




[GitHub] [flink] bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-13 Thread GitBox
bowenli86 commented on issue #8434: [FLINK-12234][hive] Support view related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#issuecomment-492080766
 
 
   @xuefuz @irui-apache @zjuwangg can you please take a look?




[GitHub] [flink] bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database operations to HiveCatalogBase from its subclasses

2019-05-13 Thread GitBox
bowenli86 commented on issue #8433: [FLINK-12505][hive] Unify database 
operations to HiveCatalogBase from its subclasses
URL: https://github.com/apache/flink/pull/8433#issuecomment-492080742
 
 
   @xuefuz @irui-apache @zjuwangg can you please take a look?




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283624288
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -44,18 +47,55 @@ def parallelism(self):
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+return self._j_table_config.getTimeZone().getID()
+
+@timezone.setter
+def timezone(self, timezone_id):
+if timezone_id is not None and isinstance(timezone_id, str):
 
 Review comment:
   The reason for using a string here is that the timezone class in Python, 
`tzinfo`, is an abstract class, which means users can implement custom 
timezone classes themselves. We can't translate a custom Python timezone into 
a Java timezone without information loss. So the easier way is to use a 
timezone ID from the [tz 
database](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones), which 
is supported in both Python and Java (see the sketch below).
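   A minimal Java sketch of why a tz-database ID round-trips without 
information loss (standard `java.util.TimeZone` API):
   
   ```java
   import java.util.TimeZone;
   
   // A tz-database ID resolves to the same zone on the Java side and can be
   // read back unchanged.
   public class TimezoneIdDemo {
       public static void main(String[] args) {
           TimeZone tz = TimeZone.getTimeZone("Asia/Shanghai"); // tz-database ID
           System.out.println(tz.getID()); // prints "Asia/Shanghai"
       }
   }
   ```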




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283622904
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -118,3 +463,112 @@ def insert_into(self, table_name):
 :param table_name: Name of the :class:`TableSink` to which the 
:class:`Table` is written.
 """
 self._j_table.insertInto(table_name)
+
+def __str__(self):
+return self._j_table.toString()
+
+
+class GroupedTable(object):
+"""
+A table that has been grouped on a set of grouping keys.
+"""
+
+def __init__(self, java_table):
+self._j_table = java_table
+
+def select(self, fields):
+"""
+Performs a selection operation on a grouped table. Similar to an SQL 
SELECT statement.
+The field expressions can contain complex expressions and aggregations.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg + ' The average' as 
average")
+
+
+:param fields: Expression string that contains group keys and 
aggregate function calls.
+:return: Result table.
+"""
+return Table(self._j_table.select(fields))
+
+
+class GroupWindowedTable(object):
+"""
+A table that has been windowed for :class:`GroupWindow`s.
+"""
+
+def __init__(self, java_group_windowed_table):
+self._java_table = java_group_windowed_table
 
 Review comment:
   That makes sense. I have renamed all the private Java table variables to 
`_j_table` in the new commit.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283622697
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
 
 Review comment:
   Done.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283622709
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.fullOuterJoin(
+right._j_table, join_predicate))
+
+def minus(self, right):
+"""
+Minus of two :class:`Table`s with duplicate records removed.
+Similar to a SQL EXCEPT clause. Minus returns records from the left 

[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283622685
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
 
 Review comment:
   Done.




[GitHub] [flink] zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-13 Thread GitBox
zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate 
setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#issuecomment-492074867
 
 
   As for `ResultPartitionFactory`: the `ShuffleService` already takes the 
role of a factory for creating partitions/gates. Of course we could further 
extract a specific factory component from `ShuffleService`, but I think the 
factory might differ between `ShuffleService` implementations, so it is better 
regarded as a private tool inside each specific `ShuffleService` 
implementation.
   
   E.g. for a `FSShuffleService#FSResultPartition`, we might not need 
`subpartition` instances at all, only the logical `subpartition` index. Also, 
for interactive queries that want to reuse the `ResultPartitionWriter`, there 
is no need to construct `subpartition`s inside the specific instance. A rough 
sketch of this layering is below.
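   A hypothetical sketch of that layering (illustrative names only, not the 
actual interfaces):
   
   ```java
   // Sketch: each ShuffleService keeps partition creation as a private
   // concern, so an FS-based service can hand out writers that only track
   // logical subpartition indexes, without instantiating subpartition objects.
   interface ResultPartitionWriter {
       void write(int subpartitionIndex, byte[] record);
   }
   
   interface ShuffleService {
       ResultPartitionWriter createResultPartition(int numberOfSubpartitions);
   }
   
   class FsShuffleService implements ShuffleService {
       @Override
       public ResultPartitionWriter createResultPartition(int numberOfSubpartitions) {
           return (subpartitionIndex, record) -> {
               if (subpartitionIndex < 0 || subpartitionIndex >= numberOfSubpartitions) {
                   throw new IllegalArgumentException("Unknown subpartition: " + subpartitionIndex);
               }
               // ... append the record to the file region for subpartitionIndex ...
           };
       }
   }
   ```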




[GitHub] [flink] zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-13 Thread GitBox
zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate 
setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#issuecomment-492072639
 
 
   Commits for `PartitionBufferPoolFactory`: in my previous review the factory 
was not created in the constructor of `NetworkEnvironment` in that commit, and 
was only moved into the constructor in a following commit. So I meant the 
commit that first introduces it should be squashed with the following commit, 
to make sure it is in the constructor from the moment it is introduced. Now I 
see it is already in the right state; maybe it was some UI issue with the 
commit ordering you confirmed before. :)




[GitHub] [flink] zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-13 Thread GitBox
zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r283619819
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionBufferPoolFactory.java
 ##
 @@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+import java.io.IOException;
+
+/**
+ * Factory of {@link BufferPool} for network partitions or channels.
+ */
+public interface PartitionBufferPoolFactory {
+   BufferPool create(int size, ResultPartitionType type, BufferPoolOwner 
owner) throws IOException;
 
 Review comment:
   empty line before `create` method?




[GitHub] [flink] zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-13 Thread GitBox
zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r283619661
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -151,7 +159,7 @@ public static NetworkEnvironment create(
checkNotNull(taskEventPublisher),
resultPartitionFactory,
singleInputGateFactory,
-   resultPartitionBufferPoolFactory);
+   resultPartitionBufferPoolFactory, 
inputGateBufferPoolFactory);
 
 Review comment:
   inputGateBufferPoolFactory should be on a separate line in the commit 
`[hotfix][network] Introduce InputGateBufferPoolFactory`




[GitHub] [flink] zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-13 Thread GitBox
zhijiangW commented on issue #8416: [FLINK-12331] Introduce partition/gate 
setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#issuecomment-492070855
 
 
   @azagrebin thanks for the confirmation.
   
   As for `PartitionBufferPoolFactory`:
   
   - The current `BufferPoolFactory` exists only for partition/gate, and no 
other part of the Flink stack needs a `BufferPool` atm. The newly introduced 
`PartitionBufferPoolFactory` is also for partition/gate, but has no 
architectural relationship with `BufferPoolFactory`. So I do not think it is a 
good idea to have two parallel, unrelated interfaces around `BufferPool`.
   
   - We could also pass the `ResultPartitionType` in the constructor of 
`AbstractPartitionBufferPoolFactory`; then the newly proposed interface method 
becomes `create(int size, BufferPoolOwner owner)`, so the only difference from 
the previous methods in `BufferPoolFactory` is that we do not pass the 
`maxUsedBuffers`, which is calculated based on the `ResultPartitionType`.
   
   - The key motivation for proposing the new interface is that we could pass a 
single factory parameter in the constructor of `ResultPartition`; otherwise we 
need the two parameters `NetworkEnvironmentConfiguration` and 
`BufferPoolFactory`. But for the constructor of `SingleInputGate` it seems to 
make no difference, because we could use `NetworkEnvironmentConfiguration` to 
replace the current `isCreditBased` parameter.
   
   - Another possible advantage of the new interface is that we extract the 
logic of creating the `BufferPool` to another place, which simplifies 
`SingleInputGate`, `ResultPartition` and `NetworkEnvironment`.
   
   - From the aspect of an extended architecture, there seems to be no problem 
even if we did nothing, because different `ShuffleService` implementations 
might need different `InputGate` and `ResultPartitionWriter` implementations. 
We might even extract the current `BlockingSubpartition` as a separate 
implementation of `ResultPartitionWriter` in the future, and it would not need 
a `BufferPoolFactory` any more.
   
   In summary I do not think it is really necessary, or worth it, to introduce 
the new interface here. If we want to keep the constructors of partition/gate 
simple, we could just pass a `Supplier` as you suggested before, and it could 
be implemented by `NetworkEnvironment` itself. If the `Supplier` approach does 
not seem good, I have another option (a rough sketch follows after this list):
   
   - Add a new method `createBufferPool(int numRequiredBuffers, BufferPoolOwner 
owner)` to the existing `BufferPoolFactory`.
   
   - Make `AbstractPartitionBufferPoolFactory` implement `BufferPoolFactory`; 
we could also pass the `ResultPartitionType` into the constructor of 
`AbstractPartitionBufferPoolFactory`.
   
   - `ResultPartitionBufferPoolFactory`/`InputGateBufferPoolFactory` extend 
`AbstractPartitionBufferPoolFactory` and only need to implement the new method 
`createBufferPool(int numRequiredBuffers, BufferPoolOwner owner)`.
   
   This way we extend and reuse the general `BufferPoolFactory` and only 
abstract out the calculation of `maxPoolSize`, which differs between 
partition/gate. What do you think?
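
   A rough Java sketch of this option (names prefixed with `Sketch` to stress 
that this is illustrative, not the PR's actual code; `BufferPool`, 
`BufferPoolOwner` and `ResultPartitionType` are the existing runtime types):

import java.io.IOException;

import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

interface SketchBufferPoolFactory {
    // Proposed new method: maxUsedBuffers is no longer passed in by the
    // caller, it is derived inside the factory.
    BufferPool createBufferPool(int numRequiredBuffers, BufferPoolOwner owner)
            throws IOException;
}

abstract class SketchAbstractPartitionBufferPoolFactory
        implements SketchBufferPoolFactory {

    protected final ResultPartitionType partitionType;

    protected SketchAbstractPartitionBufferPoolFactory(
            ResultPartitionType partitionType) {
        this.partitionType = partitionType;
    }

    // The only part that differs between the partition and gate subclasses:
    // how maxPoolSize is calculated from the required buffers and the type.
    protected abstract int calculateMaxPoolSize(int numRequiredBuffers);
}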
   




[GitHub] [flink] godfreyhe commented on issue #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate

2019-05-13 Thread GitBox
godfreyhe commented on issue #8389: [FLINK-12399][table] Fix 
FilterableTableSource does not change after applyPredicate
URL: https://github.com/apache/flink/pull/8389#issuecomment-492069318
 
 
   @walterddr thanks for the fix. It's a little difficult for users to 
understand and implement `explainSource`, even if we rename `explainSource` to 
`getSourceDigest` (or some other name). Could we keep the logic of re-computing 
the digest after push-down in the rules?




[GitHub] [flink] godfreyhe commented on a change in pull request #8389: [FLINK-12399][table] Fix FilterableTableSource does not change after applyPredicate

2019-05-13 Thread GitBox
godfreyhe commented on a change in pull request #8389: [FLINK-12399][table] Fix 
FilterableTableSource does not change after applyPredicate
URL: https://github.com/apache/flink/pull/8389#discussion_r283616783
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ##
 @@ -81,7 +81,13 @@ class FlinkLogicalTableSourceScan(
 val terms = super.explainTerms(pw)
 .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", 
"))
 
-val sourceDesc = tableSource.explainSource()
+val auxiliarySourceDesc = tableSource match {
+  case fts: FilterableTableSource[_] =>
+s"FilterPushDown=${fts.isFilterPushedDown.toString}"
 
 Review comment:
   `s"FilterPushDown=${fts.isFilterPushedDown.toString}"` is not the digest of 
a `FilterableTableSource `,  consider the following case: there are several 
different filters to be pushed down. 
   so, i think the digest should be the string value of the pushed filters.
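
   A hedged Java sketch of that idea (the class and field are hypothetical; a 
real `FilterableTableSource` would keep the predicates it accepted in 
`applyPredicate` and include them in the digest):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.table.expressions.Expression;

// Hypothetical filterable source, only to illustrate the digest idea.
public class MyFilterableSource {

    // Filters accepted in applyPredicate(); assumed field, not real API.
    private List<Expression> pushedPredicates;

    // Build the digest from the concrete pushed filters instead of a boolean
    // flag, so two scans with different pushed filters get different digests.
    public String explainSource() {
        String filters = pushedPredicates == null
                ? ""
                : pushedPredicates.stream()
                        .map(Object::toString)
                        .collect(Collectors.joining(", "));
        return "MyFilterableSource(filterPushedDown=[" + filters + "])";
    }
}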




[GitHub] [flink] WeiZhong94 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.

2019-05-13 Thread GitBox
WeiZhong94 commented on issue #8392: [FLINK-12370][python][travis] Integrated 
Travis for Python Table API.
URL: https://github.com/apache/flink/pull/8392#issuecomment-492064023
 
 
   @sunjincheng121 Thanks for your review! Sorry for these mistakes. I have 
fixed them in the new commit and ensured that Travis works correctly.




[GitHub] [flink] wuchong commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
wuchong commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283609527
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ##
 @@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeProcessOperator
+import 
org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import 
org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{CountNullNonNull,
 CountPairs, LargerThanCount}
+import org.apache.flink.table.runtime.utils.{StreamTestData, 
StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class OverWindowITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
 
 Review comment:
   Proctime over aggregates can't be covered by IT cases because the results 
would be non-deterministic. `ProcTimeBoundedRangeOver` and 
`ProcTimeBoundedRowsOver` are already covered by `OverWindowHarnessTest`; a 
minimal sketch of the harness approach follows below.
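
   For reference, a minimal self-contained harness sketch (the counting 
function is a trivial hypothetical stand-in, not the actual over-aggregate 
code); the point is that the harness drives processing time manually, which is 
what makes proctime operators deterministic in tests:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

public class ProcTimeHarnessSketch {

    /** Trivial stand-in function (real code would use keyed state). */
    static class CountingFunction
            extends KeyedProcessFunction<String, String, Long> {
        private long seen;

        @Override
        public void processElement(String value, Context ctx,
                Collector<Long> out) {
            out.collect(++seen);
        }
    }

    public static void main(String[] args) throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, Long> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new CountingFunction()),
                        value -> value, // key by the value itself
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.open();

        harness.setProcessingTime(1_000L); // time is fully controlled here
        harness.processElement(new StreamRecord<>("a"));
        harness.setProcessingTime(2_000L);
        harness.processElement(new StreamRecord<>("a"));

        // Two output records with values 1 and 2, deterministically.
        System.out.println(harness.getOutput());
        harness.close();
    }
}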




[GitHub] [flink] Mr-Nineteen closed pull request #8229: [FLINK-12273]The default value of CheckpointRetentionPolicy should be …

2019-05-13 Thread GitBox
Mr-Nineteen closed pull request #8229: [FLINK-12273]The default value of 
CheckpointRetentionPolicy should be …
URL: https://github.com/apache/flink/pull/8229
 
 
   




[GitHub] [flink] Mr-Nineteen commented on issue #8229: [FLINK-12273]The default value of CheckpointRetentionPolicy should be …

2019-05-13 Thread GitBox
Mr-Nineteen commented on issue #8229: [FLINK-12273]The default value of 
CheckpointRetentionPolicy should be …
URL: https://github.com/apache/flink/pull/8229#issuecomment-492060134
 
 
   @StephanEwen 
Ok.
   I think it's important to change this default. If users don't know much 
about Flink, the current default can easily cause big losses in production. 
Therefore, Flink needs to consider this aspect and provide a better experience 
for its users.
   
   




[jira] [Updated] (FLINK-12443) Replace InternalType with LogicalType in blink runner

2019-05-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12443:
---
Labels: pull-request-available  (was: )

> Replace InternalType with LogicalType in blink runner
> -
>
> Key: FLINK-12443
> URL: https://issues.apache.org/jira/browse/FLINK-12443
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>
> LogicalType can cover all of InternalType, and it is functionally more 
> complete. With isNullable, we can do null-check optimization in the code 
> generator.





[GitHub] [flink] flinkbot commented on issue #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-13 Thread GitBox
flinkbot commented on issue #8435: [FLINK-12443][table-planner-blink] Replace 
InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#issuecomment-492059404
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] JingsongLi opened a new pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-05-13 Thread GitBox
JingsongLi opened a new pull request #8435: [FLINK-12443][table-planner-blink] 
Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435
 
 
   ## What is the purpose of the change
   
   Use the new LogicalType in the blink planner.
   NOTE: this only uses LogicalType to cover the previous InternalType 
functionality; it does not implement any new functionality of LogicalType.
   
   Differences between LogicalType and InternalType (a small sketch follows 
below):
   1. LogicalType carries nullability, so it has no singleton objects.
   2. YearMonthIntervalType and DayTimeIntervalType are independent types.
   3. MultisetType is not a MapType.
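
   A small Java sketch of the nullability difference, using `IntType` from 
flink-table-common (a minimal sketch only, not part of this PR's changes):

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;

public class NullabilitySketch {
    public static void main(String[] args) {
        // Each LogicalType instance carries its own nullability flag, so a
        // singleton per type (as with InternalType) is no longer possible.
        LogicalType nullableInt = new IntType();      // nullable by default
        LogicalType notNullInt = new IntType(false);  // NOT NULL variant

        System.out.println(nullableInt.isNullable()); // true
        System.out.println(notNullInt.isNullable());  // false

        // copy(...) derives a type with different nullability; this is what
        // enables null-check elimination in the code generator.
        System.out.println(notNullInt.copy(true).isNullable()); // true
    }
}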
   
   ## Verifying this change
   
   ut
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)




[jira] [Updated] (FLINK-12405) Introduce BLOCKING_PERSISTENT result partition type

2019-05-13 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12405:
---
Description: 
The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
{{BLOCKING}} except that it might be consumed several times and will only be 
released after TM shutdown or a {{ResultPartition}} removal request.
 This is the basis for Interactive Programming.


 Here are the brief changes:

* Introduce a {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}}
* Introduce {{BlockingShuffleOutputFormat}} and {{BlockingShuffleDataSink}}, 
which are used to generate a {{GenericDataSinkBase}} with a user-specified 
{{IntermediateDataSetID}} (passed from the Table API in a later PR)
* When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with an 
{{IntermediateDataSetID}}, it creates an {{IntermediateDataSet}} with this id; 
the {{OutputFormatVertex}} for this {{GenericDataSinkBase}} is then excluded 
from the {{JobGraph}}
* So the JobGraph may contain some JobVertex that has more 
{{IntermediateDataSet}}s than downstream consumers.

Here are some design notes (a hypothetical usage sketch is given at the end of 
this message):
 * Why modify {{DataSet}} and {{JobGraphGenerator}}?
 Because the Blink Planner is not ready yet, and the Batch Table API runs on 
the Flink Planner (based on DataSet).
 There will be another implementation once the Blink Planner is ready.
 * Why use a special {{OutputFormat}} as a placeholder?
 We could add a {{cache()}} method to DataSet, but we do not want to change 
the DataSet API any more, so a special {{OutputFormat}} as a placeholder seems 
reasonable.

  was:
The new {{ResultPartitionType}} : {{BLOCKING_PERSISTENT}} is similar to 
{{BLOCKING}} except it might be consumed for several times and will be released 
after TM shutdown or {{ResultPartition}} removal request.
This is the basis for Interactive Programming.


> Introduce BLOCKING_PERSISTENT result partition type
> ---
>
> Key: FLINK-12405
> URL: https://issues.apache.org/jira/browse/FLINK-12405
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
> {{BLOCKING}} except that it might be consumed several times and will only be 
> released after TM shutdown or a {{ResultPartition}} removal request.
>  This is the basis for Interactive Programming.
>  Here are the brief changes:
> * Introduce a {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}}
> * Introduce {{BlockingShuffleOutputFormat}} and {{BlockingShuffleDataSink}}, 
> which are used to generate a {{GenericDataSinkBase}} with a user-specified 
> {{IntermediateDataSetID}} (passed from the Table API in a later PR)
> * When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with an 
> {{IntermediateDataSetID}}, it creates an {{IntermediateDataSet}} with this id; 
> the {{OutputFormatVertex}} for this {{GenericDataSinkBase}} is then excluded 
> from the {{JobGraph}}
> * So the JobGraph may contain some JobVertex that has more 
> {{IntermediateDataSet}}s than downstream consumers.
> Here are some design notes:
>  * Why modify {{DataSet}} and {{JobGraphGenerator}}?
>  Because the Blink Planner is not ready yet, and the Batch Table API runs on 
> the Flink Planner (based on DataSet).
>  There will be another implementation once the Blink Planner is ready.
>  * Why use a special {{OutputFormat}} as a placeholder?
>  We could add a {{cache()}} method to DataSet, but we do not want to change 
> the DataSet API any more, so a special {{OutputFormat}} as a placeholder seems 
> reasonable.
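
A hypothetical usage sketch of the placeholder idea in Java (the package and 
factory method of BlockingShuffleOutputFormat are assumed here, not taken from 
the PR):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.BlockingShuffleOutputFormat;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

public class PersistentPartitionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> numbers = env.generateSequence(0, 99);

        // Attach the placeholder sink: the JobGraphGenerator is then expected
        // to emit a BLOCKING_PERSISTENT IntermediateDataSet with this id
        // instead of a real sink vertex. Factory method name is assumed.
        IntermediateDataSetID cacheId = new IntermediateDataSetID();
        numbers.output(BlockingShuffleOutputFormat.createOutputFormat(cacheId));

        env.execute("cache-job");
        // A later job could then consume the persisted partition via cacheId.
    }
}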





[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283604284
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.java
 ##
 @@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * A basic implementation to support unbounded event-time over-window.
+ */
+public abstract class RowTimeUnboundedOver extends 
KeyedProcessFunctionWithCleanupState {
 
 Review comment:
   AbstractRowTimeUnboundedPrecedingOver




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283603446
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for ROW clause processing-time bounded OVER window.
+ */
+public class ProcTimeBoundedRowsOver extends 
KeyedProcessFunctionWithCleanupState {
 
 Review comment:
   ProcTimeRowsBoundedPrecedingFunction, and update comment




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283603987
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.java
 ##
 @@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function used for the aggregate in bounded proc-time OVER window.
+ */
+public class ProcTimeBoundedRangeOver extends 
KeyedProcessFunctionWithCleanupState {
 
 Review comment:
   And also update the comment of this class to show that it is responsible 
for the range from a bounded preceding offset to the current row; maybe also 
give some query examples. You can refer to the comments of the subclasses of 
`OverWindowFrame`.




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283603486
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+/**
+ * Process Function for processing-time unbounded OVER window.
+ */
+public class ProcTimeUnboundedOver extends 
KeyedProcessFunctionWithCleanupState {
 
 Review comment:
   ProcTimeUnboundedPrecedingFunction, and update comment




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283598406
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
 ##
 @@ -134,6 +144,325 @@ class StreamExecOverAggregate(
 
   override protected def translateToPlanInternal(
   tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-throw new TableException("Implements this")
+val tableConfig = tableEnv.getConfig
+
+if (logicWindow.groups.size > 1) {
+  throw new TableException(
+  "All aggregates must be computed on the same window.")
+}
+
+val overWindow: org.apache.calcite.rel.core.Window.Group = 
logicWindow.groups.get(0)
 
 Review comment:
   why use full package name here?




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283607890
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ##
 @@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeProcessOperator
+import 
org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import 
org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{CountNullNonNull,
 CountPairs, LargerThanCount}
+import org.apache.flink.table.runtime.utils.{StreamTestData, 
StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class OverWindowITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
 
 Review comment:
   Please add some more tests to cover the logic of `ProcTimeBoundedRangeOver`




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283603324
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.java
 ##
 @@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function used for the aggregate in bounded proc-time OVER window.
+ */
+public class ProcTimeBoundedRangeOver extends 
KeyedProcessFunctionWithCleanupState {
 
 Review comment:
   ProcTimeRangeBoundedPrecedingFunction will be more accurate 




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283602389
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.java
 ##
 @@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
 
 Review comment:
   Could you move all over related classes to package `over`?




[jira] [Closed] (FLINK-12380) Add thread name in the log4j.properties

2019-05-13 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-12380.

Resolution: Information Provided

> Add thread name in the log4j.properties
> ---
>
> Key: FLINK-12380
> URL: https://issues.apache.org/jira/browse/FLINK-12380
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is inspired by FLINK-12368, where users want to add sub-task index 
> information in the source code. We could add the thread name, which already 
> contains the sub-task index information, to the logs to avoid having to 
> change the source code.
> Moreover, I found that the existing {{logback.xml}} in Flink already contains 
> the {{thread name}} information. We should also add this to 
> {{log4j.properties}}.





[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283604242
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.java
 ##
 @@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for ROWS clause event-time bounded OVER window.
+ */
+public class RowTimeBoundedRowsOver extends 
KeyedProcessFunctionWithCleanupState {
 
 Review comment:
   RowTimeRowsBoundedPrecedingFunction, and update comment




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283604194
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.java
 ##
 @@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window.
+ */
+public class RowTimeBoundedRangeOver extends 
KeyedProcessFunctionWithCleanupState {
 
 Review comment:
   RowTimeRangeBoundedPrecedingFunction, and update comment




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283604565
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedRowsOver.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * A ProcessFunction to support unbounded ROWS window.
+ * The ROWS clause defines on a physical level how many rows are included in a 
window frame.
+ */
+public class RowTimeUnboundedRowsOver extends RowTimeUnboundedOver {
 
 Review comment:
   RowTimeRowsUnboundedPrecedingFunction, and update comment
   
   




[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283604420
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedRangeOver.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * A ProcessFunction to support unbounded RANGE window.
+ * The RANGE option includes all the rows within the window frame
+ * that have the same ORDER BY values as the current row.
+ */
+public class RowTimeUnboundedRangeOver extends RowTimeUnboundedOver {
 
 Review comment:
   Rename to `RowTimeRangeUnboundedPrecedingFunction`, and update the class 
comment accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283600244
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
 ##
 @@ -134,6 +144,325 @@ class StreamExecOverAggregate(
 
   override protected def translateToPlanInternal(
   tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-throw new TableException("Implements this")
+val tableConfig = tableEnv.getConfig
+
+if (logicWindow.groups.size > 1) {
+  throw new TableException(
+  "All aggregates must be computed on the same window.")
+}
+
+val overWindow: org.apache.calcite.rel.core.Window.Group = 
logicWindow.groups.get(0)
+
+val orderKeys = overWindow.orderKeys.getFieldCollations
+
+if (orderKeys.size() != 1) {
+  throw new TableException(
+  "The window can only be ordered by a single time column.")
+}
+val orderKey = orderKeys.get(0)
+
+if (!orderKey.direction.equals(ASCENDING)) {
+  throw new TableException(
+  "The window can only be ordered in ASCENDING mode.")
+}
+
+val inputDS = getInputNodes.get(0).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+
+val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input)
+
+if (inputIsAccRetract) {
+  throw new TableException(
+  "Retraction on Over window aggregation is not supported yet. " +
+"Note: Over window aggregation should not follow a non-windowed 
GroupBy aggregation.")
+}
+
+if (!logicWindow.groups.get(0).keys.isEmpty && 
tableConfig.getMinIdleStateRetentionTime < 0) {
+  LOG.warn(
+"No state retention interval configured for a query which accumulates 
state. " +
+  "Please provide a query configuration with valid retention interval 
to prevent " +
+  "excessive state size. You may specify a retention time of 0 to not 
clean up the state.")
+}
+
+val timeType = 
outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+// check time field
+if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+  && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+  throw new TableException(
+"OVER windows' ordering in stream mode must be defined on a time 
attribute.")
+}
+
+// identify window rowtime attribute
+val rowTimeIdx: Option[Int] = if 
(FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+  Some(orderKey.getFieldIndex)
+} else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+  None
+} else {
+  throw new TableException(
+  "OVER windows can only be applied on time attributes.")
+}
+
+val config = tableEnv.getConfig
 
 Review comment:
   The table config was already obtained at line 147; reuse that `tableConfig` 
instead of fetching it again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-13 Thread GitBox
KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283598342
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
 ##
 @@ -17,25 +17,35 @@
  */
 package org.apache.flink.table.plan.nodes.physical.stream
 
-import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.CalcitePair
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.plan.util.RelExplainUtil
-
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 
 Review comment:
   Please keep the import order aligned with other files; you can find an 
explanation here: https://flink.apache.org/contribute-code.html#imports


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12506) Add more over window unit tests

2019-05-13 Thread Kurt Young (JIRA)
Kurt Young created FLINK-12506:
--

 Summary: Add more over window unit tests
 Key: FLINK-12506
 URL: https://issues.apache.org/jira/browse/FLINK-12506
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime, Tests
Reporter: Kurt Young


We only have an ITCase for the streaming over window; we need to add more unit 
tests for the various process functions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Armstrongya commented on a change in pull request #8366: [FLINK-12415][doc-zh]Translate HistoryServer page into Chinese

2019-05-13 Thread GitBox
Armstrongya commented on a change in pull request #8366: 
[FLINK-12415][doc-zh]Translate HistoryServer page into Chinese
URL: https://github.com/apache/flink/pull/8366#discussion_r283605925
 
 

 ##
 File path: docs/monitoring/historyserver.zh.md
 ##
 @@ -22,62 +22,62 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink has a history server that can be used to query the statistics of 
completed jobs after the corresponding Flink cluster has been shut down.
+Flink 自带的 history server 可以在已执行完作业对应的 Flink 集群关闭之后查询该作业的统计信息。
 
-Furthermore, it exposes a REST API that accepts HTTP requests and responds 
with JSON data.
+除此之外,它还提供了一个 REST API,可以通过 HTTP 以 JSON 格式发送和接收数据。
 
 * This will be replaced by the TOC
 {:toc}
 
-## Overview
+## 概览
 
-The HistoryServer allows you to query the status and statistics of completed 
jobs that have been archived by a JobManager.
+History server 可以查询被 JobManager 存档的已执行完作业的状态和统计信息。
 
-After you have configured the HistoryServer *and* JobManager, you start and 
stop the HistoryServer via its corresponding startup script:
+在配置完 History server 和 JobManager 之后,用户可以通过下面的启动脚本来开启和关停 History server:
 
 {% highlight shell %}
-# Start or stop the HistoryServer
+# 开启或关停 History Server
 bin/historyserver.sh (start|start-foreground|stop)
 {% endhighlight %}
 
-By default, this server binds to `localhost` and listens at port `8082`.
+默认情况下,History server 绑定到本机 `localhost` 的 `8082` 端口。
 
-Currently, you can only run it as a standalone process.
+目前你可以把它当做单独的进程来运行。
 
-## Configuration
+## 配置
 
-The configuration keys `jobmanager.archive.fs.dir` and 
`historyserver.archive.fs.refresh-interval` need to be adjusted for archiving 
and displaying archived jobs.
+存档和展示已完成的作业 需要调配 `jobmanager.archive.fs.dir` 和 
`historyserver.archive.fs.refresh-interval` 这俩配置项。
 
 **JobManager**
 
-The archiving of completed jobs happens on the JobManager, which uploads the 
archived job information to a file system directory. You can configure the 
directory to archive completed jobs in `flink-conf.yaml` by setting a directory 
via `jobmanager.archive.fs.dir`.
+JobManager 会进行已完成作业的存档,把已存档作业的信息上传到一个文件系统目录上。用户可以通过设置 
`jobmanager.archive.fs.dir` 来配置这个存档目录,将 `flink-conf.yaml` 中已完成的作业都存档下来。
 
 {% highlight yaml %}
-# Directory to upload completed job information
+# 已完成作业信息的上传目录
 jobmanager.archive.fs.dir: hdfs:///completed-jobs
 {% endhighlight %}
 
 **HistoryServer**
 
-The HistoryServer can be configured to monitor a comma-separated list of 
directories in via `historyserver.archive.fs.dir`. The configured directories 
are regularly polled for new archives; the polling interval can be configured 
via `historyserver.archive.fs.refresh-interval`.
+History Server 可以监控 `historyserver.archive.fs.dir` 
配置的用逗号分隔的文件目录列表。并且会为新存档定期轮询已配置的目录,轮询的间隔可以通过 
`historyserver.archive.fs.refresh-interval` 来配置。
 
 Review comment:
   @wuchong Thanks for your comments. Your version is more reasonable and fluent; I 
have updated the translation accordingly. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283605276
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_batch_table_api.py
 ##
 @@ -0,0 +1,418 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+# #  distributed under the License is distributed on an "AS IS" BASIS,
+# #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# #  See the License for the specific language governing permissions and
+# # limitations under the License.
+
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.table.window import Tumble, Slide, Session
+
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase
+
+
+class BatchTableTests(PyFlinkBatchTableTestCase):
 
 Review comment:
   Good idea!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283604935
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -44,18 +47,55 @@ def parallelism(self):
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+return self._j_table_config.getTimeZone().getID()
+
+@timezone.setter
+def timezone(self, timezone_id):
+if timezone_id is not None and isinstance(timezone_id, str):
+j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id)
+self._j_table_config.setTimeZone(j_timezone)
+else:
+raise Exception("TableConfig.timezone should be a string!")
+
+@property
+def null_check(self):
+return self._j_table_config.getNullCheck()
+
+@null_check.setter
+def null_check(self, null_check):
+if null_check is not None and isinstance(null_check, bool):
+self._j_table_config.setNullCheck(null_check)
+else:
+raise Exception("TableConfig.null_check should be a bool value!")
+
+@property
+def max_generated_code_length(self):
+return self._j_table_config.getMaxGeneratedCodeLength()
 
 Review comment:
   I think this method is needed: a Python API program is only a thin API layer, and 
the Java code-gen logic is shared by the Java/Scala/Python APIs.
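   For illustration, a hedged usage sketch (import path assumed; the property 
simply proxies the shared Java `TableConfig`, so the limit read here is exactly 
what the Java/Scala code generator uses):

```python
from pyflink.table import TableConfig  # assumed import path

t_config = TableConfig()
# Delegates to the Java side's getMaxGeneratedCodeLength(); the value bounds
# how large a single generated Java method may grow before the code is split.
print(t_config.max_generated_code_length)
```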


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-13 Thread GitBox
kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of 
all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#issuecomment-492054384
 
 
   Highly appreciate your review :) @azagrebin I have addressed the comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define the data types in Python

2019-05-13 Thread GitBox
flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define 
the data types in Python
URL: https://github.com/apache/flink/pull/8420#issuecomment-491604302
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @twalthr [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define the data types in Python

2019-05-13 Thread GitBox
sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define 
the data types in Python
URL: https://github.com/apache/flink/pull/8420#issuecomment-492054007
 
 
   @flinkbot attention @twalthr 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12494) JDBCOutputFormat support reconnect when link failure and flush by timeInterval

2019-05-13 Thread zhaoshijie (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhaoshijie reassigned FLINK-12494:
--

Assignee: zhaoshijie

> JDBCOutputFormat support reconnect when link failure and flush by timeInterval
> --
>
> Key: FLINK-12494
> URL: https://issues.apache.org/jira/browse/FLINK-12494
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.8.0
>Reporter: zhaoshijie
>Assignee: zhaoshijie
>Priority: Major
>
> when i JDBCSink(flink-1.4.2) wite recode to mysql,find exception as flow :
>  
> {code:java}
> java.util.concurrent.ExecutionException: 
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
> The last packet successfully received from the server was 265,251 
> milliseconds ago. The last packet sent successfully to the server was 265,252 
> milliseconds ago.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
> at 
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
> ... 2 more
> Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: 
> Communications link failure
> The last packet successfully received from the server was 265,251 
> milliseconds ago. The last packet sent successfully to the server was 265,252 
> milliseconds ago.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
> at 
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
> at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
> at 
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199)
> at 
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139)
> at 
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83)
> at 
> org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73)
> at 
> org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105)
> at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: java.net.SocketException: Connection reset
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345)
> ... 16 more
> {code}
> I think the connection stayed idle for too long without writing any record, so the 
> server closed the connection on its own initiative. Sparse data is actually quite 
> common, so I think we should reconnect when the connection becomes invalid.
> Besides, I find that JDBCOutputFormat.flush is only called by the snapshotState 
> method and when "batchCount >= batchInterval". If the sink's records are sparse, 
> the actual write happens with a very large delay, so should we add a flush 
> condition: currentTime - lastFlushTime > timeInterval?
>  
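For illustration only, a minimal sketch of the proposed dual flush condition 
(flush when the batch is full or when too much time has passed since the last 
flush), written in Python with hypothetical names; the actual JDBCOutputFormat 
is Java:

{code:python}
import time


class BatchingJdbcWriter(object):
    """Illustrative sketch only (hypothetical names): flush when the batch
    reaches batchInterval OR when currentTime - lastFlushTime > timeInterval,
    as proposed above."""

    def __init__(self, batch_interval=100, time_interval_ms=5000):
        self.batch_interval = batch_interval
        self.time_interval_ms = time_interval_ms
        self.batch = []
        self.last_flush_time = time.time() * 1000

    def write_record(self, record):
        self.batch.append(record)
        now = time.time() * 1000
        if (len(self.batch) >= self.batch_interval
                or now - self.last_flush_time > self.time_interval_ms):
            self.flush(now)

    def flush(self, now):
        # A real implementation would execute the batched statement here and,
        # per the reconnect proposal, validate the JDBC connection first
        # (e.g. via Connection.isValid) and re-establish it if the server has
        # dropped an idle connection.
        self.batch = []
        self.last_flush_time = now
{code}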



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283585908
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_batch_table_api.py
 ##
 @@ -0,0 +1,418 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+# #  distributed under the License is distributed on an "AS IS" BASIS,
+# #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# #  See the License for the specific language governing permissions and
+# # limitations under the License.
+
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.table.window import Tumble, Slide, Session
+
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase
+
+
+class BatchTableTests(PyFlinkBatchTableTestCase):
+
+def test_select_alias(self):
 
 Review comment:
   I think we should align the test structure with Java/Scala testing,  which I 
mentioned in the test framework PR.  
   
![image](https://user-images.githubusercontent.com/22488084/57662378-d7523c80-7621-11e9-92f2-59b58f9aafdb.png)
   So it's better to categorize the tests. What do you think?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283601896
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -44,18 +47,55 @@ def parallelism(self):
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+return self._j_table_config.getTimeZone().getID()
+
+@timezone.setter
+def timezone(self, timezone_id):
 
 Review comment:
   Add some Doc, such as:
   
   `the timezone_id for a timezone, either an abbreviation such as "PST", a 
full name such as "America/Los_Angeles", or a custom timezone_id such as 
"GMT-8:00". `
   
   What do you think?
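   For instance, a short usage sketch to go with that doc (import path assumed; 
the accepted values follow `java.util.TimeZone.getTimeZone` conventions):

```python
from pyflink.table import TableConfig  # assumed import path

t_config = TableConfig()
t_config.timezone = "PST"                  # abbreviation
t_config.timezone = "America/Los_Angeles"  # full name
t_config.timezone = "GMT-8:00"             # custom timezone_id
```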
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283588400
 
 

 ##
 File path: flink-python/pyflink/table/query_config.py
 ##
 @@ -0,0 +1,108 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta
+from datetime import timedelta
+from py4j.compat import long
+
+from pyflink.java_gateway import get_gateway
+
+
+class QueryConfig(object):
+"""
+The :class:`QueryConfig` holds parameters to configure the behavior of 
queries.
+"""
+
+__metaclass__ = ABCMeta
+
+def __init__(self, j_query_config):
+self._j_query_config = j_query_config
+
+
+class StreamQueryConfig(QueryConfig):
+"""
+The :class:`StreamQueryConfig` holds parameters to configure the behavior 
of streaming queries.
+"""
+
+def __init__(self, j_stream_query_config=None):
+self._jvm = get_gateway().jvm
+if j_stream_query_config is not None:
+self._j_stream_query_config = j_stream_query_config
+else:
+self._j_stream_query_config = self._jvm.StreamQueryConfig()
+super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
+
+def with_idle_state_retention_time(self, min_time, max_time):
+"""
+Specifies a minimum and a maximum time interval for how long idle 
state, i.e., state which
+was not updated, will be retained.
+
+State will never be cleared until it was idle for less than the 
minimum time and will never
+be kept if it was idle for more than the maximum time.
+
+When new data arrives for previously cleaned-up state, the new data 
will be handled as if it
+was the first data. This can result in previous results being 
overwritten.
+
+Set to 0 (zero) to never clean-up the state.
+
+.. note::
+Cleaning up state requires additional bookkeeping which becomes 
less expensive for
+larger differences of minTime and maxTime. The difference between 
minTime and maxTime
+must be at least 5 minutes.
+
+:param min_time: The minimum time interval for which idle state is 
retained. Set to 0(zero)
+ to never clean-up the state.
+:param max_time: The maximum time interval for which idle state is 
retained. Must be at
+ least 5 minutes greater than minTime. Set to 0 (zero) 
to never clean-up
+ the state.
+:return: :class:`StreamQueryConfig`
+"""
+#  type: (timedelta, timedelta) -> StreamQueryConfig
+j_time_class = self._jvm.org.apache.flink.api.common.time.Time
+j_min_time = 
j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
+j_max_time = 
j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
+self._j_stream_query_config = \
+self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, 
j_max_time)
+return self
+
+def get_min_idle_state_retention_time(self):
+"""
+:return: The minimum time until state which was not updated will be 
retained.
 
 Review comment:
   Add `State might be cleared and removed if it was not updated for the 
defined period of time.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283589060
 
 

 ##
 File path: flink-python/pyflink/table/query_config.py
 ##
 @@ -0,0 +1,108 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta
+from datetime import timedelta
+from py4j.compat import long
+
+from pyflink.java_gateway import get_gateway
+
+
+class QueryConfig(object):
+"""
+The :class:`QueryConfig` holds parameters to configure the behavior of 
queries.
+"""
+
+__metaclass__ = ABCMeta
+
+def __init__(self, j_query_config):
+self._j_query_config = j_query_config
+
+
+class StreamQueryConfig(QueryConfig):
+"""
+The :class:`StreamQueryConfig` holds parameters to configure the behavior 
of streaming queries.
+"""
+
+def __init__(self, j_stream_query_config=None):
+self._jvm = get_gateway().jvm
+if j_stream_query_config is not None:
+self._j_stream_query_config = j_stream_query_config
+else:
+self._j_stream_query_config = self._jvm.StreamQueryConfig()
+super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
+
+def with_idle_state_retention_time(self, min_time, max_time):
 
 Review comment:
   The Python doc needs to state the time units, since this differs from Java. What 
do you think?
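   For example, a usage sketch that makes the units explicit (module path taken 
from this PR's diff, otherwise hypothetical):

```python
from datetime import timedelta

from pyflink.table.query_config import StreamQueryConfig

q_config = StreamQueryConfig()
# timedelta makes the units unambiguous: minimum 12 hours, maximum 24 hours,
# so the difference comfortably exceeds the required 5 minutes.
q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
```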


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283588406
 
 

 ##
 File path: flink-python/pyflink/table/query_config.py
 ##
 @@ -0,0 +1,108 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta
+from datetime import timedelta
+from py4j.compat import long
+
+from pyflink.java_gateway import get_gateway
+
+
+class QueryConfig(object):
+"""
+The :class:`QueryConfig` holds parameters to configure the behavior of 
queries.
+"""
+
+__metaclass__ = ABCMeta
+
+def __init__(self, j_query_config):
+self._j_query_config = j_query_config
+
+
+class StreamQueryConfig(QueryConfig):
+"""
+The :class:`StreamQueryConfig` holds parameters to configure the behavior 
of streaming queries.
+"""
+
+def __init__(self, j_stream_query_config=None):
+self._jvm = get_gateway().jvm
+if j_stream_query_config is not None:
+self._j_stream_query_config = j_stream_query_config
+else:
+self._j_stream_query_config = self._jvm.StreamQueryConfig()
+super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
+
+def with_idle_state_retention_time(self, min_time, max_time):
+"""
+Specifies a minimum and a maximum time interval for how long idle 
state, i.e., state which
+was not updated, will be retained.
+
+State will never be cleared until it was idle for less than the 
minimum time and will never
+be kept if it was idle for more than the maximum time.
+
+When new data arrives for previously cleaned-up state, the new data 
will be handled as if it
+was the first data. This can result in previous results being 
overwritten.
+
+Set to 0 (zero) to never clean-up the state.
+
+.. note::
+Cleaning up state requires additional bookkeeping which becomes 
less expensive for
+larger differences of minTime and maxTime. The difference between 
minTime and maxTime
+must be at least 5 minutes.
+
+:param min_time: The minimum time interval for which idle state is 
retained. Set to 0(zero)
+ to never clean-up the state.
+:param max_time: The maximum time interval for which idle state is 
retained. Must be at
+ least 5 minutes greater than minTime. Set to 0 (zero) 
to never clean-up
+ the state.
+:return: :class:`StreamQueryConfig`
+"""
+#  type: (timedelta, timedelta) -> StreamQueryConfig
+j_time_class = self._jvm.org.apache.flink.api.common.time.Time
+j_min_time = 
j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
+j_max_time = 
j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
+self._j_stream_query_config = \
+self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, 
j_max_time)
+return self
+
+def get_min_idle_state_retention_time(self):
+"""
+:return: The minimum time until state which was not updated will be 
retained.
+"""
+#  type: () -> int
+return self._j_stream_query_config.getMinIdleStateRetentionTime()
+
+def get_max_idle_state_retention_time(self):
+"""
+:return: The maximum time until state which was not updated will be 
retained.
 
 Review comment:
   Add `State will be cleared and removed if it was not updated for the defined 
period of time.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283589635
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
 
 Review comment:
   Same as above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283586736
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_batch_table_api.py
 ##
 @@ -0,0 +1,418 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+# #  distributed under the License is distributed on an "AS IS" BASIS,
+# #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# #  See the License for the specific language governing permissions and
+# # limitations under the License.
+
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.table.window import Tumble, Slide, Session
+
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase
+
+
+class BatchTableTests(PyFlinkBatchTableTestCase):
 
 Review comment:
   And I think we also need a base test class that can run against both batch and 
stream; for very special tests we can then choose to execute with only stream or 
only batch. What do you think? @WeiZhong94 @dianfu 
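   Something like the following sketch could express that shape (all names 
hypothetical, only to illustrate the idea):

```python
import unittest


class PyFlinkTableTestBase(unittest.TestCase):
    """Hypothetical shared base class: subclasses override t_env_factories()
    to run a suite against batch, stream, or both environments."""

    def create_batch_t_env(self):
        raise NotImplementedError("would return a BatchTableEnvironment")

    def create_stream_t_env(self):
        raise NotImplementedError("would return a StreamTableEnvironment")

    def t_env_factories(self):
        # Default: exercise both; a very special test overrides this to
        # return only the batch factory or only the stream factory.
        return [self.create_batch_t_env, self.create_stream_t_env]

    def for_each_t_env(self, test_body):
        for factory in self.t_env_factories():
            test_body(factory())
```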


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283596541
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.fullOuterJoin(
+right._j_table, join_predicate))
+
+def minus(self, right):
+"""
+Minus of two :class:`Table`s with duplicate records removed.
+Similar to a SQL EXCEPT clause. Minus returns records from the left 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283589608
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
 
 Review comment:
   `~pyflink.table.Table.as_` -> `~pyflink.table.Table.alias`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283602349
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -44,18 +47,55 @@ def parallelism(self):
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+return self._j_table_config.getTimeZone().getID()
+
+@timezone.setter
+def timezone(self, timezone_id):
+if timezone_id is not None and isinstance(timezone_id, str):
+j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id)
+self._j_table_config.setTimeZone(j_timezone)
+else:
+raise Exception("TableConfig.timezone should be a string!")
+
+@property
+def null_check(self):
+return self._j_table_config.getNullCheck()
+
+@null_check.setter
+def null_check(self, null_check):
+if null_check is not None and isinstance(null_check, bool):
+self._j_table_config.setNullCheck(null_check)
+else:
+raise Exception("TableConfig.null_check should be a bool value!")
+
+@property
+def max_generated_code_length(self):
+return self._j_table_config.getMaxGeneratedCodeLength()
 
 Review comment:
   Do we need this method in the Python API? (It is only used in Java code gen.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283590341
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.fullOuterJoin(
+right._j_table, join_predicate))
+
+def minus(self, right):
+"""
+Minus of two :class:`Table`s with duplicate records removed.
+Similar to a SQL EXCEPT clause. Minus returns records from the left 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283590748
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(
+right._j_table, join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(
+right._j_table, join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.as_` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.fullOuterJoin(
+right._j_table, join_predicate))
 
 Review comment:
   Can we put this in one line?
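   
   For illustration, a minimal sketch of the suggested one-line form (hypothetical formatting; keeping the wrap may still be necessary if it exceeds the project's line-length limit):
   
   ```python
   def full_outer_join(self, right, join_predicate):
       # Single-line return, as suggested in the review.
       return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate))
   ```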


This is an automated message from the Apache Git Service.
To respond to the 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283601943
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -44,18 +47,55 @@ def parallelism(self):
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+return self._j_table_config.getTimeZone().getID()
+
+@timezone.setter
+def timezone(self, timezone_id):
+if timezone_id is not None and isinstance(timezone_id, str):
 
 Review comment:
   Is there some simple way to translate the Python timezone to the Java timezone?
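   
   One possible direction, as a sketch under assumptions: a py4j `gateway` is in scope, pytz zone names match Java time-zone IDs, and the Java `TableConfig` exposes a `setTimeZone(java.util.TimeZone)` setter (none of this is confirmed against the PR):
   
   ```python
   import pytz
   
   def set_timezone(j_table_config, gateway, tz_name):
       # Validate the IANA zone name on the Python side first;
       # pytz raises UnknownTimeZoneError for an invalid name.
       tz = pytz.timezone(tz_name)
       # java.util.TimeZone.getTimeZone(String) resolves the same ID in Java.
       j_tz = gateway.jvm.java.util.TimeZone.getTimeZone(tz.zone)
       j_table_config.setTimeZone(j_tz)
   ```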


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283583541
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -106,6 +113,344 @@ def where(self, predicate):
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns onl
 
 Review comment:
   `Removes duplicate values and returns onl` -> `Removes duplicate values and 
returns only distinct (different) values.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on a change in pull request #8401: 
[FLINK-12407][python] Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r283599842
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -118,3 +463,112 @@ def insert_into(self, table_name):
 :param table_name: Name of the :class:`TableSink` to which the 
:class:`Table` is written.
 """
 self._j_table.insertInto(table_name)
+
+def __str__(self):
+return self._j_table.toString()
+
+
+class GroupedTable(object):
+"""
+A table that has been grouped on a set of grouping keys.
+"""
+
+def __init__(self, java_table):
+self._j_table = java_table
+
+def select(self, fields):
+"""
+Performs a selection operation on a grouped table. Similar to an SQL 
SELECT statement.
+The field expressions can contain complex expressions and aggregations.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg + ' The average' as 
average")
+
+
+:param fields: Expression string that contains group keys and 
aggregate function calls.
+:return: Result table.
+"""
+return Table(self._j_table.select(fields))
+
+
+class GroupWindowedTable(object):
+"""
+A table that has been windowed for :class:`GroupWindow`s.
+"""
+
+def __init__(self, java_group_windowed_table):
+self._java_table = java_group_windowed_table
 
 Review comment:
   Both `_j_table` and `_j_group_windowed_table` make sense to me. If we use `_j_table`, every JVM *Table is referenced by `_j_table`; if we use `_j_group_windowed_table`, we also need `_j_over_windowed_table`, `_j_windowed_grouped_table`, etc. For simplicity, I prefer using `_j_table` for all of the classes in `table.py`. What do you think?
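   
   A minimal sketch of the convention being proposed (class bodies are hypothetical; only the shared attribute name matters):
   
   ```python
   class GroupedTable(object):
       def __init__(self, java_table):
           self._j_table = java_table  # wraps the JVM GroupedTable
   
   class GroupWindowedTable(object):
       def __init__(self, java_group_windowed_table):
           self._j_table = java_group_windowed_table  # same name, different JVM type
   
   class OverWindowedTable(object):
       def __init__(self, java_over_windowed_table):
           self._j_table = java_over_windowed_table  # same name, different JVM type
   ```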


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?

2019-05-13 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839011#comment-16839011
 ] 

Bowen Li commented on FLINK-10837:
--

Hi [~pnowojski], do you know how to change the "Affected Version" to 1.9.0? It 
seems it doesn't allow me to do so.

> Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
> --
>
> Key: FLINK-10837
> URL: https://issues.apache.org/jira/browse/FLINK-10837
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Major
> Fix For: 1.7.0
>
>
> https://api.travis-ci.org/v3/job/452439034/log.txt
> {noformat}
> Tests in error: 
>   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {noformat}
> {noformat}
> java.lang.InterruptedException
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua edited a comment on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-13 Thread GitBox
yanghua edited a comment on issue #8410: [FLINK-11159] Allow configuration 
whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#issuecomment-492047904
 
 
   It's very strange: Travis reported a compile error:
   
   ```
   16:40:05.120 [ERROR] 
/home/travis/build/apache/flink/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java:[343,95]
 constructor CheckpointCoordinatorConfiguration in class 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration 
cannot be applied to given types;
   16:40:05.120 [ERROR] required: 
long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean,boolean
   16:40:05.120 [ERROR] found: 
long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean
   ```
   
   But the class `JobGraphTest` does not use 
`CheckpointCoordinatorConfiguration`.
   
   I will trigger a rebuild.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-13 Thread GitBox
yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to 
fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#issuecomment-492047904
 
 
   It's very strange: Travis reported a compile error:
   
   ```
   16:40:05.120 [ERROR] 
/home/travis/build/apache/flink/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java:[343,95]
 constructor CheckpointCoordinatorConfiguration in class 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration 
cannot be applied to given types;
   16:40:05.120 [ERROR] required: 
long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean,boolean
   16:40:05.120 [ERROR] found: 
long,long,long,int,org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy,boolean
   ```
   
   But the class `JobGraphTest` does not use 
`CheckpointCoordinatorConfiguration`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?

2019-05-13 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839008#comment-16839008
 ] 

Bowen Li commented on FLINK-10837:
--

Ran into this again at build https://travis-ci.org/apache/flink/builds/532040077

Logs: https://api.travis-ci.org/v3/job/532040083/log.txt


{code:java}
00:34:35.293 [INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time 
elapsed: 314.882 s - in 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
00:34:35.650 [INFO] 
00:34:35.650 [INFO] Results:
00:34:35.650 [INFO] 
00:34:35.650 [ERROR] Errors: 
00:34:35.650 [ERROR]   
KafkaITCase.testTimestamps:264->KafkaTestBase.deleteTestTopic:204->Object.wait:-2
 » TestTimedOut
00:34:35.650 [INFO] 
00:34:35.650 [ERROR] Tests run: 46, Failures: 0, Errors: 1, Skipped: 0
{code}


> Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
> --
>
> Key: FLINK-10837
> URL: https://issues.apache.org/jira/browse/FLINK-10837
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Major
> Fix For: 1.7.0
>
>
> https://api.travis-ci.org/v3/job/452439034/log.txt
> {noformat}
> Tests in error: 
>   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {noformat}
> {noformat}
> java.lang.InterruptedException
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?

2019-05-13 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reopened FLINK-10837:
--

> Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
> --
>
> Key: FLINK-10837
> URL: https://issues.apache.org/jira/browse/FLINK-10837
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Major
> Fix For: 1.7.0
>
>
> https://api.travis-ci.org/v3/job/452439034/log.txt
> {noformat}
> Tests in error: 
>   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {noformat}
> {noformat}
> java.lang.InterruptedException
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-13 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283595220
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
 ##
 @@ -69,45 +66,19 @@ import _root_.scala.collection.mutable
   */
 abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
 
-  // the catalog to hold all registered and translated tables
-  // we disable caching here to prevent side effects
-  private val internalSchema: CalciteSchema = 
CalciteSchema.createRootSchema(false, false)
-  private val rootSchema: SchemaPlus = internalSchema.plus()
-
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog()
 
-  // the configuration to create a Calcite planner
-  private lazy val frameworkConfig: FrameworkConfig = Frameworks
-.newConfigBuilder
-.defaultSchema(rootSchema)
-.parserConfig(getSqlParserConfig)
-.costFactory(new DataSetCostFactory)
-.typeSystem(new FlinkTypeSystem)
-.operatorTable(getSqlOperatorTable)
-.sqlToRelConverterConfig(getSqlToRelConverterConfig)
-// the converter is needed when calling temporal table functions from SQL, 
because
-// they reference a history table represented with a tree of table 
operations
-.context(Contexts.of(
-  new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
-))
-// set the executor to evaluate constant expressions
-.executor(new ExpressionReducer(config))
-.build
+  private val BUILTIN_CATALOG_NAME = "builtin"
 
 Review comment:
   Does the TableEnvironment need to know the default catalog's name in the 
CatalogManager?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-13 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283595427
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerSchema.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Bridge between {@link CatalogManager} and {@link Schema}. This way we can 
query Flink's specific catalogs
+ * from Calcite.
+ *
+ * The mapping for {@link Catalog}s is modeled as a strict two-level 
reference structure for Flink in Calcite,
+ * the full path of tables and views is of format 
[catalog_name].[db_name].[meta-object_name].
+ *
+ * It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 
to Calcite's schema.
+ */
+@Internal
+public class CatalogManagerSchema implements Schema {
 
 Review comment:
   Can we use a better name for this class? I'm not sure we should support 
ExternalCatalog yet, but even if we do, its naming has been a bit confusing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] 
Integrated Travis for Python Table API.
URL: https://github.com/apache/flink/pull/8392#issuecomment-492046091
 
 
   > I found one thing we need to double-check: the cost of the testing. Maybe 
there is a bug that needs fixing (I am not sure, I will check it):
   > 
![image](https://user-images.githubusercontent.com/22488084/57661099-0bc2fa00-761c-11e9-8a11-cdd7e49c3fe6.png)
   > @WeiZhong94
   
   It is a script bug; see the following:
   
![image](https://user-images.githubusercontent.com/22488084/57664749-05d51500-762c-11e9-8963-f0cf2f264ba7.png)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung merged pull request #8407: [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.

2019-05-13 Thread GitBox
KurtYoung merged pull request #8407: [FLINK-12374][table-planner-blink] Support 
translation from StreamExecTableSourceScan/BatchExecTableSourceScan to 
StreamTransformation.
URL: https://github.com/apache/flink/pull/8407
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12374) Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.

2019-05-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12374:
---
Labels: pull-request-available  (was: )

> Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan 
> to StreamTransformation.
> 
>
> Key: FLINK-12374
> URL: https://issues.apache.org/jira/browse/FLINK-12374
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12374) Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.

2019-05-13 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12374.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

Merged in 1.9.0: 2826ff80c4b056d7af589649238b3acabca43837

> Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan 
> to StreamTransformation.
> 
>
> Key: FLINK-12374
> URL: https://issues.apache.org/jira/browse/FLINK-12374
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12502) Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover

2019-05-13 Thread leesf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leesf reassigned FLINK-12502:
-

Assignee: leesf

> Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover
> 
>
> Key: FLINK-12502
> URL: https://issues.apache.org/jira/browse/FLINK-12502
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: leesf
>Priority: Major
>
> The {{JobMasterTest#testRequestNextInputSplitWithDataSourceFailover}} relies 
> on how many files you have in your working directory. This assumption is 
> quite brittle. Instead we should explicitly instantiate an 
> {{InputSplitAssigner}} with a defined number of input splits. 
> Moreover, we should make the assertions more explicit: Input split 
> comparisons should not rely solely on the length of the input split data.
> Maybe it is also not necessary to capture the full 
> {{TaskDeploymentDescriptor}} because we could already know the producer's and 
> consumer's {{JobVertexID}} when we create the {{JobGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12504) NullPoint here NullPointException there.. It's every where

2019-05-13 Thread Ken Krugler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838959#comment-16838959
 ] 

Ken Krugler commented on FLINK-12504:
-

Not sure why you say "It's not print". Your stack trace ends with 
{{PrintSinkFunction}} calling {{PrintSinkOutputWriter}}, and that is a sink that 
(from the JavaDocs) is an "Implementation of the SinkFunction writing every 
tuple to the standard output or standard error stream". You definitely don't 
want to do that in a production workflow; it's only something you'd do when 
testing small workflows running locally.

In any case, I'd suggest closing this issue and re-posting to the Flink user 
list, thanks.
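
For reference, the same advice expressed with the pyflink Table API quoted 
elsewhere in this digest; `tab` and the registered sink name `cassandra_sink` 
are assumed here purely for illustration:

{code:python}
# Write results to a registered table sink instead of a print-style sink;
# print sinks are only useful when debugging small workflows locally.
tab.select("k, v").insert_into("cassandra_sink")
{code}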

> NullPoint here NullPointException there.. It's every where
> --
>
> Key: FLINK-12504
> URL: https://issues.apache.org/jira/browse/FLINK-12504
> Project: Flink
>  Issue Type: Bug
>Reporter: Chethan UK
>Priority: Major
>
> I was trying to push data from Kafka to Cassandra, after around 220K 
> sometimes, 300K points are pushed into C*, this 
> java.lang.NullPointerException throws in..
> ```
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73)
>   at 
> org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81)
>   at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> ```
> How can normal Flink users understand these errors? The jobs keep failing, and 
> it's too unstable to be considered for production... 
>  
> In RoadMap, is there plans to make Kotlin supported language as well?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] 
Integrated Travis for Python Table API.
URL: https://github.com/apache/flink/pull/8392#issuecomment-492027462
 
 
   We have two options for integrating Python with Travis.
   
   1. Add a separate Python stage, which is what this PR currently reflects. 
Instead of using MVN for testing, it runs a separate shell script.
   2. Execute the shell script through MVN, so that the Python validation is 
also tested via MVN.
   
   I personally prefer the current PR approach, for two reasons:
 - The Python API is a different language from Java/Scala, and its code 
management and testing methods do not depend on MVN. Python multi-version and 
Python API functional tests are combined using Conda and Tox, so we chose not 
to use the MVN solution.
   
- The Python API is a new language API that has just been incorporated 
into Flink. We would like an intuitive view of the Python details in the Travis 
process, and to directly observe the time spent testing the Python API. So we 
chose a separate stage with separately reported output.
   
   Of course, there are two dimensions (1. independent stage or not; 2. 
integrated into MVN or not), i.e. 2x2=4 options; I lean toward the current PR.
   
   What do you think? @aljoscha @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8434: [FLINK-12234][hive] Support view related operations in HiveCatalog

2019-05-13 Thread GitBox
flinkbot commented on issue #8434: [FLINK-12234][hive] Support view related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8434#issuecomment-492026774
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] Integrated Travis for Python Table API.

2019-05-13 Thread GitBox
sunjincheng121 commented on issue #8392: [FLINK-12370][python][travis] 
Integrated Travis for Python Table API.
URL: https://github.com/apache/flink/pull/8392#issuecomment-492026843
 
 
   I found one thing we need to double-check: the cost of the testing. Maybe 
there is a bug that needs fixing (I am not sure, I will check it):
   
![image](https://user-images.githubusercontent.com/22488084/57661099-0bc2fa00-761c-11e9-8a11-cdd7e49c3fe6.png)
   @WeiZhong94 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12234) Support view related operations in HiveCatalog

2019-05-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12234:
---
Labels: pull-request-available  (was: )

> Support view related operations in HiveCatalog
> --
>
> Key: FLINK-12234
> URL: https://issues.apache.org/jira/browse/FLINK-12234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Support view related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

