[jira] [Created] (FLINK-20705) Separate the implementation of BatchExecValues and StreamExecValues

2020-12-21 Thread godfrey he (Jira)
godfrey he created FLINK-20705:
--

 Summary: Separate the implementation of BatchExecValues and 
StreamExecValues
 Key: FLINK-20705
 URL: https://issues.apache.org/jira/browse/FLINK-20705
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20693) Port BatchExecPythonCorrelate and StreamExecPythonCorrelate to Java

2020-12-20 Thread godfrey he (Jira)
godfrey he created FLINK-20693:
--

 Summary: Port BatchExecPythonCorrelate and 
StreamExecPythonCorrelate to Java
 Key: FLINK-20693
 URL: https://issues.apache.org/jira/browse/FLINK-20693
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20690) Separate the implementation of BatchExecCorrelate and StreamExecCorrelate

2020-12-20 Thread godfrey he (Jira)
godfrey he created FLINK-20690:
--

 Summary: Separate the implementation of BatchExecCorrelate and 
StreamExecCorrelate
 Key: FLINK-20690
 URL: https://issues.apache.org/jira/browse/FLINK-20690
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20673) ExecNode#getOutputType method should return LogicalType instead of RowType

2020-12-18 Thread godfrey he (Jira)
godfrey he created FLINK-20673:
--

 Summary: ExecNode#getOutputType method should return LogicalType 
instead of RowType
 Key: FLINK-20673
 URL: https://issues.apache.org/jira/browse/FLINK-20673
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


Currently, {{ExecNode#getOutputType}} always returns {{RowType}}. But some 
nodes do not return {{RowData}}, such as {{BatchExecSink}} and 
{{StreamExecSink}}. The output type of {{ExecNode}} should be consistent with 
the type parameter {{T}}. So {{ExecNode#getOutputType}} should return 
{{LogicalType}} instead of {{RowType}}, and each subclass should cast the 
{{LogicalType}} to the specific type when using it.
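
A minimal sketch of the idea; {{ExecNodeSketch}} and {{RowProducingNode}} are 
illustrative names, not the real planner classes:

{code:java}
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Sketch only: getOutputType() is widened from RowType to LogicalType,
// and a subclass that knows it produces rows narrows it back.
interface ExecNodeSketch<T> {
    LogicalType getOutputType();
}

class RowProducingNode implements ExecNodeSketch<Object> {
    private final RowType outputType;

    RowProducingNode(RowType outputType) {
        this.outputType = outputType;
    }

    @Override
    public LogicalType getOutputType() {
        return outputType;
    }

    void printFieldNames() {
        // the cast lives in the subclass that requires a row type
        RowType rowType = (RowType) getOutputType();
        rowType.getFieldNames().forEach(System.out::println);
    }
}
{code}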



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20668) Introduce translateToExecNode method for FlinkPhysicalRel

2020-12-17 Thread godfrey he (Jira)
godfrey he created FLINK-20668:
--

 Summary: Introduce translateToExecNode method for FlinkPhysicalRel
 Key: FLINK-20668
 URL: https://issues.apache.org/jira/browse/FLINK-20668
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


Currently, we introduce ExecGraphGenerator to translate a graph of 
FlinkPhysicalRel into an ExecNode graph. As more and more ExecNodes are 
supported, ExecGraphGenerator becomes hard to maintain. So we introduce 
FlinkPhysicalRel#translateToExecNode to create the specific ExecNode in each 
physical RelNode, while ExecGraphGenerator is used to build the whole graph based 
on the translated ExecNodes, including connecting the input/output nodes, 
handling the rel visitor, handling some special nodes (such as Exchange), etc.
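
A hedged sketch of the split, using illustrative local interfaces (the real 
types live in the planner):

{code:java}
import java.util.List;

// Sketch only: each physical rel creates its own ExecNode; wiring the
// inputs and handling special nodes stays in ExecGraphGenerator.
interface ExecNodeSketch {
    List<ExecNodeSketch> getInputNodes();
}

interface FlinkPhysicalRelSketch {
    // translates this node only; inputs are connected by the generator
    ExecNodeSketch translateToExecNode();
}
{code}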



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20623) Separate the implementation of StreamExecWatermarkAssigner

2020-12-16 Thread godfrey he (Jira)
godfrey he created FLINK-20623:
--

 Summary: Separate the implementation of StreamExecWatermarkAssigner
 Key: FLINK-20623
 URL: https://issues.apache.org/jira/browse/FLINK-20623
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20622) Separate the implementation of StreamExecChangelogNormalize

2020-12-16 Thread godfrey he (Jira)
godfrey he created FLINK-20622:
--

 Summary: Separate the implementation of 
StreamExecChangelogNormalize
 Key: FLINK-20622
 URL: https://issues.apache.org/jira/browse/FLINK-20622
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20620) Port StreamExecPythonCalc to Java

2020-12-16 Thread godfrey he (Jira)
godfrey he created FLINK-20620:
--

 Summary: Port StreamExecPythonCalc to Java
 Key: FLINK-20620
 URL: https://issues.apache.org/jira/browse/FLINK-20620
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20610) Separate the implementation of BatchExecCalc and StreamExecCalc

2020-12-15 Thread godfrey he (Jira)
godfrey he created FLINK-20610:
--

 Summary: Separate the implementation of BatchExecCalc and 
StreamExecCalc
 Key: FLINK-20610
 URL: https://issues.apache.org/jira/browse/FLINK-20610
 Project: Flink
  Issue Type: Sub-task
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20609) Separate the implementation of BatchExecDataStreamScan and StreamExecDataStreamScan

2020-12-15 Thread godfrey he (Jira)
godfrey he created FLINK-20609:
--

 Summary: Separate the implementation of BatchExecDataStreamScan 
and StreamExecDataStreamScan
 Key: FLINK-20609
 URL: https://issues.apache.org/jira/browse/FLINK-20609
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20608) Separate the implementation of BatchExecLegacyTableSourceScan and StreamExecLegacyTableSourceScan

2020-12-15 Thread godfrey he (Jira)
godfrey he created FLINK-20608:
--

 Summary: Separate the implementation of 
BatchExecLegacyTableSourceScan and StreamExecLegacyTableSourceScan
 Key: FLINK-20608
 URL: https://issues.apache.org/jira/browse/FLINK-20608
 Project: Flink
  Issue Type: Sub-task
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20516) Separate the implementation of BatchExecTableSourceScan and StreamExecTableSourceScan

2020-12-07 Thread godfrey he (Jira)
godfrey he created FLINK-20516:
--

 Summary: Separate the implementation of BatchExecTableSourceScan 
and StreamExecTableSourceScan
 Key: FLINK-20516
 URL: https://issues.apache.org/jira/browse/FLINK-20516
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20515) Separate the implementation of BatchExecMultipleInput and StreamExecMultipleInput

2020-12-07 Thread godfrey he (Jira)
godfrey he created FLINK-20515:
--

 Summary: Separate the implementation of BatchExecMultipleInput and 
StreamExecMultipleInput
 Key: FLINK-20515
 URL: https://issues.apache.org/jira/browse/FLINK-20515
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20513) Separate the implementation of BatchExecExchange and StreamExecExchange

2020-12-07 Thread godfrey he (Jira)
godfrey he created FLINK-20513:
--

 Summary: Separate the implementation of BatchExecExchange and 
StreamExecExchange
 Key: FLINK-20513
 URL: https://issues.apache.org/jira/browse/FLINK-20513
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


This issue will separate the implementation of Batch(/Stream)ExecExchange. We 
will introduce Batch(/Stream)PhysicalExchange, which will extend 
{{FlinkPhysicalRel}}; {{BatchExecExchange}} will be moved into the 
`nodes.exec.batch` package and will implement ExecNode for Exchange, and 
{{StreamExecExchange}} will be moved into the `nodes.exec.stream` package. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20512) Introduce getDescription, getOutputType, replaceInputEdge methods for ExecNode

2020-12-07 Thread godfrey he (Jira)
godfrey he created FLINK-20512:
--

 Summary: Introduce getDescription, getOutputType, replaceInputEdge 
methods for ExecNode
 Key: FLINK-20512
 URL: https://issues.apache.org/jira/browse/FLINK-20512
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


We will introduce a {{getDescription()}} method which returns the description 
information of the ExecNode,
a {{getOutputType()}} method which returns the output {{RowType}} of 
the ExecNode,
and a {{replaceInputEdge(int ordinalInParent, ExecEdge newInputEdge)}} 
method which can update the input edge at the given index.

Note: {{RelNode}} also has a {{getDescription()}} method, so before we finish all 
the separation work, we will introduce {{getDesc()}} instead of 
{{getDescription()}} to avoid compile errors.
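
A sketch of the three additions; the containing interface name is illustrative, 
and {{RowType}} / {{ExecEdge}} are assumed to be the planner's types:

{code:java}
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.types.logical.RowType;

// Sketch only, not the final interface.
interface ExecNodeSketch {

    /**
     * Returns the description of this node; named getDesc() for now to
     * avoid clashing with RelNode#getDescription during the migration.
     */
    String getDesc();

    /** Returns the output row type of this node. */
    RowType getOutputType();

    /** Replaces the input edge at the given index with the new edge. */
    void replaceInputEdge(int ordinalInParent, ExecEdge newInputEdge);
}
{code}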



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20511) Deprecate the classes in scala `nodes.exec` package

2020-12-07 Thread godfrey he (Jira)
godfrey he created FLINK-20511:
--

 Summary: Deprecate the classes in scala `nodes.exec` package
 Key: FLINK-20511
 URL: https://issues.apache.org/jira/browse/FLINK-20511
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


As we will separate the implementation of {{RelNode}} and {{ExecNode}}, the new 
classes introduced for the {{ExecNode}} implementation will be Java. So the classes 
in the `nodes.exec` package will be deprecated: we will rename {{ExecNodeBase}} to 
{{LegacyExecNodeBase}}, rename {{BatchExecNode}} to {{LegacyBatchExecNode}}, 
and rename {{StreamExecNode}} to {{LegacyStreamExecNode}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20509) Refactor verifyPlan method in TableTestBase

2020-12-07 Thread godfrey he (Jira)
godfrey he created FLINK-20509:
--

 Summary: Refactor verifyPlan method in TableTestBase
 Key: FLINK-20509
 URL: https://issues.apache.org/jira/browse/FLINK-20509
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


 Currently, we use the {{verifyPlan}} method to verify the plan result for both 
the {{RelNode}} plan and the {{ExecNode}} plan, because their instances are the same. 
But once the implementations of {{RelNode}} and {{ExecNode}} are separated, we 
can't get {{ESTIMATED_COST}} and {{CHANGELOG_MODE}} on the {{ExecNode}} plan. So, in 
order to make the methods clearer, the {{verifyPlan}} method can be 
split into two methods: {{verifyRelPlan}} for verifying the {{RelNode}} 
plan, and {{verifyExecPlan}} for verifying the {{ExecNode}} plan (see the sketch below). 
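
A sketch of the proposed test API; the class name and signatures are 
illustrative:

{code:java}
// Sketch only: one entry point per plan kind instead of a single verifyPlan.
abstract class TableTestUtilSketch {

    void verifyRelPlan(String query) {
        // assert the optimized RelNode plan, where ESTIMATED_COST and
        // CHANGELOG_MODE explain details are still available
    }

    void verifyExecPlan(String query) {
        // assert the translated ExecNode plan
    }
}
{code}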



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20490) Don't share inputs between FlinkPhysicalRel and ExecNode

2020-12-04 Thread godfrey he (Jira)
godfrey he created FLINK-20490:
--

 Summary: Don't share inputs between FlinkPhysicalRel and ExecNode
 Key: FLINK-20490
 URL: https://issues.apache.org/jira/browse/FLINK-20490
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


Currently, all execution nodes extend from both {{RelNode}} and {{ExecNode}}, 
and they share the same input instances, which are stored in the {{RelNode}} 
instance. {{ExecNode#getInputNodes}} just returns the cast {{RelNode}} 
inputs; the code looks like:

{code:java}
getInputs.map(_.asInstanceOf[ExecNode[_]])
{code}

This issue aims to let {{ExecNode}} have its own input instances. It is a preparation 
for the implementation separation between {{RelNode}} and {{ExecNode}}.
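
A sketch of the target state, with an illustrative base class (not the real 
planner code):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch only: the ExecNode stores its own input list instead of casting
// the inputs kept on the RelNode side.
abstract class ExecNodeBaseSketch {
    private List<ExecNodeBaseSketch> inputNodes = new ArrayList<>();

    public List<ExecNodeBaseSketch> getInputNodes() {
        return inputNodes;
    }

    public void setInputNodes(List<ExecNodeBaseSketch> inputNodes) {
        this.inputNodes = inputNodes;
    }
}
{code}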




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20478) Show "Optimized Execution Plan" instead of "Physical Execution Plan" in explain result

2020-12-03 Thread godfrey he (Jira)
godfrey he created FLINK-20478:
--

 Summary: Show "Optimized Execution Plan" instead of "Physical 
Execution Plan" in explain result
 Key: FLINK-20478
 URL: https://issues.apache.org/jira/browse/FLINK-20478
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he


Currently, the explain result includes "Abstract Syntax Tree", "Optimized 
Logical Plan" and "Physical Execution Plan". The "Optimized Logical Plan" 
is an {{ExecNode}} graph, and 
[ExplainDetail|https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java] 
represents the expected explain details, currently including {{ESTIMATED_COST}} and 
{{CHANGELOG_MODE}}. Those types can only be used for Calcite {{RelNode}}s, 
not for {{ExecNode}}s. So I suggest making the following adjustments:
1. Keep "Abstract Syntax Tree" as it is; it represents the original 
(un-optimized) {{RelNode}} graph converted from {{SqlNode}}.
2. Rename "Optimized Logical Plan" to "Optimized Physical Plan", which 
represents the optimized physical {{RelNode}} graph composed of 
{{FlinkPhysicalRel}}s. {{ESTIMATED_COST}} and {{CHANGELOG_MODE}} describe the 
expected explain details for the "Optimized Physical Plan".
3. Replace "Physical Execution Plan" with "Optimized Execution Plan", which 
represents the optimized {{ExecNode}} graph. Currently, many optimizations are 
based on the {{ExecNode}} graph, such as sub-plan reuse and multiple-input rewrite, and we 
may introduce more optimizations in the future. So there will be more and more 
differences between the "Optimized Physical Plan" and the "Optimized Execution Plan". We 
do not want to show two execution plans, and the "Physical Execution Plan" for 
the {{StreamGraph}} is less important than the "Optimized Execution Plan". If we want 
to introduce a "Physical Execution Plan" in the future, we can add a type named 
"PHYSICAL_EXECUTION_PLAN" to {{ExplainDetail}} to support it. There is already 
an issue for a similar change: 
[FLINK-19687|https://issues.apache.org/jira/browse/FLINK-19687]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20437) Port ExecNode to Java

2020-12-01 Thread godfrey he (Jira)
godfrey he created FLINK-20437:
--

 Summary: Port ExecNode to Java
 Key: FLINK-20437
 URL: https://issues.apache.org/jira/browse/FLINK-20437
 Project: Flink
  Issue Type: Sub-task
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20436) Simplify type parameter of ExecNode

2020-12-01 Thread godfrey he (Jira)
godfrey he created FLINK-20436:
--

 Summary: Simplify type parameter of ExecNode
 Key: FLINK-20436
 URL: https://issues.apache.org/jira/browse/FLINK-20436
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20435) Refactor ExecNode

2020-11-30 Thread godfrey he (Jira)
godfrey he created FLINK-20435:
--

 Summary: Refactor ExecNode
 Key: FLINK-20435
 URL: https://issues.apache.org/jira/browse/FLINK-20435
 Project: Flink
  Issue Type: Improvement
Reporter: godfrey he


Currently, there are several improvements we want to make to ExecNode:
1. Simplify the type parameter of {{ExecNode}}. Currently, 
{{ExecNode#translateToPlan}} takes {{BatchPlanner}} or {{StreamPlanner}} as a 
parameter, so {{ExecNode}} has a type parameter {{E <: Planner}}, which 
indicates whether the node is a batch node or a streaming node. But in the future, a 
plan may contain both batch nodes and stream nodes. The type parameter can be 
removed; the implementation base class can cast the planner to the expected planner 
(see the sketch after this list).
2. Port ExecNode to Java.
3. Separate the implementations of {{RelNode}} and {{ExecNode}}. Currently, an 
execution node extends from both {{RelNode}} and {{ExecNode}}. After a physical 
node is converted to an exec node, many parameters are unnecessary, such as 
RelOptCluster, RelTraitSet, etc. With more optimizations on {{ExecNode}}, we 
need {{ExecNode}} to be cleaner and simpler. So we will separate the 
implementations of {{RelNode}} and {{ExecNode}}.

This is an umbrella issue; we will create more related sub-tasks.
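
A sketch of item 1; the names are illustrative and the import package names may 
differ from the actual code:

{code:java}
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.planner.delegation.StreamPlanner;

// Sketch only: translateToPlan takes the common Planner type, and the
// batch/stream base class performs the cast, so ExecNode no longer needs
// the E <: Planner type parameter.
interface ExecNodeSketch<T> {
    Transformation<T> translateToPlan(Planner planner);
}

abstract class StreamExecNodeBase<T> implements ExecNodeSketch<T> {

    @Override
    public Transformation<T> translateToPlan(Planner planner) {
        // the cast to the expected planner happens once, in the base class
        return translateToPlanInternal((StreamPlanner) planner);
    }

    protected abstract Transformation<T> translateToPlanInternal(StreamPlanner planner);
}
{code}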



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20322) Rename table.optimizer.union-all-as-breakpoint-disabled to table.optimizer.union-all-as-breakpoint.enabled

2020-11-24 Thread godfrey he (Jira)
godfrey he created FLINK-20322:
--

 Summary: Rename table.optimizer.union-all-as-breakpoint-disabled 
to table.optimizer.union-all-as-breakpoint.enabled
 Key: FLINK-20322
 URL: https://issues.apache.org/jira/browse/FLINK-20322
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: godfrey he
 Fix For: 1.12.0


{{table.optimizer.union-all-as-breakpoint-disabled}} is defined in 
{{RelNodeBlockPlanBuilder}} and is an internal, experimental config. A 
{{disabled}} key with {{false}} as the default value is very obscure. I suggest 
changing {{table.optimizer.union-all-as-breakpoint-disabled}} to 
{{table.optimizer.union-all-as-breakpoint-enabled}} with {{true}} as the default 
value, which is easier to understand. As this config is an internal 
experimental config, we don't need to mark it as deprecated.
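
A sketch of the renamed option; the containing class and description text are 
illustrative:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch only: positive naming with `true` as the default reads naturally.
final class OptimizerOptionsSketch {
    public static final ConfigOption<Boolean> UNION_ALL_AS_BREAKPOINT_ENABLED =
            ConfigOptions.key("table.optimizer.union-all-as-breakpoint-enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "When true, the optimizer treats UNION ALL as a breakpoint "
                                    + "when building RelNode blocks.");
}
{code}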



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20293) Remove the useless constructor of ContinuousFileMonitoringFunction

2020-11-23 Thread godfrey he (Jira)
godfrey he created FLINK-20293:
--

 Summary: Remove the useless constructor of 
ContinuousFileMonitoringFunction
 Key: FLINK-20293
 URL: https://issues.apache.org/jira/browse/FLINK-20293
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.12.0
Reporter: godfrey he
 Fix For: 1.12.0


In 1.11, we introduced a {{ContinuousFileMonitoringFunction}} constructor with a given 
{{globalModificationTime}} to support {{HiveTableSource}} continuously reading 
non-partitioned files. However, this feature has a bug, see 
[FLINK-20277|https://issues.apache.org/jira/browse/FLINK-20277]. 
In master, after 
[FLINK-19888|https://issues.apache.org/jira/browse/FLINK-19888] is finished, 
{{HiveTableSource}} does not depend on {{ContinuousFileMonitoringFunction}} any 
more, so we can revert the changes of 
[FLINK-17435|https://issues.apache.org/jira/browse/FLINK-17435] concerning the 
ContinuousFileMonitoringFunction part to avoid such bugs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20255) Nested decorrelate failed

2020-11-19 Thread godfrey he (Jira)
godfrey he created FLINK-20255:
--

 Summary: Nested decorrelate failed
 Key: FLINK-20255
 URL: https://issues.apache.org/jira/browse/FLINK-20255
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.0, 1.12.0
Reporter: godfrey he


This issue is from ML 
https://www.mail-archive.com/user@flink.apache.org/msg37746.html

We can reproduce the issue through the following code

{code:java}
@FunctionHint(output = new DataTypeHint("ROW"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

object Job {

  def main(args: Array[String]): Unit = {
    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
    streamTableEnv.createTemporarySystemFunction(
      "SplitStringToRows",
      classOf[SplitStringToRows]
    ) // Class defined in previous email

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr1 STRING,
        attr2 STRING,
        attr3 DECIMAL,
        attr4 DATE
      ) WITH (
        'connector' = 'datagen'
      )""")

    val q2 = streamTableEnv.sqlQuery(
      """
      SELECT
        a.attr1 AS attr1,
        attr2,
        attr3,
        attr4
      FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1, ';')) AS a(attr1)
      """)
    streamTableEnv.createTemporaryView("view2", q2)

    val q3 =
      """
      SELECT
        w.attr1,
        p.attr3
      FROM table2 w
      LEFT JOIN LATERAL (
        SELECT
          attr1,
          attr3
        FROM (
          SELECT
            attr1,
            attr3,
            ROW_NUMBER() OVER (
              PARTITION BY attr1
              ORDER BY
                attr4 DESC NULLS LAST,
                w.attr2 = attr2 DESC NULLS LAST
            ) AS row_num
          FROM view2)
        WHERE row_num = 1) p
      ON (w.attr1 = p.attr1)
      """
    println(streamTableEnv.explainSql(q3))
  }
}
{code}

The reason is that {{RelDecorrelator}} in Calcite can't handle such a nested 
decorrelation pattern yet.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20070) NPE in SourceCoordinatorProviderTest.testCheckpointAndReset

2020-11-09 Thread godfrey he (Jira)
godfrey he created FLINK-20070:
--

 Summary: NPE in 
SourceCoordinatorProviderTest.testCheckpointAndReset
 Key: FLINK-20070
 URL: https://issues.apache.org/jira/browse/FLINK-20070
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Reporter: godfrey he
 Fix For: 1.12.0


https://dev.azure.com/godfreyhe/c147b7ad-1708-46c3-9021-cc523e50c4d5/_apis/build/builds/71/logs/114


{code:java}
2020-11-10T03:41:10.8231846Z [INFO] Running org.apache.flink.runtime.source.coordinator.SourceCoordinatorContextTest
2020-11-10T03:41:11.2510061Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.171 s <<< FAILURE! - in org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest
2020-11-10T03:41:11.2511837Z [ERROR] testCheckpointAndReset(org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest)  Time elapsed: 1.055 s  <<< ERROR!
2020-11-10T03:41:11.2512610Z java.lang.NullPointerException
2020-11-10T03:41:11.2513268Z    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest.testCheckpointAndReset(SourceCoordinatorProviderTest.java:94)
2020-11-10T03:41:11.2513967Z    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-11-10T03:41:11.2514553Z    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-11-10T03:41:11.2515230Z    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-11-10T03:41:11.2515827Z    at java.lang.reflect.Method.invoke(Method.java:498)
2020-11-10T03:41:11.2516428Z    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2020-11-10T03:41:11.2517107Z    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2020-11-10T03:41:11.2517757Z    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2020-11-10T03:41:11.2518431Z    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2020-11-10T03:41:11.2519082Z    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2020-11-10T03:41:11.2519677Z    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
2020-11-10T03:41:11.2520292Z    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
2020-11-10T03:41:11.2521100Z    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
2020-11-10T03:41:11.2521831Z    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-11-10T03:41:11.2522420Z    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-11-10T03:41:11.2522988Z    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-11-10T03:41:11.2523582Z    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-11-10T03:41:11.2524165Z    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-11-10T03:41:11.2524951Z    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-11-10T03:41:11.2525570Z    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
2020-11-10T03:41:11.2526288Z    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
2020-11-10T03:41:11.2526969Z    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
2020-11-10T03:41:11.2527742Z    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
2020-11-10T03:41:11.2528467Z    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
2020-11-10T03:41:11.2529169Z    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
2020-11-10T03:41:11.2529844Z    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
2020-11-10T03:41:11.2530480Z    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19925) Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

2020-11-02 Thread godfrey he (Jira)
godfrey he created FLINK-19925:
--

 Summary: Errors$NativeIoException: readAddress(..) failed: 
Connection reset by peer
 Key: FLINK-19925
 URL: https://issues.apache.org/jira/browse/FLINK-19925
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: godfrey he


An Errors$NativeIoException will sometimes occur when we run TPC-DS based on master; 
the full exception stack is:


{code:java}
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to 'xxx')
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:818) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:834) ~[?:1.8.0_102]
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19822) remove redundant shuffle for stream

2020-10-27 Thread godfrey he (Jira)
godfrey he created FLINK-19822:
--

 Summary: remove redundant shuffle for stream
 Key: FLINK-19822
 URL: https://issues.apache.org/jira/browse/FLINK-19822
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he
 Fix For: 1.12.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19737) Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

2020-10-20 Thread godfrey he (Jira)
godfrey he created FLINK-19737:
--

 Summary: Introduce TableOperatorWrapperGenerator to translate 
transformation DAG in a multiple-input node to  TableOperatorWrapper DAG
 Key: FLINK-19737
 URL: https://issues.apache.org/jira/browse/FLINK-19737
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he
 Fix For: 1.12.0


{{Transformation}} is not serializable, while {{StreamOperatorFactory}} is. We 
need to introduce another class (named {{TableOperatorWrapper}}) to store the 
information of a {{Transformation}}, and introduce a utility class (named 
{{TableOperatorWrapperGenerator}}) to convert the {{Transformation}} DAG into a 
{{TableOperatorWrapper}} DAG.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18853) Supports properties from flink-conf.yaml for SET command in sql client

2020-08-07 Thread godfrey he (Jira)
godfrey he created FLINK-18853:
--

 Summary: Supports properties from flink-conf.yaml for SET command 
in sql client
 Key: FLINK-18853
 URL: https://issues.apache.org/jira/browse/FLINK-18853
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Reporter: godfrey he


Many SQL-client-specific properties can be replaced with config options from 
flink-conf.yaml; for example, {{execution.parallelism}} can be replaced with 
{{parallelism.default}}.
The SQL client also does not support many properties from flink-conf.yaml in the SET 
command, such as {{state.backend}}.
As discussed in 
[FLINK-18161|https://issues.apache.org/jira/browse/FLINK-18161], we can 
deprecate the SQL-client-specific properties and support all properties from 
flink-conf.yaml. If there is a conflict between a deprecated property and a new 
property, we just throw an exception.
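
A sketch of one possible key mapping during the deprecation period; only the 
{{execution.parallelism}} → {{parallelism.default}} pair comes from the text 
above, and the class and helper are illustrative:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch only: each deprecated SQL-client key points at the
// flink-conf.yaml key that replaces it.
final class PropertyKeyMapping {
    static final Map<String, String> DEPRECATED_TO_CONF_KEY = new HashMap<>();
    static {
        DEPRECATED_TO_CONF_KEY.put("execution.parallelism", "parallelism.default");
    }

    static String resolveKey(String key) {
        // fall back to the key itself for regular flink-conf.yaml options
        return DEPRECATED_TO_CONF_KEY.getOrDefault(key, key);
    }
}
{code}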



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18791) Simplify the logic of SqlCommandParser and CliClient

2020-07-31 Thread godfrey he (Jira)
godfrey he created FLINK-18791:
--

 Summary: Simplify the logic of SqlCommandParser and CliClient
 Key: FLINK-18791
 URL: https://issues.apache.org/jira/browse/FLINK-18791
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: godfrey he
 Fix For: 1.12.0


Currently, {{SqlCommandParser}} parses a statement into a corresponding 
{{SqlCommandCall}}, and {{CliClient}} performs a different action for each kind of 
{{SqlCommandCall}}. However, when a new kind of statement is supported 
(such as [SHOW CURRENT DDL|https://issues.apache.org/jira/browse/FLINK-18616] or 
[SHOW CREATE TABLE|https://issues.apache.org/jira/browse/FLINK-16384]), we must 
implement it in the planner and, at the same time, add a new kind of 
{{SqlCommand}} plus a new method in {{CliClient}} to execute the command in the SQL 
client. Moreover, the SQL client implementation may be forgotten (such as in 
[FLINK-18059|https://issues.apache.org/jira/browse/FLINK-18059]).
To improve this, I propose that {{SqlCommandParser}} return an {{Operation}}, and 
that {{CliClient}} execute the {{Operation}} directly. Meanwhile, we can unify the print 
style of the different kinds of Operations. After this is finished, we will not need to 
change the SQL client when a new kind of statement is supported, unless a 
customized print style is needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18789) Use TableEnvironment#executeSql to execute insert statement in sql client

2020-07-31 Thread godfrey he (Jira)
godfrey he created FLINK-18789:
--

 Summary: Use TableEnvironment#executeSql to execute insert 
statement in sql client
 Key: FLINK-18789
 URL: https://issues.apache.org/jira/browse/FLINK-18789
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Reporter: godfrey he
 Fix For: 1.12.0


Currently, the SQL client has a lot of logic to execute an insert job, which can be 
simplified through the {{executeSql}} method.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18788) Refactor sql-client based on the proposed interfaces of FLIP-84

2020-07-31 Thread godfrey he (Jira)
godfrey he created FLINK-18788:
--

 Summary: Refactor sql-client based on the proposed interfaces of 
FLIP-84
 Key: FLINK-18788
 URL: https://issues.apache.org/jira/browse/FLINK-18788
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Client
Reporter: godfrey he
 Fix For: 1.12.0


FLIP-84 introduces many useful methods, such as 
{{TableEnvironment#executeSql}} and {{TableResult#collect}}. The SQL client can use 
those methods to simplify many implementations; for example, many methods in the 
{{Executor}} interface can be replaced with the {{executeSql}} method, and the select 
implementation can be replaced with the {{collect}} method. 
This issue aims to do the related refactoring.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18731) The monotonicity of UNIX_TIMESTAMP function is not correct

2020-07-27 Thread godfrey he (Jira)
godfrey he created FLINK-18731:
--

 Summary: The monotonicity of UNIX_TIMESTAMP function is not correct
 Key: FLINK-18731
 URL: https://issues.apache.org/jira/browse/FLINK-18731
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.12.0


Currently, the monotonicity of the {{UNIX_TIMESTAMP}} function is always 
{{INCREASING}}. Actually, only when it has no arguments 
({{UNIX_TIMESTAMP()}}, which is equivalent to {{NOW}}) is its monotonicity 
{{INCREASING}}; otherwise (e.g. {{UNIX_TIMESTAMP(string)}}) its monotonicity 
should be {{NOT_MONOTONIC}}.
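
A sketch of the suggested rule using Calcite's {{SqlMonotonicity}}; the wrapper 
class and method name are illustrative:

{code:java}
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.validate.SqlMonotonicity;

// Sketch only: UNIX_TIMESTAMP() behaves like NOW and is increasing; with
// arguments the result depends on the input value, so it is not monotonic.
final class UnixTimestampMonotonicityRule {
    static SqlMonotonicity monotonicityOf(SqlOperatorBinding call) {
        return call.getOperandCount() == 0
                ? SqlMonotonicity.INCREASING
                : SqlMonotonicity.NOT_MONOTONIC;
    }
}
{code}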



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18717) reuse MiniCluster in table integration test class?

2020-07-26 Thread godfrey he (Jira)
godfrey he created FLINK-18717:
--

 Summary: reuse MiniCluster in table integration test class?
 Key: FLINK-18717
 URL: https://issues.apache.org/jira/browse/FLINK-18717
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Affects Versions: 1.11.0
Reporter: godfrey he


Before 1.11, a {{MiniCluster}} could be reused within each integration test class (see 
TestStreamEnvironment#setAsContext). 
In 1.11, after we corrected the execution behavior of TableEnvironment, 
StreamTableEnvironment and BatchTableEnvironment (see 
[FLINK-16363|https://issues.apache.org/jira/browse/FLINK-16363] and 
[FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126]), a MiniCluster 
is created for each test case, even within the same test class (see 
{{org.apache.flink.client.deployment.executors.LocalExecutor}}). It would be better if we 
could reuse the {{MiniCluster}} as before. One approach is to provide a new kind of 
MiniCluster factory (such as a SessionMiniClusterFactory) instead of using 
{{PerJobMiniClusterFactory}}. WDYT?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18651) implicitly cast the time attribute to regular TIMESTAMP type in regular join

2020-07-20 Thread godfrey he (Jira)
godfrey he created FLINK-18651:
--

 Summary: implicitly cast the time attribute to regular TIMESTAMP 
type in regular join
 Key: FLINK-18651
 URL: https://issues.apache.org/jira/browse/FLINK-18651
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he


Currently, a regular join does not accept a rowtime attribute field as input, and 
requires users to manually cast the time attribute to a regular timestamp. This is 
because the time attribute will be out of order after a regular join, and then we can't 
do a window aggregate based on the time attribute. 

We can improve this: the planner can implicitly cast the time attribute to the 
regular TIMESTAMP type, and throw an exception if there is an operator (after the join) 
that depends on the time attribute, like a window aggregate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18621) Simplify the methods of Executor interface in sql client

2020-07-17 Thread godfrey he (Jira)
godfrey he created FLINK-18621:
--

 Summary: Simplify the methods of Executor interface in sql client
 Key: FLINK-18621
 URL: https://issues.apache.org/jira/browse/FLINK-18621
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: godfrey he
 Fix For: 1.12.0


After {{TableEnvironment#executeSql}} is introduced, many methods in the 
{{Executor}} interface can be replaced with {{TableEnvironment#executeSql}}. 
Those methods include:
listCatalogs, listDatabases, createTable, dropTable, listTables, listFunctions, 
useCatalog, useDatabase, getTableSchema (use `DESCRIBE xx`)




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18550) use TableResult#collect to get select result for sql client

2020-07-09 Thread godfrey he (Jira)
godfrey he created FLINK-18550:
--

 Summary: use TableResult#collect to get select result for sql 
client
 Key: FLINK-18550
 URL: https://issues.apache.org/jira/browse/FLINK-18550
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: godfrey he
 Fix For: 1.12.0


Currently, the SQL client has a lot of logic to handle the select result, which 
can be simplified through the {{TableResult#collect}} method.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18399) TableResult#print can not print the result of unbounded stream query

2020-06-20 Thread godfrey he (Jira)
godfrey he created FLINK-18399:
--

 Summary: TableResult#print can not print the result of unbounded 
stream query
 Key: FLINK-18399
 URL: https://issues.apache.org/jira/browse/FLINK-18399
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: godfrey he
 Fix For: 1.11.0


In the current implementation of PrintUtils, all results are collected into local 
memory first to compute the column widths. This works fine for batch queries and 
bounded stream queries, but for an unbounded stream query the result is 
endless, so nothing would ever be printed. To solve this, we can use a 
fixed-length strategy and print a row immediately once it is accessed.
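
A minimal sketch of the fixed-length idea; the column width is a chosen 
constant rather than computed from the full result, and the class is illustrative:

{code:java}
// Sketch only: pad or truncate every column to a fixed width so a row can
// be printed immediately, without buffering the whole (endless) result.
final class FixedWidthPrinter {
    static void printRow(Object[] fields, int columnWidth) {
        StringBuilder sb = new StringBuilder("|");
        for (Object field : fields) {
            String s = String.valueOf(field);
            if (s.length() > columnWidth) {
                // truncate over-long values, keeping a visible marker
                s = s.substring(0, columnWidth - 3) + "...";
            }
            sb.append(' ').append(String.format("%-" + columnWidth + "s", s)).append(" |");
        }
        System.out.println(sb);
    }
}
{code}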



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18337) Introduce TableResult#await method to wait data ready

2020-06-16 Thread godfrey he (Jira)
godfrey he created FLINK-18337:
--

 Summary: Introduce TableResult#await method to wait data ready
 Key: FLINK-18337
 URL: https://issues.apache.org/jira/browse/FLINK-18337
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: godfrey he


Currently, the {{TableEnvironment.executeSql()}} method for an INSERT statement 
returns a TableResult once the job is submitted. Users must use 
{{tableResult.getJobClient.get()
  .getJobExecutionResult(Thread.currentThread().getContextClassLoader)
  .get()}} to wait for the job to finish. This API looks very ugly.
So this issue aims to introduce a {{TableResult#await}} method; the code snippet 
looks like:

{code:java}
val tEnv = ...
// submit the job and wait for the job to finish
tEnv.executeSql("insert into ...").await()
{code}

The suggested new methods are:

{code:java}
/**
 * Wait until the data is ready.
 *
 * For a select operation, this method will wait until the first row 
can be accessed locally.
 * For an insert operation, this method will wait for the job to finish, 
because the result contains only one row.
 * For other operations, this method will return immediately, because 
the result is already available locally.
 *
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted 
while waiting
 */
void await() throws InterruptedException, ExecutionException;

/**
 * Wait until the data is ready.
 *
 * For a select operation, this method will wait until the first row 
can be accessed locally.
 * For an insert operation, this method will wait for the job to finish, 
because the result contains only one row.
 * For other operations, this method will return immediately, because 
the result is already available locally.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted 
while waiting
 * @throws TimeoutException if the wait timed out
 */
void await(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException;
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18180) unify the logic of time attribute derivation for both batch and streaming

2020-06-08 Thread godfrey he (Jira)
godfrey he created FLINK-18180:
--

 Summary: unify the logic of time attribute derivation for both 
batch and streaming
 Key: FLINK-18180
 URL: https://issues.apache.org/jira/browse/FLINK-18180
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he


Currently, the logic of time attribute derivation is different for batch and 
streaming. For a batch table source, the rowtime type will not be generated, or 
will be erased to a regular timestamp type if the source table has a rowtime type. To 
handle this difference, we have to distinguish between batch and streaming via the 
{{isStreamingMode}} flag in many places, such as {{DatabaseCalciteSchema}}, 
{{CatalogSchemaTable}}, {{CatalogTableSchemaResolver}}, etc. In fact, batch 
queries may also need the rowtime type, for example to support rowtime temporal joins. 
So we can unify the logic of time attribute derivation on the source side, 
and erase the rowtime type in the optimization phase if needed. That will make it easier 
to push the unified {{TableEnvironment}} and planner forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18061) TableResult#collect should return closeable iterator to avoid resource leak

2020-06-02 Thread godfrey he (Jira)
godfrey he created FLINK-18061:
--

 Summary: TableResult#collect should return closeable iterator to 
avoid resource leak
 Key: FLINK-18061
 URL: https://issues.apache.org/jira/browse/FLINK-18061
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


As discussed in the ML thread 
http://mail-archives.apache.org/mod_mbox/flink-dev/202005.mbox/%3CCADQYLGuk%2BnnrPv-PR6Gi7D_rZqp_DhjfA%3DVtkRB-aGPxYxOQPw%40mail.gmail.com%3E,
we should return a closeable iterator from the {{TableResult#collect}} method *to avoid 
resource leaks*. The suggested change is:

public interface TableResult {

  CloseableRowIterator collect();

}

public interface CloseableRowIterator extends Iterator<Row>, AutoCloseable {

} 

This change does not break the current API.
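
A usage sketch under the interfaces suggested above; the wrapper class is 
illustrative and tableResult is an assumed {{TableResult}} instance:

{code:java}
// Sketch only: AutoCloseable lets callers release the underlying job
// resources with try-with-resources.
final class CollectExample {
    static void printAll(TableResult tableResult) throws Exception {
        try (CloseableRowIterator it = tableResult.collect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
{code}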



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18059) Can not execute create/drop catalog statement in sql client

2020-06-02 Thread godfrey he (Jira)
godfrey he created FLINK-18059:
--

 Summary: Can not execute create/drop catalog statement in sql 
client
 Key: FLINK-18059
 URL: https://issues.apache.org/jira/browse/FLINK-18059
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.10.1
Reporter: godfrey he


When executing a create catalog statement (e.g. {{create CATALOG c1 
with('type'='generic_in_memory')}}) in the SQL client, the following exception will 
occur:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unsupported command: CREATE CATALOG
    at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:355)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:213)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)

A similar case exists for {{drop catalog}}.

The reason is that the CliClient class does not handle the CREATE_CATALOG and 
DROP_CATALOG commands.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18055) Catalog does not exist in SQL Client

2020-06-01 Thread godfrey he (Jira)
godfrey he created FLINK-18055:
--

 Summary: Catalog does not exist in SQL Client
 Key: FLINK-18055
 URL: https://issues.apache.org/jira/browse/FLINK-18055
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.11.0
Reporter: godfrey he


Flink SQL> show catalogs;
default_catalog
hive

Flink SQL> use  catalog hive;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name 
[`hive`] does not exist.


The reason is that {{SqlCommandParser}} adds backticks ({{``}}) around the catalog 
name, which is unnecessary. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17996) NPE in CatalogTableStatisticsConverter.convertToColumnStats method

2020-05-28 Thread godfrey he (Jira)
godfrey he created FLINK-17996:
--

 Summary: NPE in 
CatalogTableStatisticsConverter.convertToColumnStats method
 Key: FLINK-17996
 URL: https://issues.apache.org/jira/browse/FLINK-17996
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: godfrey he


Currently, the Hive catalog only supports a few kinds of statistics and returns 
null otherwise (see HiveStatsUtil#createTableColumnStats). If there are decimal 
statistics, an NPE will occur in the 
CatalogTableStatisticsConverter.convertToColumnStats method:

Caused by: java.lang.NullPointerException
    at org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter.convertToColumnStats(CatalogTableStatisticsConverter.java:77)
    at org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter.convertToColumnStatsMap(CatalogTableStatisticsConverter.java:68)
    at org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter.convertToTableStats(CatalogTableStatisticsConverter.java:57)
    at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:113)
    at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:97)
    at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:74)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:72)
    at org.apache.calcite.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17774) Support all kinds of changes for select result

2020-05-17 Thread godfrey he (Jira)
godfrey he created FLINK-17774:
--

 Summary: Support all kinds of changes for select result
 Key: FLINK-17774
 URL: https://issues.apache.org/jira/browse/FLINK-17774
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.11.0


[FLINK-17252|https://issues.apache.org/jira/browse/FLINK-17252] has supported 
select queries; however, only append changes are supported, because 
[FLINK-16998|https://issues.apache.org/jira/browse/FLINK-16998] is not 
finished. This issue aims to support all kinds of changes based on FLINK-16998.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17753) watermark defined in ddl does not work in Table api

2020-05-16 Thread godfrey he (Jira)
godfrey he created FLINK-17753:
--

 Summary: watermark defined in ddl does not work in Table api
 Key: FLINK-17753
 URL: https://issues.apache.org/jira/browse/FLINK-17753
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.11.0


the following code will get {{org.apache.flink.table.api.ValidationException: A 
group window expects a time attribute for grouping in a stream environment.}}

{code:java}
@Test
  def testRowTimeTableSourceGroupWindow(): Unit = {
val ddl =
  s"""
 |CREATE TABLE rowTimeT (
 |  id int,
 |  rowtime timestamp(3),
 |  val bigint,
 |  name varchar(32),
 |  watermark for rowtime as rowtime
 |) WITH (
 |  'connector' = 'projectable-values',
 |  'bounded' = 'false'
 |)
   """.stripMargin
util.tableEnv.executeSql(ddl)

val t = util.tableEnv.from("rowTimeT")
  .where($"val" > 100)
  .window(Tumble over 10.minutes on 'rowtime as 'w)
  .groupBy('name, 'w)
  .select('name, 'w.end, 'val.avg)
util.verifyPlan(t)
  }
{code}

The reason is that the planner does not convert the {{watermarkSpecs}} in 
{{TableSchema}} to the correct type when calling {{tableEnv.from}}.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17751) proctime defined in ddl can't work with over window in Table api

2020-05-16 Thread godfrey he (Jira)
godfrey he created FLINK-17751:
--

 Summary: proctime defined in ddl can't work with over window in 
Table api
 Key: FLINK-17751
 URL: https://issues.apache.org/jira/browse/FLINK-17751
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.11.0


the following test will get {{org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.}}
{code:scala}
  @Test
  def testProcTimeTableSourceOverWindow(): Unit = {
val ddl =
  s"""
 |CREATE TABLE procTimeT (
 |  id int,
 |  val bigint,
 |  name varchar(32),
 |  proctime as PROCTIME()
 |) WITH (
 |  'connector' = 'projectable-values',
 |  'bounded' = 'false'
 |)
   """.stripMargin
util.tableEnv.executeSql(ddl)

val t = util.tableEnv.from("procTimeT")
  .window(Over partitionBy 'id orderBy 'proctime preceding 2.hours as 'w)
  .select('id, 'name, 'val.sum over 'w as 'valSum)
  .filter('valSum > 100)
util.verifyPlan(t)
  }
{code}

The reason is that the type of proctime is {{TIMESTAMP(3) NOT NULL}}, while 
{{LegacyTypeInfoDataTypeConverter}} does not handle the mapping between 
{{Types.LOCAL_DATE_TIME}} and the NOT NULL variant of {{DataTypes.TIMESTAMP(3)}}. 




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17728) use sql parser to parse a statement in sql client

2020-05-15 Thread godfrey he (Jira)
godfrey he created FLINK-17728:
--

 Summary: use sql parser to parse a statement in sql client
 Key: FLINK-17728
 URL: https://issues.apache.org/jira/browse/FLINK-17728
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: godfrey he
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17405) add test cases for cancel job in SQL client

2020-04-27 Thread godfrey he (Jira)
godfrey he created FLINK-17405:
--

 Summary: add test cases for cancel job in SQL client
 Key: FLINK-17405
 URL: https://issues.apache.org/jira/browse/FLINK-17405
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: godfrey he
 Fix For: 1.11.0, 1.10.2


As discussed in 
[FLINK-15669|https://issues.apache.org/jira/browse/FLINK-15669], we can re-add some 
tests to verify the cancel-job logic in the SQL client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17399) CsvTableSink should also extend from OverwritableTableSink

2020-04-26 Thread godfrey he (Jira)
godfrey he created FLINK-17399:
--

 Summary: CsvTableSink should also extend from OverwritableTableSink
 Key: FLINK-17399
 URL: https://issues.apache.org/jira/browse/FLINK-17399
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


{{CsvTableSink}} supports a {{writeMode}}, which can be {{OVERWRITE}} or 
{{NO_OVERWRITE}}. When we execute "INSERT OVERWRITE csv_table_sink xx", 
the planners check whether the table sink is an {{OverwritableTableSink}}.
Since {{CsvTableSink}} does not extend from {{OverwritableTableSink}}, we 
can't execute the above statement. 




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17383) flink legacy planner should not use CollectionEnvironment any more

2020-04-25 Thread godfrey he (Jira)
godfrey he created FLINK-17383:
--

 Summary: flink legacy planner should not use CollectionEnvironment 
any more
 Key: FLINK-17383
 URL: https://issues.apache.org/jira/browse/FLINK-17383
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Legacy Planner
Reporter: godfrey he
 Fix For: 1.11.0


As discussed in 
https://github.com/apache/flink/pull/11794, {{CollectionEnvironment}} is not a 
good practice, as it does not go through all the steps that a regular user 
program would go through. We should change the tests to use {{LocalEnvironment}}. 

The commit "Introduce CollectionPipelineExecutor for CollectionEnvironment 
([c983ac9|https://github.com/apache/flink/commit/c983ac9c49b7b58394574efdde4f39e8d33a8582])"
 should also be reverted at that moment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17267) supports EXPLAIN statement in TableEnvironment#executeSql and Table#explain api

2020-04-20 Thread godfrey he (Jira)
godfrey he created FLINK-17267:
--

 Summary: supports EXPLAIN statement in TableEnvironment#executeSql 
and Table#explain api
 Key: FLINK-17267
 URL: https://issues.apache.org/jira/browse/FLINK-17267
 Project: Flink
  Issue Type: Sub-task
Reporter: godfrey he
 Fix For: 1.11.0


[FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] has introduced 
the executeSql method in TableEnvironment, but the EXPLAIN statement is not supported 
because [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126] is not 
finished. This issue aims to support the EXPLAIN statement after FLINK-17126 is 
finished, and to introduce the Table#explain API at the same time.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17252) TableEnvironment#executeSql supports SELECT statement and Table introduces execute method

2020-04-19 Thread godfrey he (Jira)
godfrey he created FLINK-17252:
--

 Summary: TableEnvironment#executeSql supports SELECT statement and 
Table introduces execute method
 Key: FLINK-17252
 URL: https://issues.apache.org/jira/browse/FLINK-17252
 Project: Flink
  Issue Type: Sub-task
Reporter: godfrey he
 Fix For: 1.11.0


[FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] has introduced 
the executeSql method in TableEnvironment, but the SELECT statement is not supported 
because [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126] is not 
finished and the design of 
[FLINK-14807|https://issues.apache.org/jira/browse/FLINK-14807] is still under 
discussion (the designs of Table#collect and Table#execute are similar, although 
Table does not introduce a collect method, based on the conclusion of FLIP-84).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17251) TableEnvironment#executeSql supports INSERT statement

2020-04-19 Thread godfrey he (Jira)
godfrey he created FLINK-17251:
--

 Summary: TableEnvironment#executeSql supports INSERT statement
 Key: FLINK-17251
 URL: https://issues.apache.org/jira/browse/FLINK-17251
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


[FLINK-16366|https://issues.apache.org/jira/browse/FLINK-16366] has introduced 
the executeSql method in TableEnvironment, but the INSERT statement is not supported 
because [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126] is not 
finished. This issue aims to support the INSERT statement after FLINK-17126 is 
finished.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17126) Correct the execution behavior of BatchTableEnvironment

2020-04-13 Thread godfrey he (Jira)
godfrey he created FLINK-17126:
--

 Summary: Correct the execution behavior of BatchTableEnvironment
 Key: FLINK-17126
 URL: https://issues.apache.org/jira/browse/FLINK-17126
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


This issue is similar to 
[FLINK-16363|https://issues.apache.org/jira/browse/FLINK-16363].
In previous versions, BatchTableEnvironment.execute() could trigger both table 
and DataSet programs. Since 1.11.0, table programs can only be triggered by 
BatchTableEnvironment.execute(). Once a table program is converted into a 
DataSet program (through the toDataSet() method), it can only be triggered by 
ExecutionEnvironment.execute().
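
A minimal sketch of the corrected behavior (assuming a BatchTableEnvironment 
{{tEnv}} created from an ExecutionEnvironment {{env}}; table names are 
illustrative):

{code:java}
// pure table program: only BatchTableEnvironment.execute() triggers it
tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM sourceTable");
tEnv.execute("table job");

// once converted into a DataSet program, only ExecutionEnvironment.execute() triggers it
Table table = tEnv.sqlQuery("SELECT * FROM sourceTable");
DataSet<Row> dataSet = tEnv.toDataSet(table, Row.class);
dataSet.output(new DiscardingOutputFormat<>());
env.execute("dataset job");
{code}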






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17052) Introduce PlanGenerator

2020-04-08 Thread godfrey he (Jira)
godfrey he created FLINK-17052:
--

 Summary: Introduce PlanGenerator
 Key: FLINK-17052
 URL: https://issues.apache.org/jira/browse/FLINK-17052
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Reporter: godfrey he
 Fix For: 1.11.0


As discussed in 
[FLINK-16533|https://issues.apache.org/jira/browse/FLINK-16533], we move most 
of the logic of the {{ExecutionEnvironment#createProgramPlan}} method to 
{{PlanGenerator}}, which can then be used by both {{ExecutionEnvironment}} and 
flink-table-planner.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16881) use Catalog's total size info in planner

2020-03-31 Thread godfrey he (Jira)
godfrey he created FLINK-16881:
--

 Summary: use Catalog's total size info in planner
 Key: FLINK-16881
 URL: https://issues.apache.org/jira/browse/FLINK-16881
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he


In some cases, {{Catalog}} only contains {{totalSize}} and the row count is 
unknown. We can use {{totalSize}} to infer the row count, or even use 
{{totalSize}} directly to decide whether a join should be a broadcast join.
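
A minimal sketch of the row count inference (all names here are illustrative, 
not actual planner code):

{code:java}
// infer the row count from the catalog's total size when row count statistics are missing
public static double estimateRowCount(long totalSizeInBytes, double averageRowSizeInBytes) {
    if (totalSizeInBytes <= 0 || averageRowSizeInBytes <= 0) {
        return -1; // unknown
    }
    return totalSizeInBytes / averageRowSizeInBytes;
}
{code}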



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16822) The config set by SET command does not work

2020-03-26 Thread godfrey he (Jira)
godfrey he created FLINK-16822:
--

 Summary: The config set by SET command does not work
 Key: FLINK-16822
 URL: https://issues.apache.org/jira/browse/FLINK-16822
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: godfrey he
 Fix For: 1.11.0


Users can add or change the properties for execution behavior through the SET 
command in SQL client, e.g. {{SET execution.parallelism=10}}, {{SET 
table.optimizer.join-reorder-enabled=true}}. But the {{table.xx}} config can't 
change the TableEnvironment behavior, because the properties set from the CLI 
are not set into the TableEnvironment's table config.
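
A hypothetical sketch of the missing forwarding step (the session property map 
is illustrative; only the idea of copying `table.*` keys into TableConfig is 
implied by the issue):

{code:java}
// forward all `table.*` session properties into the TableEnvironment's TableConfig
Map<String, String> sessionProperties = ...; // properties collected from SET commands
Configuration tableConf = tEnv.getConfig().getConfiguration();
sessionProperties.forEach((key, value) -> {
    if (key.startsWith("table.")) {
        tableConf.setString(key, value);
    }
});
{code}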



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16535) BatchTableSink#emitDataSet returns DataSink

2020-03-10 Thread godfrey he (Jira)
godfrey he created FLINK-16535:
--

 Summary: BatchTableSink#emitDataSet returns DataSink
 Key: FLINK-16535
 URL: https://issues.apache.org/jira/browse/FLINK-16535
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.11.0


Add a return value to {{BatchTableSink#emitDataSet}} to support generating the 
{{DataSet}} plan in {{BatchTableEnvironment}}.
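
A sketch of the proposed signature change (the concrete return type 
{{DataSink}} is inferred from the issue summary; the final shape may differ):

{code:java}
public interface BatchTableSink<T> extends TableSink<T> {
    // previously: void emitDataSet(DataSet<T> dataSet);
    DataSink<?> emitDataSet(DataSet<T> dataSet);
}
{code}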



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16533) ExecutionEnvironment supports executing plan

2020-03-10 Thread godfrey he (Jira)
godfrey he created FLINK-16533:
--

 Summary: ExecutionEnvironment supports executing plan
 Key: FLINK-16533
 URL: https://issues.apache.org/jira/browse/FLINK-16533
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Reporter: godfrey he
 Fix For: 1.11.0


Currently, {{ExecutionEnvironment}} only supports executing the plan generated 
by itself.
FLIP-84 proposes {{TableEnvironment}} can only trigger the table program and 
the {{StreamExecutionEnvironment}}/{{ExecutionEnvironment}} can only trigger 
{{DataStream}}/{{DataSet}} program. This requires that {{ExecutionEnvironment}} 
can execute the plan generated by {{TableEnvironment}}. We propose to add two 
methods in  {{ExecutionEnvironment}}: (which is similar to 
{{StreamExecutionEnvironment}}#execute(StreamGraph) and 
{{StreamExecutionEnvironment}}#executeAsync(StreamGraph))

{code:java}
@Internal
public JobExecutionResult execute(Plan plan) throws Exception {
    ...
}

@Internal
public JobClient executeAsync(Plan plan) throws Exception {
    ...
}
{code}








--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16519) CheckpointCoordinatorFailureTest logs LinkageErrors

2020-03-09 Thread godfrey he (Jira)
godfrey he created FLINK-16519:
--

 Summary: CheckpointCoordinatorFailureTest logs LinkageErrors
 Key: FLINK-16519
 URL: https://issues.apache.org/jira/browse/FLINK-16519
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: godfrey he
 Fix For: 1.11.0


This issue was observed in 
https://travis-ci.org/apache/flink/jobs/660152153?utm_medium=notification_source=slack

Log output

{code:java}
2020-03-09 15:52:14,550 main ERROR Could not reconfigure JMX 
java.lang.LinkageError: loader constraint violation: loader (instance of 
org/powermock/core/classloader/javassist/JavassistMockClassLoader) previously 
initiated loading for a different type with name "javax/management/MBeanServer"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
at 
org.powermock.core.classloader.javassist.JavassistMockClassLoader.loadUnmockedClass(JavassistMockClassLoader.java:90)
at 
org.powermock.core.classloader.MockClassLoader.loadClassByThisClassLoader(MockClassLoader.java:104)
at 
org.powermock.core.classloader.DeferSupportingClassLoader.loadClass1(DeferSupportingClassLoader.java:147)
at 
org.powermock.core.classloader.DeferSupportingClassLoader.loadClass(DeferSupportingClassLoader.java:98)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at 
org.apache.logging.log4j.core.jmx.Server.unregisterAllMatching(Server.java:337)
at 
org.apache.logging.log4j.core.jmx.Server.unregisterLoggerContext(Server.java:261)
at 
org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:165)
at 
org.apache.logging.log4j.core.jmx.Server.reregisterMBeansAfterReconfigure(Server.java:141)
at 
org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:590)
at 
org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651)
at 
org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668)
at 
org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253)
at 
org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153)
at 
org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45)
at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
at 
org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138)
at 
org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45)
at 
org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48)
at 
org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
at org.apache.flink.util.TestLogger.(TestLogger.java:36)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorFailureTest.(CheckpointCoordinatorFailureTest.java:55)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.createTestInstance(PowerMockJUnit44RunnerDelegateImpl.java:197)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.createTest(PowerMockJUnit44RunnerDelegateImpl.java:182)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:204)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:160)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:134)
at 
org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
at 
org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:136)
at 
org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:117)
at 
org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:57)
at 

[jira] [Created] (FLINK-16367) Introduce createDmlBatch method in TableEnvironment

2020-03-02 Thread godfrey he (Jira)
godfrey he created FLINK-16367:
--

 Summary: Introduce createDmlBatch method in TableEnvironment 
 Key: FLINK-16367
 URL: https://issues.apache.org/jira/browse/FLINK-16367
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


As we deprecate the {{execute}} and {{explain}} methods because of the buffered 
SQLs/Tables execution problem, this issue aims to introduce a new method named 
createDmlBatch to support executing and explaining the batched queries.

The method looks like:

{code:java}
interface TableEnvironment {

    /**
     * Create a DmlBatch instance which can add DML statements or Tables to the batch;
     * the planner can optimize all added statements and Tables together for better performance.
     */
    DmlBatch createDmlBatch();
}

interface DmlBatch {

    /**
     * Add an insert statement to the batch.
     */
    void addInsert(String insert);

    /**
     * Add a Table with the given sink table name to the batch.
     */
    void addInsert(String targetPath, Table table);

    /**
     * Execute all statements and Tables as a batch.
     *
     * <p>The added statements and Tables will be cleared when this method is called.
     */
    ResultTable execute() throws Exception;

    /**
     * Returns the AST and the execution plan to compute the result of all the
     * statements and Tables.
     *
     * @param extended if the plan should contain additional properties, e.g.
     *                 estimated cost, traits
     */
    String explain(boolean extended);
}
{code}






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16366) Introduce executeStatement method in TableEnvironment

2020-03-02 Thread godfrey he (Jira)
godfrey he created FLINK-16366:
--

 Summary: Introduce executeStatement method in TableEnvironment
 Key: FLINK-16366
 URL: https://issues.apache.org/jira/browse/FLINK-16366
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


This issue aims to introduce {{executeStatement}} which synchronously executes 
the given single statement immediately, and returns the execution result.


{code:java}
/**
 * Synchronously execute the given single statement immediately; the statement
 * can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
 * If the statement is translated to a Flink job, the result will not be
 * returned until the job is finished.
 *
 * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count for DML
 *         (-1 means unknown), or a string message ("OK") for other statements.
 * @throws Exception which occurs during the execution.
 */
ResultTable executeStatement(String statement) throws Exception;
{code}


{code:java}
/**
 * A ResultTable is the representation of the statement execution result.
 */
public interface ResultTable {

    /**
     * Get the schema of the ResultTable.
     */
    TableSchema getResultSchema();

    /**
     * Get the result contents as iterable rows.
     */
    Iterable<Row> getResultRows();
}
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16364) Deprecate the methods in TableEnvironment proposed by FLIP-84

2020-03-01 Thread godfrey he (Jira)
godfrey he created FLINK-16364:
--

 Summary: Deprecate the methods in TableEnvironment proposed by 
FLIP-84
 Key: FLINK-16364
 URL: https://issues.apache.org/jira/browse/FLINK-16364
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


In 
[FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment],
 we propose to deprecate the following methods in TableEnvironment: 
{code:java}
void sqlUpdate(String sql)
void insertInto(String targetPath, Table table)
void execute(String jobName)
String explain(boolean extended)
Table fromTableSource(TableSource source)
{code}
This issue aims to deprecate them.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16363) Correct the execution behavior of TableEnvironment and StreamTableEnvironment

2020-03-01 Thread godfrey he (Jira)
godfrey he created FLINK-16363:
--

 Summary: Correct the execution behavior of TableEnvironment and 
StreamTableEnvironment
 Key: FLINK-16363
 URL: https://issues.apache.org/jira/browse/FLINK-16363
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


Both {{TableEnvironment.execute()}} and {{StreamExecutionEnvironment.execute}} 
can trigger a Flink table program execution. However, if you use 
{{TableEnvironment}} to build a Flink table program, you must use 
{{TableEnvironment.execute()}} to trigger execution, because you can't get the 
{{StreamExecutionEnvironment}} instance. If you use {{StreamTableEnvironment}} 
to build a Flink table program, you can use both to trigger execution. If you 
convert a table program to a {{DataStream}} program (using 
{{StreamTableEnvironment.toAppendStream/toRetractStream}}), you can also use 
both to trigger execution. So it's hard to explain which `execute` method 
should be used.

To correct the current messy trigger points, we propose that: for 
{{TableEnvironment}} and {{StreamTableEnvironment}}, you must use 
{{TableEnvironment.execute()}} to trigger table program execution; once you 
convert the table program to a {{DataStream}} program (through the 
{{toAppendStream}} or {{toRetractStream}} method), you must use 
{{StreamExecutionEnvironment.execute}} to trigger the {{DataStream}} program.

Please refer to 
[FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment]
 for more details.
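
A minimal sketch of the proposed rule (assuming a StreamTableEnvironment 
{{tEnv}} created from a StreamExecutionEnvironment {{env}}; table and sink 
names are illustrative):

{code:java}
// pure table program: trigger it with TableEnvironment.execute()
tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM sourceTable");
tEnv.execute("table job");

// converted to a DataStream program: trigger it with StreamExecutionEnvironment.execute()
Table table = tEnv.sqlQuery("SELECT * FROM sourceTable");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
stream.print();
env.execute("stream job");
{code}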




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16362) remove deprecated method in StreamTableSink

2020-03-01 Thread godfrey he (Jira)
godfrey he created FLINK-16362:
--

 Summary: remove deprecated method in StreamTableSink
 Key: FLINK-16362
 URL: https://issues.apache.org/jira/browse/FLINK-16362
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


[FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment]
 proposes to unify the behavior of {{TableEnvironment}} and 
{{StreamTableEnvironment}}, and requires that {{StreamTableSink}} always return 
a {{DataStream}}. However, {{StreamTableSink.emitDataStream}} returns nothing 
and has been deprecated since Flink 1.9, so we will remove it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16361) FLIP-84: Improve & Refactor API of TableEnvironment

2020-03-01 Thread godfrey he (Jira)
godfrey he created FLINK-16361:
--

 Summary: FLIP-84: Improve & Refactor API of TableEnvironment
 Key: FLINK-16361
 URL: https://issues.apache.org/jira/browse/FLINK-16361
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


As the 
[FLIP-84|https://cwiki.apache.org/confluence/display/FLINK/FLIP-84%3A+Improve+%26+Refactor+API+of+TableEnvironment]
 document describes, 

We propose to deprecate the following methods in TableEnvironment:
{code:java}
void sqlUpdate(String sql)
void insertInto(String targetPath, Table table)
void execute(String jobName)
String explain(boolean extended)
Table fromTableSource(TableSource source)
{code}

meanwhile, we propose to introduce the following new methods in 
TableEnvironment:

{code:java}
// synchronously execute the given single statement immediately, and return
// the execution result.
ResultTable executeStatement(String statement)

public interface ResultTable {
    TableSchema getResultSchema();
    Iterable<Row> getResultRows();
}

// create a DmlBatch instance which can add DML statements or Tables to the
// batch and explain or execute them as a batch.
DmlBatch createDmlBatch()

interface DmlBatch {
    void addInsert(String insert);
    void addInsert(String targetPath, Table table);
    ResultTable execute() throws Exception;
    String explain(boolean extended);
}
{code}


We unify the Flink table program trigger point and propose that: for 
TableEnvironment and StreamTableEnvironment, you must use 
`TableEnvironment.execute()` to trigger table program execution; once you 
convert the table program to a DataStream program (through the `toAppendStream` 
or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute` 
to trigger the DataStream program.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16322) wrong result after filter push down in parquet table source

2020-02-27 Thread godfrey he (Jira)
godfrey he created FLINK-16322:
--

 Summary: wrong result after filter push down in parquet table 
source
 Key: FLINK-16322
 URL: https://issues.apache.org/jira/browse/FLINK-16322
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: godfrey he
 Attachments: parquet-1-1.parquet

I get a wrong result when running the following query:

source schema:
first VARCHAR
id INT
score DOUBLE 
last VARCHAR

data: (parquet file is in the attachment)
("Mike", 1, 12.3d, "Smith"),
("Bob", 2, 45.6d, "Taylor"),
("Sam", 3, 7.89d, "Miller"),
("Peter", 4, 0.12d, "Smith"),
("Liz", 5, 34.5d, "Williams"),
("Sally", 6, 6.78d, "Miller"),
("Alice", 7, 90.1d, "Smith"),
("Kelly", 8, 2.34d, "Williams")

query:
SELECT id, `first`, `last`, score FROM ParquetTable WHERE score < 3

The expected result size is 2; however, the actual result size is 0.








--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16315) throw JsonMappingException when using BatchTableEnvironment#explain to get the plan of sql with constant string

2020-02-27 Thread godfrey he (Jira)
godfrey he created FLINK-16315:
--

 Summary: throw JsonMappingException when using 
BatchTableEnvironment#explain to get the plan of sql with constant string  
 Key: FLINK-16315
 URL: https://issues.apache.org/jira/browse/FLINK-16315
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner
Reporter: godfrey he



{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

tEnv.registerTableSource("MyTable", CommonTestData.getCsvTableSource());

Table table = tEnv.sqlQuery("select * from MyTable where first = '274' ");

System.out.println(tEnv.explain(table));
{code}

When executing the above code, the following exception occurs.

{panel:title=exception}
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
 Unexpected character ('U' (code 85)): was expecting comma to separate Object 
entries
 at [Source: (String)"{
"nodes": [

{
"id": 2,
"type": "source",
"pact": "Data Source",
"contents": "CsvTableSource(read fields: first, id, score, 
last)",
"parallelism": "8",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniq"[truncated 3501 chars]; line: 41, 
column: 15] (through reference chain: 
org.apache.flink.table.explain.PlanTree["nodes"]->java.util.ArrayList[1])

at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:365)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:302)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:27)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:288)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3205)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3173)
at 
org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:42)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.explain(BatchTableEnvImpl.scala:208)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.explain(BatchTableEnvImpl.scala:223)
{panel}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16195) append constant field to unique key set on project when deriving unique keys in FlinkRelMdUniqueKeys

2020-02-20 Thread godfrey he (Jira)
godfrey he created FLINK-16195:
--

 Summary: append constant field to unique key set on project when 
deriving unique keys in FlinkRelMdUniqueKeys
 Key: FLINK-16195
 URL: https://issues.apache.org/jira/browse/FLINK-16195
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he


Currently, `FlinkRelMdUniqueKeys` only supports deriving unique keys on 
non-constant fields. For example, for `select a, b, 1, count(*) from T group by 
a, b`, the derived unique key set is currently `a, b`. However, `a, b, 1` is 
also a unique key, so the result should be both `a, b` and `a, b, 1`.
Note: ideally, the planner does not require constant keys in the unique key 
set; all constant values are pulled up or removed as much as possible. This 
improvement is to handle some corner cases in CBO.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16110) LogicalTypeParser can't parse "TIMESTAMP(3) *ROWTIME*" and TIMESTAMP(3) *PROCTIME*

2020-02-16 Thread godfrey he (Jira)
godfrey he created FLINK-16110:
--

 Summary: LogicalTypeParser can't parse "TIMESTAMP(3) *ROWTIME*" 
and TIMESTAMP(3) *PROCTIME*
 Key: FLINK-16110
 URL: https://issues.apache.org/jira/browse/FLINK-16110
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: godfrey he


{{TIMESTAMP(3) *ROWTIME*}} is the string representation of 
{{TimestampType(true, TimestampKind.ROWTIME, 3)}}; however, 
{{LogicalTypeParser}} can't convert it back to {{TimestampType(true, 
TimestampKind.ROWTIME, 3)}}. 
{{TIMESTAMP(3) *PROCTIME*}} is the same case.
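
A minimal reproduction sketch (assuming the Flink 1.10 logical type APIs):

{code:java}
TimestampType rowtimeType = new TimestampType(true, TimestampKind.ROWTIME, 3);
String repr = rowtimeType.asSummaryString(); // "TIMESTAMP(3) *ROWTIME*"
// expected: the original type back; actual: LogicalTypeParser fails on the *ROWTIME* suffix
LogicalType parsed = LogicalTypeParser.parse(repr);
{code}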



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15669) SQL client can't cancel flink job

2020-01-19 Thread godfrey he (Jira)
godfrey he created FLINK-15669:
--

 Summary: SQL client can't cancel flink job
 Key: FLINK-15669
 URL: https://issues.apache.org/jira/browse/FLINK-15669
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: godfrey he
 Fix For: 1.10.0


In SQL client, the CLI client cancels a query through the {{void 
cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. 
However, the {{resultId}} is a random UUID, not the job id, so the CLI client 
can't cancel a running job.


{code:java}
private <C> ResultDescriptor executeQueryInternal(String sessionId, 
ExecutionContext<C> context, String query) {
..

// store the result with a unique id
final String resultId = UUID.randomUUID().toString();
resultStore.storeResult(resultId, result);

   ..

// create execution
final ProgramDeployer deployer = new ProgramDeployer(
configuration, jobName, pipeline);

// start result retrieval
result.startRetrieval(deployer);

return new ResultDescriptor(
resultId,
removeTimeAttributes(table.getSchema()),
result.isMaterialized());
}

private <T> void cancelQueryInternal(ExecutionContext<T> context, String 
resultId) {
..

// stop Flink job
try (final ClusterDescriptor<T> clusterDescriptor = 
context.createClusterDescriptor()) {
ClusterClient<T> clusterClient = null;
try {
// retrieve existing cluster
clusterClient = 
clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
try {
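// the bug: resultId is a random UUID, not the hex string of a JobID,
// so this cancel call targets a non-existent job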
clusterClient.cancel(new 
JobID(StringUtils.hexStringToByte(resultId))).get();
} catch (Throwable t) {
// the job might has finished earlier
}
} catch (Exception e) {
throw new SqlExecutionException("Could not 
retrieve or create a cluster.", e);
} finally {
try {
if (clusterClient != null) {
clusterClient.close();
}
} catch (Exception e) {
// ignore
}
}
} catch (SqlExecutionException e) {
throw e;
} catch (Exception e) {
throw new SqlExecutionException("Could not locate a 
cluster.", e);
}
}
{code}







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15472) Support SQL Client Gateway

2020-01-03 Thread godfrey he (Jira)
godfrey he created FLINK-15472:
--

 Summary: Support SQL Client Gateway 
 Key: FLINK-15472
 URL: https://issues.apache.org/jira/browse/FLINK-15472
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Reporter: godfrey he


FLIP-91: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Client+Gateway
design document: 
https://docs.google.com/document/d/1T7--664rts4t_4gjRPw937S9ln9Plf1yghNQ9IiHQtQ



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15123) remove uniqueKeys from FlinkStatistic in blink planner

2019-12-07 Thread godfrey he (Jira)
godfrey he created FLINK-15123:
--

 Summary: remove uniqueKeys from FlinkStatistic in blink planner 
 Key: FLINK-15123
 URL: https://issues.apache.org/jira/browse/FLINK-15123
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he


{{uniqueKeys}} is a kind of constraint; it's unreasonable that {{uniqueKeys}} 
is treated as a kind of statistic, so we should remove uniqueKeys from 
{{FlinkStatistic}} in the blink planner. Some temporary solutions (e.g. 
{{RichTableSourceQueryOperation}}) should also be resolved after primaryKey is 
introduced in {{TableSchema}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15095) bridge table schema's primary key to metadata handler in blink planner

2019-12-06 Thread godfrey he (Jira)
godfrey he created FLINK-15095:
--

 Summary: bridge table schema's primary key to metadata handler in 
blink planner
 Key: FLINK-15095
 URL: https://issues.apache.org/jira/browse/FLINK-15095
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15004) Choose SortMergeJoin instead of HashJoin if the statistics is unknown

2019-12-01 Thread godfrey he (Jira)
godfrey he created FLINK-15004:
--

 Summary: Choose SortMergeJoin instead of HashJoin if the 
statistics is unknown
 Key: FLINK-15004
 URL: https://issues.apache.org/jira/browse/FLINK-15004
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0
Reporter: godfrey he
 Fix For: 1.10.0


Currently, the blink planner will use the default rowCount value (defined in 
{{FlinkPreparingTableBase#DEFAULT_ROWCOUNT}}) when the statistics are unknown, 
and may choose {{HashJoin}} instead of {{SortMergeJoin}}. The job will hang if 
the build side has a huge input size. So it's better to use {{SortMergeJoin}} 
for execution stability if the statistics are unknown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15001) The digest of sub-plan reuse should contain RelNode's trait

2019-12-01 Thread godfrey he (Jira)
godfrey he created FLINK-15001:
--

 Summary: The digest of sub-plan reuse should contain RelNode's 
trait
 Key: FLINK-15001
 URL: https://issues.apache.org/jira/browse/FLINK-15001
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0
Reporter: godfrey he
 Fix For: 1.10.0
 Attachments: image-2019-12-02-10-49-46-916.png, 
image-2019-12-02-10-52-01-399.png

This bug is found in [FLINK-14946| 
https://issues.apache.org/jira/browse/FLINK-14946]:

The plan for the given sql in [FLINK-14946| 
https://issues.apache.org/jira/browse/FLINK-14946] is
 !image-2019-12-02-10-49-46-916.png! 

however, the plan after sub-plan reuse is:
 !image-2019-12-02-10-52-01-399.png! 

In the first picture, we can see that the accModes of the two joins are 
different, but the two joins are reused in the second picture. 

The reason is that the digest used for sub-plan reuse does not contain the 
RelNode's traits now.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14999) sub-plan reuse should consider

2019-12-01 Thread godfrey he (Jira)
godfrey he created FLINK-14999:
--

 Summary: sub-plan reuse should consider
 Key: FLINK-14999
 URL: https://issues.apache.org/jira/browse/FLINK-14999
 Project: Flink
  Issue Type: Bug
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14899) can not be translated to StreamExecDeduplicate when PROCTIME is defined in query

2019-11-21 Thread godfrey he (Jira)
godfrey he created FLINK-14899:
--

 Summary: can not be translated to StreamExecDeduplicate when 
PROCTIME is defined in query
 Key: FLINK-14899
 URL: https://issues.apache.org/jira/browse/FLINK-14899
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0
Reporter: godfrey he
 Fix For: 1.10.0


CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);

CREATE TABLE user_dist (
dt VARCHAR,
user_id VARCHAR,
behavior VARCHAR
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'user_behavior_dup',
'connector.username' = 'root',
'connector.password' = '**',
'connector.write.flush.max-rows' = '1'
);

INSERT INTO user_dist
SELECT
  dt,
  user_id,
  behavior
FROM (
   SELECT
  dt,
  user_id,
  behavior,
 ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc asc ) 
AS rownum
   FROM (select DATE_FORMAT(ts, '-MM-dd HH:00') as 
dt,user_id,behavior,PROCTIME() as proc
from user_log) )
WHERE rownum = 1;

Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14874) add local aggregate to solve data skew for ROLLUP/CUBE case

2019-11-20 Thread godfrey he (Jira)
godfrey he created FLINK-14874:
--

 Summary: add local aggregate to solve data skew for ROLLUP/CUBE 
case
 Key: FLINK-14874
 URL: https://issues.apache.org/jira/browse/FLINK-14874
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0
Reporter: godfrey he
 Fix For: 1.10.0


Many tpc-ds queries have the {{rollup}} keyword, which will be translated into 
multiple groups. 
For example, {{group by rollup (channel, id)}} is equivalent to {{group by 
(channel, id)}} + {{group by (channel)}} + {{group by ()}}. 
All data on the empty group will be shuffled to a single node; it is a typical 
data skew case. If there is a local aggregate, the data size shuffled to the 
single node will be greatly reduced. However, currently the cost model can't 
estimate the local aggregate's cost, and the plan without a local aggregate may 
be chosen even when the query has the {{rollup}} keyword.
We could add a rule-based phase (after the physical phase) to enforce a local 
aggregate if its input has an empty group.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14789) extends max/min type in ColumnStats from Number to Comparable

2019-11-14 Thread godfrey he (Jira)
godfrey he created FLINK-14789:
--

 Summary: extends max/min type in ColumnStats from Number to 
Comparable
 Key: FLINK-14789
 URL: https://issues.apache.org/jira/browse/FLINK-14789
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.10.0


Many tpc-ds queries have predicates on dates, like `d_date between '1999-02-01' 
and (cast('1999-02-01' as date) + INTERVAL '60' day)`. It's very useful to find 
a better plan if the planner knows the max/min values of the date column. 
However, max/min in {{ColumnStats}} only supports the {{Number}} type 
currently. This issue aims to extend the max/min type from {{Number}} to 
{{Comparable}}, so that {{Date}}, {{Time}}, {{Timestamp}} and even {{String}} 
can be supported.
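
A sketch of the extended field types (shape only; the field names follow the 
existing ColumnStats, and the rest of the class is omitted):

{code:java}
public class ColumnStats {
    private Long ndv;          // number of distinct values
    private Long nullCount;    // number of nulls
    private Double avgLen;     // average length of column values
    private Integer maxLen;    // max length of column values
    private Comparable<?> max; // previously Number
    private Comparable<?> min; // previously Number
}
{code}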



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14724) Join condition could be simplified in logical phase

2019-11-12 Thread godfrey he (Jira)
godfrey he created FLINK-14724:
--

 Summary: Join condition could be simplified in logical phase
 Key: FLINK-14724
 URL: https://issues.apache.org/jira/browse/FLINK-14724
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0
Reporter: godfrey he
 Fix For: 1.10.0


Currently, the plan of tpcds q38.sql contains {{NestedLoopJoin}}, because its 
join condition is {{CAST(AND(IS NOT DISTINCT FROM($2, $3), IS NOT DISTINCT 
FROM($1, $4), IS NOT DISTINCT FROM($0, $5))):BOOLEAN}}, and the planner can't 
find equal join keys from the condition via {{Join#analyzeCondition}}. 
{{SimplifyJoinConditionRule}} could solve this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14656) blink planner should convert catalog statistics to TableStats for permanent table instead of temporary table

2019-11-07 Thread godfrey he (Jira)
godfrey he created FLINK-14656:
--

 Summary: blink planner should convert catalog statistics to 
TableStats for permanent table instead of temporary table
 Key: FLINK-14656
 URL: https://issues.apache.org/jira/browse/FLINK-14656
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0
Reporter: godfrey he
 Fix For: 1.10.0


Currently, the blink planner converts {{CatalogTable}} to a Calcite {{Table}}, 
and converts the catalog statistics to {{TableStats}} in 
{{DatabaseCalciteSchema}}. However, the catalog statistics conversion is 
currently only done for temporary tables, which have no statistics at all. It 
should be done for permanent tables.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-13708) transformations should be cleared because a table environment could execute multiple job

2019-08-14 Thread godfrey he (JIRA)
godfrey he created FLINK-13708:
--

 Summary: transformations should be cleared because a table 
environment could execute multiple job
 Key: FLINK-13708
 URL: https://issues.apache.org/jira/browse/FLINK-13708
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.9.0


Currently, if a table environment executes more than one SQL job, the following 
job contains transformations from the previous job. The reason is that the 
transformations are not cleared after execution.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13611) Introduce analyze statistic utility to generate table & column statistics

2019-08-07 Thread godfrey he (JIRA)
godfrey he created FLINK-13611:
--

 Summary: Introduce analyze statistic utility to generate table & 
column statistics
 Key: FLINK-13611
 URL: https://issues.apache.org/jira/browse/FLINK-13611
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.10.0


This issue aims to introduce a utility class to generate table & column 
statistics. The main steps include: 
1. generate the SQL, like {{select approx_count_distinct(a) as ndv, count(1) - 
count(a) as nullCount, avg(char_length(a)) as avgLen, max(char_length(a)) as 
maxLen, max(a) as maxValue, min(a) as minValue, ... from MyTable}}
2. execute the query
3. convert the result to {{TableStats}} (maybe the source table is not a 
catalog table)
4. convert {{TableStats}} to {{CatalogTableStatistics}} if needed

This issue does not involve DDL; however, the DDL could use this utility class 
once it's supported.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13563) TumblingGroupWindow should implement toString method

2019-08-02 Thread godfrey he (JIRA)
godfrey he created FLINK-13563:
--

 Summary: TumblingGroupWindow should implement toString method
 Key: FLINK-13563
 URL: https://issues.apache.org/jira/browse/FLINK-13563
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0, 1.10.0
Reporter: godfrey he
 Fix For: 1.9.0, 1.10.0


{code:scala}
  @Test
  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
val util = streamTestUtil()
val table = util.addDataStream[(Long, Int, String)](
  "T1", 'long, 'int, 'string, 'rowtime.rowtime)

val windowedTable = table
  .window(Tumble over 5.millis on 'rowtime as 'w)
  .groupBy('w)
  .select('int.count)
util.verifyPlan(windowedTable)
  }
{code}

Currently, its physical plan is 

{code:java}
HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) 
AS EXPR$0])
+- Exchange(distribution=[single])
   +- LocalHashWindowAggregate(window=[TumblingGroupWindow], 
select=[Partial_COUNT(int) AS count$0])
  +- TableSourceScan(table=[[default_catalog, default_database, Table1, 
source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
{code}

We know nothing about the TumblingGroupWindow except its name.




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13562) throws exception when FlinkRelMdColumnInterval meets two stage stream group aggregate

2019-08-02 Thread godfrey he (JIRA)
godfrey he created FLINK-13562:
--

 Summary: throws exception when FlinkRelMdColumnInterval meets two 
stage stream group aggregate
 Key: FLINK-13562
 URL: https://issues.apache.org/jira/browse/FLINK-13562
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.9.0, 1.10.0


test case:

{code:scala}
  @Test
  def testTwoDistinctAggregateWithNonDistinctAgg(): Unit = {
util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.verifyPlan("SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM 
MyTable GROUP BY c")
  }
{code}



org.apache.flink.table.api.TableException: Sum aggregate function does not 
support type: ''VARCHAR''.
Please re-check the data type.

at 
org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createSumAggFunction(AggFunctionFactory.scala:191)
at 
org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:74)
at 
org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:285)
at 
org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:279)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:279)
at 
org.apache.flink.table.planner.plan.utils.AggregateUtil$.getOutputIndexToAggCallIndexMap(AggregateUtil.scala:154)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getAggCallIndexInLocalAgg$1(FlinkRelMdColumnInterval.scala:504)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.estimateColumnIntervalOfAggregate(FlinkRelMdColumnInterval.scala:526)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdColumnInterval.getColumnInterval(FlinkRelMdColumnInterval.scala:417)
at GeneratedMetadataHandler_ColumnInterval.getColumnInterval_$(Unknown 
Source)
at GeneratedMetadataHandler_ColumnInterval.getColumnInterval(Unknown 
Source)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getColumnInterval(FlinkRelMetadataQuery.java:122)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13545) JoinToMultiJoinRule should not match SEMI/ANTI LogicalJoin

2019-08-01 Thread godfrey he (JIRA)
godfrey he created FLINK-13545:
--

 Summary: JoinToMultiJoinRule should not match SEMI/ANTI LogicalJoin
 Key: FLINK-13545
 URL: https://issues.apache.org/jira/browse/FLINK-13545
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0, 1.10.0
Reporter: godfrey he
 Fix For: 1.9.0, 1.10.0


Running tpcds 14.a on the blink planner, an exception will be thrown:

java.lang.ArrayIndexOutOfBoundsException: 84

at 
org.apache.calcite.rel.rules.JoinToMultiJoinRule$InputReferenceCounter.visitInputRef(JoinToMultiJoinRule.java:564)
at 
org.apache.calcite.rel.rules.JoinToMultiJoinRule$InputReferenceCounter.visitInputRef(JoinToMultiJoinRule.java:555)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at 
org.apache.calcite.rex.RexVisitorImpl.visitCall(RexVisitorImpl.java:80)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
at 
org.apache.calcite.rel.rules.JoinToMultiJoinRule.addOnJoinFieldRefCounts(JoinToMultiJoinRule.java:481)
at 
org.apache.calcite.rel.rules.JoinToMultiJoinRule.onMatch(JoinToMultiJoinRule.java:166)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)


The reason is that {{JoinToMultiJoinRule}} now matches SEMI/ANTI LogicalJoin, 
which it should not. Before calcite-1.20, a SEMI join was represented by 
{{SemiJoin}}, which is not matched by {{JoinToMultiJoinRule}}.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13502) CatalogTableStatisticsConverter should be in planner.utils package

2019-07-30 Thread godfrey he (JIRA)
godfrey he created FLINK-13502:
--

 Summary: CatalogTableStatisticsConverter should be in 
planner.utils package
 Key: FLINK-13502
 URL: https://issues.apache.org/jira/browse/FLINK-13502
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.9.0, 1.10.0


Currently, {{CatalogTableStatisticsConverter}} is in 
{{org.apache.flink.table.util}}; its correct package is 
{{org.apache.flink.table.planner.utils}}.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13404) Port csv factories & validator from flink-table-planner to flink-csv

2019-07-24 Thread godfrey he (JIRA)
godfrey he created FLINK-13404:
--

 Summary: Port csv factories & validator from flink-table-planner 
to flink-csv
 Key: FLINK-13404
 URL: https://issues.apache.org/jira/browse/FLINK-13404
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Legacy Planner
Reporter: godfrey he
 Fix For: 1.9.0, 1.10.0


The blink planner does not define any csv factories & validator, so we port 
the csv factories & validator from flink-table-planner to flink-csv, and let 
both planners use them.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13403) Correct package name after relocation

2019-07-24 Thread godfrey he (JIRA)
godfrey he created FLINK-13403:
--

 Summary: Correct package name after relocation
 Key: FLINK-13403
 URL: https://issues.apache.org/jira/browse/FLINK-13403
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.9.0, 1.10.0


Some Scala classes' package names were not updated after 
[FLINK-13267|https://issues.apache.org/jira/browse/FLINK-13267]; this issue 
aims to correct the package names.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13347) should handle new JoinRelType(SEMI/ANTI) in switch case

2019-07-21 Thread godfrey he (JIRA)
godfrey he created FLINK-13347:
--

 Summary: should handle new JoinRelType(SEMI/ANTI) in switch case
 Key: FLINK-13347
 URL: https://issues.apache.org/jira/browse/FLINK-13347
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.9.0, 1.10.0


Calcite 1.20 introduces {{SEMI}} & {{ANTI}} to {{JoinRelType}}; the blink 
planner & flink planner should handle them in each switch case.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13269) copy RelDecorrelator from blink planner to flink planner to fix CALCITE-3169 & CALCITE-3170

2019-07-15 Thread godfrey he (JIRA)
godfrey he created FLINK-13269:
--

 Summary: copy RelDecorrelator from blink planner to flink planner 
to fix CALCITE-3169 & CALCITE-3170
 Key: FLINK-13269
 URL: https://issues.apache.org/jira/browse/FLINK-13269
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he
 Fix For: 1.9.0, 1.10.0


[CALCITE-3169|https://issues.apache.org/jira/browse/CALCITE-3169] & 
[CALCITE-3170|https://issues.apache.org/jira/browse/CALCITE-3170] are not fixed 
in Calcite-1.20. 
`RelDecorrelator` was copied from Calcite to the blink planner to resolve those 
two bugs. To make both planners available in one jar, `RelDecorrelator` should 
also be copied to the flink planner.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13268) revert SqlSplittableAggFunction to make two planners available in one jar

2019-07-15 Thread godfrey he (JIRA)
godfrey he created FLINK-13268:
--

 Summary: revert SqlSplittableAggFunction to make two planners 
available in one jar
 Key: FLINK-13268
 URL: https://issues.apache.org/jira/browse/FLINK-13268
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he
 Fix For: 1.9.0, 1.10.0


Currently, SqlSplittableAggFunction is copied from Calcite and its `topSplit` 
method is extended to support Left/Right outer joins (see: 
[CALCITE-2378|http://issues.apache.org/jira/browse/CALCITE-2378]). This new 
feature is only used for tpc-h now, so we will revert this class to make both 
planners available in one jar.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13263) supports explain DAG plan in flink-python

2019-07-15 Thread godfrey he (JIRA)
godfrey he created FLINK-13263:
--

 Summary: supports explain DAG plan in flink-python
 Key: FLINK-13263
 URL: https://issues.apache.org/jira/browse/FLINK-13263
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Reporter: godfrey he
Assignee: godfrey he
 Fix For: 1.9.0, 1.10.0


Update the existing `explain` to support explaining the DAG plan in flink-python.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13185) Bump Calcite dependency to 1.20.0 in sql parser & flink planner

2019-07-09 Thread godfrey he (JIRA)
godfrey he created FLINK-13185:
--

 Summary: Bump Calcite dependency to 1.20.0 in sql parser & flink 
planner
 Key: FLINK-13185
 URL: https://issues.apache.org/jira/browse/FLINK-13185
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Legacy Planner
Reporter: godfrey he
Assignee: godfrey he


The blink planner has upgraded its Calcite version to 1.20.0 (the previous 
version was 1.19.0), and the blink planner will support DDL in Flink 1.9, which 
depends on flink-sql-parser. So the Calcite version in flink-sql-parser should 
also be upgraded to 1.20.0.

[~walterddr], [FLINK-11935|https://issues.apache.org/jira/browse/FLINK-11935] 
will not be fixed in this issue, because supporting DDL in the blink planner is 
blocked by this one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13168) clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner

2019-07-09 Thread godfrey he (JIRA)
godfrey he created FLINK-13168:
--

 Summary: clarify isBatch/isStreaming/isBounded flag in flink 
planner and blink planner
 Key: FLINK-13168
 URL: https://issues.apache.org/jira/browse/FLINK-13168
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


In the blink planner & flink planner, there are many `isBatch` and 
`isStreaming` flags, and they have different meanings in different places, 
which makes readers and coders crazy. Especially in the blink planner, only 
`StreamTableSource` can be used for both batch and stream. Does a `bounded 
StreamTableSource` mean batch and an `unbounded` one mean stream?

We should make it clear:
1. `isBatch` in `ConnectorCatalogTable` tells whether the 
tableSource/tableSink is a BatchTableSource/BatchTableSink.
2. `isStreaming` in `TableSourceTable` tells whether the current table is on 
the stream planner.
3. A `bounded StreamTableSource` can be used for both batch and stream, while 
an `unbounded StreamTableSource` can only be used for stream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13116) supports catalog statistic in blink planner

2019-07-05 Thread godfrey he (JIRA)
godfrey he created FLINK-13116:
--

 Summary: supports catalog statistic in blink planner 
 Key: FLINK-13116
 URL: https://issues.apache.org/jira/browse/FLINK-13116
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


This issue aims to let the blink planner use catalog statistics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13115) Introduce planner rule to support partition pruning for PartitionableTableSource

2019-07-05 Thread godfrey he (JIRA)
godfrey he created FLINK-13115:
--

 Summary: Introduce planner rule to support partition pruning for 
PartitionableTableSource
 Key: FLINK-13115
 URL: https://issues.apache.org/jira/browse/FLINK-13115
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


This issue aims to support partition pruning for {{PartitionableTableSource}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

