[jira] [Commented] (FLINK-9673) Improve State efficiency of bounded OVER window operators

2021-07-05 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374835#comment-17374835
 ] 

Timo Walther commented on FLINK-9673:
-

I updated the component. This should still be valid in the new planner, right? CC [~jark]

> Improve State efficiency of bounded OVER window operators
> -
>
> Key: FLINK-9673
> URL: https://issues.apache.org/jira/browse/FLINK-9673
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Fabian Hueske
>Priority: Minor
>
> Currently, the implementations of bounded OVER window aggregations store the 
> complete input for the bound interval. For example, for the query:
> {code:java}
> SELECT user_id, COUNT(action) OVER (PARTITION BY user_id ORDER BY rowtime 
> RANGE INTERVAL '14' DAY PRECEDING) AS action_count, rowtime
> FROM (
>   SELECT rowtime, user_id, action, val1, val2, val3, val4 FROM user)
> {code}
> The complete records with schema {{(rowtime, user_id, action, val1, val2, val3, 
> val4)}} are stored for 14 days so that they can be retracted from the 
> accumulators after 14 days.
> However, it would be sufficient to only store those fields that are required 
> for the aggregations, i.e., {{action}} in the example above. All other fields 
> could be set to {{null}}, which would significantly reduce the amount of data 
> that needs to be stored in state.
> This improvement can be applied to all four combinations of bounded 
> [rowtime|proctime] [range|rows] OVER windows.
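
For illustration, a minimal sketch of the idea (a hypothetical helper, not the actual operator code): before a row is written into the retraction state, every field that no aggregate reads is set to {{null}}.

{code:scala}
import org.apache.flink.types.Row

// Hypothetical helper: keep only the fields required by the aggregations and
// null out everything else before storing the row in state.
def projectForState(input: Row, neededIndices: Set[Int]): Row = {
  val projected = new Row(input.getArity)
  for (i <- 0 until input.getArity) {
    projected.setField(i, if (neededIndices.contains(i)) input.getField(i) else null)
  }
  projected
}

// For the example query, only the index of `action` would be needed,
// e.g. projectForState(row, Set(2)).
{code}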



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-10014) Fix the decimal literal parameter problem for arithmetic functions in Table

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-10014.

Resolution: Won't Fix

The new type system in the new planner should have fixed this.

> Fix the decimal literal parameter problem for arithmetic functions in Table
> ---
>
> Key: FLINK-10014
> URL: https://issues.apache.org/jira/browse/FLINK-10014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Reporter: Xingcan Cui
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Most of the arithmetic functions added in Flink only consider double-typed 
> parameters (e.g., {{def log(base: Double, x: Double): Double = ...}}). However, 
> Calcite automatically casts decimal literals to BigDecimal, which cannot be 
> applied directly (e.g., LOG(3.0, 9.0) throws an unsupported call exception). 
> An explicit type cast makes these functions accessible, but that does not seem 
> like an elegant solution.
> We could consider automatically casting the BigDecimal type to Double or 
> adding corresponding methods for the BigDecimal type.
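
A minimal sketch of the second option (assumed function names, not actual Flink code): add an overload for the BigDecimal values that Calcite produces and delegate to the Double-based implementation.

{code:scala}
import java.math.BigDecimal

// Existing Double-based variant, as referenced above:
def log(base: Double, x: Double): Double = Math.log(x) / Math.log(base)

// Hypothetical overload for decimal literals, which Calcite passes in as
// BigDecimal; it simply delegates to the Double implementation.
def log(base: BigDecimal, x: BigDecimal): Double =
  log(base.doubleValue(), x.doubleValue())
{code}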



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #16149: [FLINK-22884][hive] HiveCatalog should mark views as generic and stor…

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16149:
URL: https://github.com/apache/flink/pull/16149#issuecomment-859601192


   
   ## CI report:
   
   * a110537e76afae800321d5610f1a6a7898d1988f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19899)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16163: [FLINK-22994][Table SQL / Planner] improve the performace of invoking…

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16163:
URL: https://github.com/apache/flink/pull/16163#issuecomment-861284972


   
   ## CI report:
   
   * 00f7cd34fc503db8cc116dd201bcc7f54a4b6f20 UNKNOWN
   * da303c24e513b0678794526c6351003b6e76cb6d UNKNOWN
   * 4133ee287c23af5da472db1098c9d7ab51b81cb9 UNKNOWN
   * 53b3a6c9758d6706c44e96e78fbacdd385514810 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19901)
 
   * 902e3874f3d6b223fc36daeeb94ade1262ab376c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19911)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-10734) Temporal joins on heavily filtered tables might fail in planning

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-10734:
-
Component/s: (was: Table SQL / Legacy Planner)
 Table SQL / Planner

> Temporal joins on heavily filtered tables might fail in planning
> 
>
> Key: FLINK-10734
> URL: https://issues.apache.org/jira/browse/FLINK-10734
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The following query:
> {code}
> val sqlQuery =
>   """
> |SELECT
> |  o.amount * r.rate AS amount
> |FROM
> |  Orders AS o,
> |  LATERAL TABLE (Rates(o.rowtime)) AS r
> |WHERE r.currency = o.currency
> |""".stripMargin
> {code}
> with {{Rates}} defined as follows:
> {code}
> tEnv.registerTable("EuroRatesHistory", 
> tEnv.scan("RatesHistory").filter('currency === "Euro"))
> tEnv.registerFunction(
>   "Rates",
>   tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 
> 'currency))
> {code}
> will fail with:
> {noformat}
> org.apache.flink.table.api.ValidationException: Only single column join key 
> is supported. Found [] in [InnerJoin(where: 
> (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, 
> rowtime, currency, rate, rowtime0))]
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
> {noformat}
> The problem is that the filtering condition {{('currency === "Euro")}} 
> interferes with the join condition and simplifies it away entirely. Note how 
> the top {{LogicalFilter(condition=[=($3, $1)])}} changes during optimization 
> and finally disappears:
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
> LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
> LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], 
> rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
>   LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
>   LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
>   FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, 
> $2)], joinType=[inner])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
> {noformat}
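
A possible (untested) workaround sketch for the query above: register the temporal table function on the unfiltered history table and apply the currency predicate after the join, so that the optimizer cannot simplify the join key away. Whether the predicate would be pushed back down and re-trigger the problem would still need to be verified.

{code:scala}
// Untested idea, not a confirmed fix: keep RatesHistory unfiltered so the
// 'currency join key survives, and filter on the join result instead.
tEnv.registerFunction(
  "Rates",
  tEnv.scan("RatesHistory").createTemporalTableFunction('rowtime, 'currency))

val sqlQuery =
  """
    |SELECT o.amount * r.rate AS amount
    |FROM Orders AS o,
    |  LATERAL TABLE (Rates(o.rowtime)) AS r
    |WHERE r.currency = o.currency AND r.currency = 'Euro'
    |""".stripMargin
{code}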



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-10734) Temporal joins on heavily filtered tables might fail in planning

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-10734:
-
Comment: was deleted

(was: This major issue is unassigned, and neither it nor any of its Sub-Tasks 
have been updated for 30 days, so it has been labeled "stale-major". If this 
ticket is indeed "major", please either assign yourself or give an update. 
Afterwards, please remove the label. In 7 days the issue will be deprioritized.)

> Temporal joins on heavily filtered tables might fail in planning
> 
>
> Key: FLINK-10734
> URL: https://issues.apache.org/jira/browse/FLINK-10734
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Minor
>
> The following query:
> {code}
> val sqlQuery =
>   """
> |SELECT
> |  o.amount * r.rate AS amount
> |FROM
> |  Orders AS o,
> |  LATERAL TABLE (Rates(o.rowtime)) AS r
> |WHERE r.currency = o.currency
> |""".stripMargin
> {code}
> with {{Rates}} defined as follows:
> {code}
> tEnv.registerTable("EuroRatesHistory", 
> tEnv.scan("RatesHistory").filter('currency === "Euro"))
> tEnv.registerFunction(
>   "Rates",
>   tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 
> 'currency))
> {code}
> will fail with:
> {noformat}
> org.apache.flink.table.api.ValidationException: Only single column join key 
> is supported. Found [] in [InnerJoin(where: 
> (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, 
> rowtime, currency, rate, rowtime0))]
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
> {noformat}
> The problem is that the filtering condition {{('currency === "Euro")}} 
> interferes with the join condition and simplifies it away entirely. Note how 
> the top {{LogicalFilter(condition=[=($3, $1)])}} changes during optimization 
> and finally disappears:
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
> LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
> LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], 
> rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
>   LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
>   LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
>   FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, 
> $2)], joinType=[inner])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-10734) Temporal joins on heavily filtered tables might fail in planning

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-10734:
-
Labels:   (was: auto-deprioritized-major)

> Temporal joins on heavily filtered tables might fail in planning
> 
>
> Key: FLINK-10734
> URL: https://issues.apache.org/jira/browse/FLINK-10734
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Minor
>
> The following query:
> {code}
> val sqlQuery =
>   """
> |SELECT
> |  o.amount * r.rate AS amount
> |FROM
> |  Orders AS o,
> |  LATERAL TABLE (Rates(o.rowtime)) AS r
> |WHERE r.currency = o.currency
> |""".stripMargin
> {code}
> with {{Rates}} defined as follows:
> {code}
> tEnv.registerTable("EuroRatesHistory", 
> tEnv.scan("RatesHistory").filter('currency === "Euro"))
> tEnv.registerFunction(
>   "Rates",
>   tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 
> 'currency))
> {code}
> will fail with:
> {noformat}
> org.apache.flink.table.api.ValidationException: Only single column join key 
> is supported. Found [] in [InnerJoin(where: 
> (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, 
> rowtime, currency, rate, rowtime0))]
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
> {noformat}
> The problem is that the filtering condition {{('currency === "Euro")}} 
> interferes with the join condition and simplifies it away entirely. Note how 
> the top {{LogicalFilter(condition=[=($3, $1)])}} changes during optimization 
> and finally disappears:
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
> LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
> LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], 
> rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
>   LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
>   LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
>   FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, 
> $2)], joinType=[inner])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-10734) Temporal joins on heavily filtered tables might fail in planning

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-10734:
-
Comment: was deleted

(was: This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.
)

> Temporal joins on heavily filtered tables might fail in planning
> 
>
> Key: FLINK-10734
> URL: https://issues.apache.org/jira/browse/FLINK-10734
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Minor
>
> The following query:
> {code}
> val sqlQuery =
>   """
> |SELECT
> |  o.amount * r.rate AS amount
> |FROM
> |  Orders AS o,
> |  LATERAL TABLE (Rates(o.rowtime)) AS r
> |WHERE r.currency = o.currency
> |""".stripMargin
> {code}
> with {{Rates}} defined as follows:
> {code}
> tEnv.registerTable("EuroRatesHistory", 
> tEnv.scan("RatesHistory").filter('currency === "Euro"))
> tEnv.registerFunction(
>   "Rates",
>   tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 
> 'currency))
> {code}
> will fail with:
> {noformat}
> org.apache.flink.table.api.ValidationException: Only single column join key 
> is supported. Found [] in [InnerJoin(where: 
> (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, 
> rowtime, currency, rate, rowtime0))]
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
> {noformat}
> The problem is that the filtering condition {{('currency === "Euro")}} 
> interferes with the join condition and simplifies it away entirely. Note how 
> the top {{LogicalFilter(condition=[=($3, $1)])}} changes during optimization 
> and finally disappears:
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
> LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
> LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], 
> rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
>   LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
>   LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
>   FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, 
> $2)], joinType=[inner])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #16351: [FLINK-22972][datastream] Remove StreamOperator#dispose in favour of close and finish

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16351:
URL: https://github.com/apache/flink/pull/16351#issuecomment-872790645


   
   ## CI report:
   
   * 68484a60cd14c0412ebed7b4d143e6ae14c342d5 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19896)
 
   * e6e93af590763a4828be5a385fce62ba0c138eb5 UNKNOWN
   * 05673474e842d6b5ca7ab3c7f4a702d37660322c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19921)
 
   * c134bf9ea698b23a3122a7f59c79f6b1c84512d6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16363: [FLINK-23225][doc-zh]Fixed some mismatches in the translation pages

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16363:
URL: https://github.com/apache/flink/pull/16363#issuecomment-873435468


   
   ## CI report:
   
   * 18919f28f549a3ec1483be025bdd8e1db061f5cf Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19862)
 
   * 4c7d15553969e2f13b3dc3e1a1a23f840acf4860 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19926)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-10795) STDDEV_POP error

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-10795.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

> STDDEV_POP error
> 
>
> Key: FLINK-10795
> URL: https://issues.apache.org/jira/browse/FLINK-10795
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>  Labels: auto-unassigned
> Attachments: FlinkTableApiError.java, test.tsv
>
>
> if using STDDEV_POP in the attached job the following error is thrown (with 
> Flink 1.6.1):
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.NumberFormatException
>  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>  at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:235)
>  at 
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>  at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>  at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>  at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>  at 
> it.okkam.datalinks.batch.flink.operations.FlinkTableApiError.main(FlinkTableApiError.java:466)
> Caused by: java.lang.NumberFormatException
>  at java.math.BigDecimal.(BigDecimal.java:494)
>  at java.math.BigDecimal.(BigDecimal.java:383)
>  at java.math.BigDecimal.(BigDecimal.java:806)
>  at java.math.BigDecimal.valueOf(BigDecimal.java:1274)
>  at org.apache.calcite.runtime.SqlFunctions.sround(SqlFunctions.java:1242)
>  at DataSetCalcRule$6909.flatMap(Unknown Source)
>  at 
> org.apache.flink.table.runtime.FlatMapRunner.flatMap(FlatMapRunner.scala:52)
>  at 
> org.apache.flink.table.runtime.FlatMapRunner.flatMap(FlatMapRunner.scala:31)
>  at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at DataSetSingleRowJoinRule$6450.join(Unknown Source)
>  at 
> org.apache.flink.table.runtime.MapJoinLeftRunner.flatMap(MapJoinLeftRunner.scala:35)
>  at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:109)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10734) Temporal joins on heavily filtered tables might fail in planning

2021-07-05 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374838#comment-17374838
 ] 

Timo Walther commented on FLINK-10734:
--

[~Leonard Xu] I guess this is still an issue? I updated the component for now. 
Or shall we close it?

> Temporal joins on heavily filtered tables might fail in planning
> 
>
> Key: FLINK-10734
> URL: https://issues.apache.org/jira/browse/FLINK-10734
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Minor
>
> The following query:
> {code}
> val sqlQuery =
>   """
> |SELECT
> |  o.amount * r.rate AS amount
> |FROM
> |  Orders AS o,
> |  LATERAL TABLE (Rates(o.rowtime)) AS r
> |WHERE r.currency = o.currency
> |""".stripMargin
> {code}
> with {{Rates}} defined as follows:
> {code}
> tEnv.registerTable("EuroRatesHistory", 
> tEnv.scan("RatesHistory").filter('currency === "Euro"))
> tEnv.registerFunction(
>   "Rates",
>   tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 
> 'currency))
> {code}
> will fail with:
> {noformat}
> org.apache.flink.table.api.ValidationException: Only single column join key 
> is supported. Found [] in [InnerJoin(where: 
> (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, 
> rowtime, currency, rate, rowtime0))]
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
> {noformat}
> The problem is that the filtering condition {{('currency === "Euro")}} 
> interferes with the join condition and simplifies it away entirely. Note how 
> the top {{LogicalFilter(condition=[=($3, $1)])}} changes during optimization 
> and finally disappears:
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
> LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
>   LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
> LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
> LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], 
> rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
>   LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, 
> $3)], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_0]])
> LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
>   LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
>   FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, 
> $2)], joinType=[inner])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
> FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], 
> expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
>   FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #16376: [FLINK-23248][datastream] Close SinkWriter when calling dispose

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16376:
URL: https://github.com/apache/flink/pull/16376#issuecomment-874094343


   
   ## CI report:
   
   * 03b532b9c7cb0c8e773b183cbbf94bd5dc7c1e86 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19928)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16375: [FLINK-23248][datastream] Close SinkWriter when calling dispose

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16375:
URL: https://github.com/apache/flink/pull/16375#issuecomment-874094190


   
   ## CI report:
   
   * 83ca0c3b8831c8b2b599cc8c8732525201ac6b1f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19927)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #16377: [BP-1.13][FLINK-23182][connectors/rabbitmq] Fix connection leak in RMQSource

2021-07-05 Thread GitBox


flinkbot commented on pull request #16377:
URL: https://github.com/apache/flink/pull/16377#issuecomment-874121353


   
   ## CI report:
   
   * fb86bcdd3eae2cd138de6acffe74f69149cbc31c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #16378: [BP-1.12][FLINK-23182][connectors/rabbitmq] Fix connection leak in RMQSource

2021-07-05 Thread GitBox


flinkbot commented on pull request #16378:
URL: https://github.com/apache/flink/pull/16378#issuecomment-874121463


   
   ## CI report:
   
   * 423bb06676e53788d3f12f523d510242ce82196a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-11222) Change api.scala.DataStream to api.datastream.DataStream for createHarnessTester in HarnessTestBase

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-11222.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

> Change api.scala.DataStream to api.datastream.DataStream for 
> createHarnessTester in HarnessTestBase
> ---
>
> Key: FLINK-11222
> URL: https://issues.apache.org/jira/browse/FLINK-11222
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Reporter: Hequn Cheng
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Thanks to FLINK-11074, we can create a harness tester from a DataStream, which 
> makes it easier to write harness tests.
> However, it would be better to change the parameter type of the 
> {{createHarnessTester()}} method from api.scala.DataStream to 
> api.datastream.DataStream, so that the method can be used with both 
> java.DataStream and scala.DataStream.
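
A minimal sketch of the proposed change (hypothetical method, not the actual HarnessTestBase code): a method that accepts the Java DataStream serves both APIs, because the Scala wrapper exposes the underlying Java stream via {{javaStream}}.

{code:scala}
import org.apache.flink.streaming.api.datastream.{DataStream => JavaDataStream}
import org.apache.flink.streaming.api.scala.{DataStream => ScalaDataStream}

// Hypothetical helper taking the Java DataStream:
def lastTransformationName[T](stream: JavaDataStream[T]): String =
  stream.getTransformation.getName

// Callers holding a Scala DataStream can simply unwrap it:
def lastTransformationName[T](stream: ScalaDataStream[T]): String =
  lastTransformationName(stream.javaStream)
{code}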



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes produce updates

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11398:
-
Labels:   (was: auto-deprioritized-major auto-unassigned)

> Add a dedicated phase to materialize time indicators for nodes produce updates
> --
>
> Key: FLINK-11398
> URL: https://issues.apache.org/jira/browse/FLINK-11398
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> As discussed 
> [here|https://github.com/apache/flink/pull/6787#discussion_r247926320], we 
> need a dedicated phase to materialize time indicators for nodes that produce 
> updates.
> Details:
> Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We 
> need to introduce another materialization phase that materializes all time 
> attributes on nodes that produce updates. We cannot do it inside 
> `RelTimeIndicatorConverter`, because only after the physical optimization 
> phase do we know whether a node is a non-window outer join that will produce 
> updates.
> There are a few other things we need to consider:
> - Whether we can unify the two converter phases.
> - Taking windows with early firing into consideration (not implemented yet); 
> in this case, we don't need to materialize time indicators even though the 
> node produces updates.
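
For intuition, a conceptual sketch (illustration only, not planner code): "materializing" a time indicator means casting the logical rowtime attribute to a regular SQL timestamp, which is the {{CAST($n):TIMESTAMP(3)}} that shows up in optimized plans.

{code:scala}
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.api.scala._

// Illustration only: after the cast, 'rowtime is an ordinary timestamp column
// and no longer carries the time indicator property.
def materializeRowtime(t: Table): Table =
  t.select('rowtime.cast(Types.SQL_TIMESTAMP) as 'rowtime)
{code}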



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes produce updates

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11398:
-
Comment: was deleted

(was: This issue is assigned but has not received an update in 7 days so it has 
been labeled "stale-assigned". If you are still working on the issue, please 
give an update and remove the label. If you are no longer working on the issue, 
please unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.)

> Add a dedicated phase to materialize time indicators for nodes produce updates
> --
>
> Key: FLINK-11398
> URL: https://issues.apache.org/jira/browse/FLINK-11398
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> As discussed 
> [here|https://github.com/apache/flink/pull/6787#discussion_r247926320], we 
> need a dedicated phase to materialize time indicators for nodes that produce 
> updates.
> Details:
> Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We 
> need to introduce another materialization phase that materializes all time 
> attributes on nodes that produce updates. We cannot do it inside 
> `RelTimeIndicatorConverter`, because only after the physical optimization 
> phase do we know whether a node is a non-window outer join that will produce 
> updates.
> There are a few other things we need to consider:
> - Whether we can unify the two converter phases.
> - Taking windows with early firing into consideration (not implemented yet); 
> in this case, we don't need to materialize time indicators even though the 
> node produces updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes produce updates

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11398:
-
Comment: was deleted

(was: This issue was marked "stale-assigned" and has not received an update in 
7 days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.)

> Add a dedicated phase to materialize time indicators for nodes produce updates
> --
>
> Key: FLINK-11398
> URL: https://issues.apache.org/jira/browse/FLINK-11398
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> As discussed 
> [here|https://github.com/apache/flink/pull/6787#discussion_r247926320], we 
> need a dedicated phase to materialize time indicators for nodes that produce 
> updates.
> Details:
> Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We 
> need to introduce another materialization phase that materializes all time 
> attributes on nodes that produce updates. We cannot do it inside 
> `RelTimeIndicatorConverter`, because only after the physical optimization 
> phase do we know whether a node is a non-window outer join that will produce 
> updates.
> There are a few other things we need to consider:
> - Whether we can unify the two converter phases.
> - Taking windows with early firing into consideration (not implemented yet); 
> in this case, we don't need to materialize time indicators even though the 
> node produces updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes produce updates

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11398:
-
Component/s: (was: Table SQL / Legacy Planner)
 Table SQL / Planner

> Add a dedicated phase to materialize time indicators for nodes produce updates
> --
>
> Key: FLINK-11398
> URL: https://issues.apache.org/jira/browse/FLINK-11398
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> As discussed 
> [here|https://github.com/apache/flink/pull/6787#discussion_r247926320], we 
> need a dedicated phase to materialize time indicators for nodes that produce 
> updates.
> Details:
> Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We 
> need to introduce another materialization phase that materializes all time 
> attributes on nodes that produce updates. We cannot do it inside 
> `RelTimeIndicatorConverter`, because only after the physical optimization 
> phase do we know whether a node is a non-window outer join that will produce 
> updates.
> There are a few other things we need to consider:
> - Whether we can unify the two converter phases.
> - Taking windows with early firing into consideration (not implemented yet); 
> in this case, we don't need to materialize time indicators even though the 
> node produces updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes produce updates

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11398:
-
Comment: was deleted

(was: This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.
)

> Add a dedicated phase to materialize time indicators for nodes produce updates
> --
>
> Key: FLINK-11398
> URL: https://issues.apache.org/jira/browse/FLINK-11398
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> As discussed 
> [here|https://github.com/apache/flink/pull/6787#discussion_r247926320], we 
> need a dedicated phase to materialize time indicators for nodes that produce 
> updates.
> Details:
> Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We 
> need to introduce another materialization phase that materializes all time 
> attributes on nodes that produce updates. We cannot do it inside 
> `RelTimeIndicatorConverter`, because only after the physical optimization 
> phase do we know whether a node is a non-window outer join that will produce 
> updates.
> There are a few other things we need to consider:
> - Whether we can unify the two converter phases.
> - Taking windows with early firing into consideration (not implemented yet); 
> in this case, we don't need to materialize time indicators even though the 
> node produces updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes produce updates

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11398:
-
Comment: was deleted

(was: I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I 
help the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither it nor its Sub-Tasks have been updated for 
30 days. I have gone ahead and added a "stale-major" label to the issue. If this 
ticket is a Major, please either assign yourself or give an update. Afterwards, 
please remove the label, or in 7 days the issue will be deprioritized.
)

> Add a dedicated phase to materialize time indicators for nodes produce updates
> --
>
> Key: FLINK-11398
> URL: https://issues.apache.org/jira/browse/FLINK-11398
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> As discussed 
> [here|https://github.com/apache/flink/pull/6787#discussion_r247926320], we 
> need a dedicated phase to materialize time indicators for nodes that produce 
> updates.
> Details:
> Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We 
> need to introduce another materialization phase that materializes all time 
> attributes on nodes that produce updates. We cannot do it inside 
> `RelTimeIndicatorConverter`, because only after the physical optimization 
> phase do we know whether a node is a non-window outer join that will produce 
> updates.
> There are a few other things we need to consider:
> - Whether we can unify the two converter phases.
> - Taking windows with early firing into consideration (not implemented yet); 
> in this case, we don't need to materialize time indicators even though the 
> node produces updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] XComp commented on a change in pull request #16286: [FLINK-21445] Refactors the PackagedProgramRetriever implementation and adds configuration to PackagedProgram

2021-07-05 Thread GitBox


XComp commented on a change in pull request #16286:
URL: https://github.com/apache/flink/pull/16286#discussion_r663940924



##
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProvider.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Optional;
+
+/**
+ * {@code FromJarEntryClassInformationProvider} is used for cases where the 
Jar archive is
+ * explicitly specified.
+ */
+public class FromJarEntryClassInformationProvider implements 
EntryClassInformationProvider {
+
+private final File jarFile;
+private final String jobClassName;
+
+/**
+ * Creates a {@code FromJarEntryClassInformationProvider} for a custom Jar 
archive. At least the
+ * {@code jarFile} or the {@code jobClassName} has to be set.
+ *
+ * @param jarFile The Jar archive.
+ * @param jobClassName The name of the job class.
+ * @return The {@code FromJarEntryClassInformationProvider} referring to 
the passed information.
+ */
+public static FromJarEntryClassInformationProvider createFromCustomJar(
+File jarFile, @Nullable String jobClassName) {
+return new FromJarEntryClassInformationProvider(jarFile, jobClassName);

Review comment:
   I'd rather not change the failure logic as part of this PR. Just by looking 
at the code it would be possible, I guess, but I'm hesitant to do this because 
it might open up other refactoring efforts that I want to avoid.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] fapaul commented on a change in pull request #16333: [FLINK-23182][connectors/rabbitmq] Fix connection leak in RMQSource

2021-07-05 Thread GitBox


fapaul commented on a change in pull request #16333:
URL: https://github.com/apache/flink/pull/16333#discussion_r663939481



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##
@@ -273,44 +276,38 @@ public void open(Configuration config) throws Exception {
 @Override
 public void close() throws Exception {
 super.close();
+Exception exception = null;
 
 try {
 if (consumer != null && channel != null) {
 channel.basicCancel(consumer.getConsumerTag());
 }
 } catch (IOException e) {
-throw new RuntimeException(
-"Error while cancelling RMQ consumer on "
-+ queueName
-+ " at "
-+ rmqConnectionConfig.getHost(),
-e);
+exception =
+new RuntimeException(
+"Error while cancelling RMQ consumer on "
++ queueName
++ " at "
++ rmqConnectionConfig.getHost(),
+e);
 }
 
 try {
-if (channel != null) {
-channel.close();
-}
+IOUtils.closeAll(channel, connection);
 } catch (IOException e) {
-throw new RuntimeException(
-"Error while closing RMQ channel with "
-+ queueName
-+ " at "
-+ rmqConnectionConfig.getHost(),
-e);
+exception =
+ExceptionUtils.firstOrSuppressed(
+new RuntimeException(
+"Error while closing RMQ connection with "

Review comment:
   ```suggestion
   "Error while closing RMQ source with "
   ```
   I think we have to generify it now because it can be related to the channel 
or connection.
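
   For reference, a minimal sketch (assumed names, not the exact RMQSource code) of the collect-and-rethrow pattern this hunk moves to: every resource gets a close attempt, the first failure is kept, and later failures are attached as suppressed exceptions.

   ```scala
   import org.apache.flink.util.{ExceptionUtils, IOUtils}

   // Hypothetical helper illustrating the pattern used above.
   def closeAllCollecting(closeables: AutoCloseable*): Unit = {
     var exception: Exception = null
     try {
       IOUtils.closeAll(closeables: _*)
     } catch {
       case e: Exception =>
         exception = ExceptionUtils.firstOrSuppressed(
           new RuntimeException("Error while closing RMQ source", e), exception)
     }
     if (exception != null) {
       throw exception
     }
   }
   ```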




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-23254) Upgrade Test Framework to JUnit 5

2021-07-05 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-23254:
-

 Summary: Upgrade Test Framework to JUnit 5
 Key: FLINK-23254
 URL: https://issues.apache.org/jira/browse/FLINK-23254
 Project: Flink
  Issue Type: New Feature
  Components: Tests
Affects Versions: 1.14.0
Reporter: Qingsheng Ren
 Fix For: 1.14.0


Please see the mailing list discussion about the background of this upgrade:

[https://lists.apache.org/thread.html/r6c8047c7265b8a9f2cb3ef6d6153dd80b94d36ebb03daccf36ab4940%40%3Cdev.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11484) Blink java.util.concurrent.TimeoutException

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11484:
-
Component/s: (was: Table SQL / Legacy Planner)
 Table SQL / Planner

> Blink java.util.concurrent.TimeoutException
> ---
>
> Key: FLINK-11484
> URL: https://issues.apache.org/jira/browse/FLINK-11484
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.5.5
> Environment: The link of blink source code: 
> [github.com/apache/flink/tree/blink|https://github.com/apache/flink/tree/blink]
>Reporter: pj
>Priority: Minor
>  Labels: auto-deprioritized-major, blink
> Attachments: 1.png, code.png, dashboard.png, error.png, 
> image-2019-02-13-10-54-16-880.png
>
>
> *This happens if I run a Blink application on YARN with a parallelism larger 
> than 1.*
> *The command:*
> ./flink run -m yarn-cluster -ynm FLINK_NG_ENGINE_1 -ys 4 -yn 10 -ytm 5120 -p 
> 40 -c XXMain ~/xx.jar
> *The code:*
> {{DataStream outputStream = tableEnv.toAppendStream(curTable, Row.class); 
> outputStream.print();}}
> *All subtasks of the application hang for a long time and finally the 
> {{toAppendStream()}} function throws an exception like the one below:*
> {{org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: f5e4f7243d06035202e8fa250c364304) at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:276)
>  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:482) 
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeInternal(StreamContextEnvironment.java:85)
>  at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeInternal(StreamContextEnvironment.java:37)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
>  at com.ngengine.main.KafkaMergeMain.startApp(KafkaMergeMain.java:352) at 
> com.ngengine.main.KafkaMergeMain.main(KafkaMergeMain.java:94) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:561)
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:445)
>  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419) 
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:786) 
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280) 
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215) at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1105) 
> at java.security.AccessController.doPrivileged(Native Method) at 
> javax.security.auth.Subject.doAs(Subject.java:422) at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1105) 
> Caused by: java.util.concurrent.TimeoutException at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes produce updates

2021-07-05 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374841#comment-17374841
 ] 

Timo Walther commented on FLINK-11398:
--

[~jinyu.zj] I updated the component. This might be interesting for you? 
Otherwise we can also close it if you think it is not applicable to the new 
planner.

> Add a dedicated phase to materialize time indicators for nodes produce updates
> --
>
> Key: FLINK-11398
> URL: https://issues.apache.org/jira/browse/FLINK-11398
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> As discussed 
> [here|https://github.com/apache/flink/pull/6787#discussion_r247926320], we 
> need a dedicated phase to materialize time indicators for nodes that produce 
> updates.
> Details:
> Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We 
> need to introduce another materialization phase that materializes all time 
> attributes on nodes that produce updates. We cannot do it inside 
> `RelTimeIndicatorConverter`, because only later, after the physical 
> optimization phase, do we know whether a node is a non-window outer join that 
> will produce updates.
> There are a few other things we need to consider:
> - Whether we can unify the two converter phases.
> - Take windows with early firing into consideration (not implemented yet). 
> In this case, we don't need to materialize time indicators even if the node 
> produces updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12097) Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-12097:
-
Component/s: (was: Table SQL / Legacy Planner)
 Table SQL / Planner

> Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and 
> seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead 
> limit exceeded
> -
>
> Key: FLINK-12097
> URL: https://issues.apache.org/jira/browse/FLINK-12097
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: xu
>Priority: Minor
>  Labels: auto-deprioritized-major, performance
> Attachments: DSL.txt
>
>
> Hi Experts,
>  There is a Flink application (version 1.7.2) written in Flink SQL, and the 
> SQL in the application is quite long, spanning about 30 tables and 1500 lines 
> in total. When executing, I found it hangs in 
> StreamTableEnvironment.sqlUpdate, keeps executing Calcite code while the 
> memory usage keeps growing, and after several minutes a 
> java.lang.OutOfMemoryError: GC overhead limit exceeded is thrown.
>   
>  I get some thread dumps:
>          at 
> org.apache.calcite.plan.volcano.RuleQueue.popMatch(RuleQueue.java:475)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:640)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:817)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:777)
>   
>   
>          at java.io.PrintWriter.write(PrintWriter.java:473)
>          at 
> org.apache.calcite.rel.AbstractRelNode$1.explain_(AbstractRelNode.java:415)
>          at 
> org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:156)
>          at 
> org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:312)
>          at 
> org.apache.calcite.rel.AbstractRelNode.computeDigest(AbstractRelNode.java:420)
>          at 
> org.apache.calcite.rel.AbstractRelNode.recomputeDigest(AbstractRelNode.java:356)
>          at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:350)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1484)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
>          at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>          at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:817)
>        

[jira] [Issue Comment Deleted] (FLINK-12097) Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead limit ex

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-12097:
-
Comment: was deleted

(was: This major issue is unassigned, and neither it nor any of its Sub-Tasks 
have been updated for 30 days. So, it has been labeled "stale-major". If this 
ticket is indeed "major", please either assign yourself or give an update. 
Afterwards, please remove the label. In 7 days the issue will be deprioritized.)

> Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and 
> seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead 
> limit exceeded
> -
>
> Key: FLINK-12097
> URL: https://issues.apache.org/jira/browse/FLINK-12097
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: xu
>Priority: Minor
>  Labels: performance
> Attachments: DSL.txt
>
>
> Hi Experts,
>  There is a Flink application (version 1.7.2) written in Flink SQL, and the 
> SQL in the application is quite long, spanning about 30 tables and 1500 lines 
> in total. When executing, I found it hangs in 
> StreamTableEnvironment.sqlUpdate, keeps executing Calcite code while the 
> memory usage keeps growing, and after several minutes a 
> java.lang.OutOfMemoryError: GC overhead limit exceeded is thrown.
>   
>  I get some thread dumps:
>          at 
> org.apache.calcite.plan.volcano.RuleQueue.popMatch(RuleQueue.java:475)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:640)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:817)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:777)
>   
>   
>          at java.io.PrintWriter.write(PrintWriter.java:473)
>          at 
> org.apache.calcite.rel.AbstractRelNode$1.explain_(AbstractRelNode.java:415)
>          at 
> org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:156)
>          at 
> org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:312)
>          at 
> org.apache.calcite.rel.AbstractRelNode.computeDigest(AbstractRelNode.java:420)
>          at 
> org.apache.calcite.rel.AbstractRelNode.recomputeDigest(AbstractRelNode.java:356)
>          at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:350)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1484)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
>          at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>          at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironme

[jira] [Updated] (FLINK-12097) Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-12097:
-
Labels: performance  (was: auto-deprioritized-major performance)

> Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and 
> seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead 
> limit exceeded
> -
>
> Key: FLINK-12097
> URL: https://issues.apache.org/jira/browse/FLINK-12097
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: xu
>Priority: Minor
>  Labels: performance
> Attachments: DSL.txt
>
>
> Hi Experts,
>  There is a Flink application (version 1.7.2) written in Flink SQL, and the 
> SQL in the application is quite long, spanning about 30 tables and 1500 lines 
> in total. When executing, I found it hangs in 
> StreamTableEnvironment.sqlUpdate, keeps executing Calcite code while the 
> memory usage keeps growing, and after several minutes a 
> java.lang.OutOfMemoryError: GC overhead limit exceeded is thrown.
>   
>  I get some thread dumps:
>          at 
> org.apache.calcite.plan.volcano.RuleQueue.popMatch(RuleQueue.java:475)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:640)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:817)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:777)
>   
>   
>          at java.io.PrintWriter.write(PrintWriter.java:473)
>          at 
> org.apache.calcite.rel.AbstractRelNode$1.explain_(AbstractRelNode.java:415)
>          at 
> org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:156)
>          at 
> org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:312)
>          at 
> org.apache.calcite.rel.AbstractRelNode.computeDigest(AbstractRelNode.java:420)
>          at 
> org.apache.calcite.rel.AbstractRelNode.recomputeDigest(AbstractRelNode.java:356)
>          at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:350)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1484)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
>          at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>          at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:817)
>          at 
> org.apache.flink.table.api.TableEnvironme

[jira] [Issue Comment Deleted] (FLINK-12097) Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead limit ex

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-12097:
-
Comment: was deleted

(was: This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.
)

> Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and 
> seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead 
> limit exceeded
> -
>
> Key: FLINK-12097
> URL: https://issues.apache.org/jira/browse/FLINK-12097
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: xu
>Priority: Minor
>  Labels: performance
> Attachments: DSL.txt
>
>
> Hi Experts,
>  There is a Flink application (version 1.7.2) written in Flink SQL, and the 
> SQL in the application is quite long, spanning about 30 tables and 1500 lines 
> in total. When executing, I found it hangs in 
> StreamTableEnvironment.sqlUpdate, keeps executing Calcite code while the 
> memory usage keeps growing, and after several minutes a 
> java.lang.OutOfMemoryError: GC overhead limit exceeded is thrown.
>   
>  I get some thread dumps:
>          at 
> org.apache.calcite.plan.volcano.RuleQueue.popMatch(RuleQueue.java:475)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:640)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:817)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:777)
>   
>   
>          at java.io.PrintWriter.write(PrintWriter.java:473)
>          at 
> org.apache.calcite.rel.AbstractRelNode$1.explain_(AbstractRelNode.java:415)
>          at 
> org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:156)
>          at 
> org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:312)
>          at 
> org.apache.calcite.rel.AbstractRelNode.computeDigest(AbstractRelNode.java:420)
>          at 
> org.apache.calcite.rel.AbstractRelNode.recomputeDigest(AbstractRelNode.java:356)
>          at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:350)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1484)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
>          at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>          at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.Ta

[jira] [Commented] (FLINK-12097) Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-07-05 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374844#comment-17374844
 ] 

Timo Walther commented on FLINK-12097:
--

I updated the component. It is actually a nice, big example for battle-testing 
the parser and planner.

> Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and 
> seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead 
> limit exceeded
> -
>
> Key: FLINK-12097
> URL: https://issues.apache.org/jira/browse/FLINK-12097
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: xu
>Priority: Minor
>  Labels: performance
> Attachments: DSL.txt
>
>
> Hi Experts,
>  There is a Flink application (version 1.7.2) written in Flink SQL, and the 
> SQL in the application is quite long, spanning about 30 tables and 1500 lines 
> in total. When executing, I found it hangs in 
> StreamTableEnvironment.sqlUpdate, keeps executing Calcite code while the 
> memory usage keeps growing, and after several minutes a 
> java.lang.OutOfMemoryError: GC overhead limit exceeded is thrown.
>   
>  I get some thread dumps:
>          at 
> org.apache.calcite.plan.volcano.RuleQueue.popMatch(RuleQueue.java:475)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:640)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:817)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:777)
>   
>   
>          at java.io.PrintWriter.write(PrintWriter.java:473)
>          at 
> org.apache.calcite.rel.AbstractRelNode$1.explain_(AbstractRelNode.java:415)
>          at 
> org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:156)
>          at 
> org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:312)
>          at 
> org.apache.calcite.rel.AbstractRelNode.computeDigest(AbstractRelNode.java:420)
>          at 
> org.apache.calcite.rel.AbstractRelNode.recomputeDigest(AbstractRelNode.java:356)
>          at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:350)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1484)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
>          at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>          at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>          at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>          at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>          at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>          at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>          at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>          at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>          at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>          at 
> org.apache.flink.table.api.TableEnvironment.sqlU

[jira] [Closed] (FLINK-12116) Args autocast will cause exception for plan transformation in TableAPI

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-12116.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

> Args autocast will cause exception for plan transformation in TableAPI
> --
>
> Key: FLINK-12116
> URL: https://issues.apache.org/jira/browse/FLINK-12116
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.6.4, 1.7.2
>Reporter: Xingcan Cui
>Priority: Minor
>  Labels: auto-deprioritized-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the Table API, the automatic typecast for arguments may break their initial 
> structures, which makes {{TreeNode.makeCopy()}} fail.
> Take the {{ConcatWs}} function as an example. It requires a string 
> {{Expression}} sequence for the second parameter of its constructor. If we 
> provide some {{Expressions}} with other types, the planner will try to cast 
> them automatically. However, during this process, the arguments will be 
> incorrectly unwrapped (e.g., {{[f1, f2]}} will be unwrapped to two 
> expressions {{f1.cast(String)}} and {{f2.cast(String)}}) which will cause 
> {{java.lang.IllegalArgumentException: wrong number of arguments}} for 
> {{Constructor.newInstance()}}.
> As a workaround, we can cast these arguments manually.
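
For illustration, a minimal sketch of the manual-cast idea against the current 
Java expression DSL (table and field names are hypothetical; the ticket itself 
targets the legacy planner's expression classes):

{code:java}
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;

class ManualCastSketch {
    // Cast the arguments to STRING up front, so the function receives
    // expressions that already have the expected type and the planner's
    // automatic cast (the source of the unwrapping bug) never kicks in.
    static Table normalize(Table input) {
        return input.select(
            $("f1").cast(DataTypes.STRING()),
            $("f2").cast(DataTypes.STRING()));
    }
}
{code}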



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-12206) cannot query nested fields using Flink SQL

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-12206.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. The new 
type system should be able to deal with this example.

> cannot query nested fields using Flink SQL
> --
>
> Key: FLINK-12206
> URL: https://issues.apache.org/jira/browse/FLINK-12206
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0
>Reporter: Yu Yang
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> We feed a list of events with the following RowTypeInfo to Flink,
> {code:java}
> Row(
>   timestamp: Long, 
>   userId: Long,
>   eventType: String, 
>   auxData: Map, 
>   userActions: 
>  Map,
>   diagnostics: Row(hostname: String, ipaddress: String)
> )
> {code}
> and run the following SQL query
> {code:sql}
> SELECT event.userId, event.diagnostics.hostname
> FROM event
> WHERE event.userId < 10;
>  {code}
> We are prompted "Column 'diagnostics.hostname' not found in table 'event'". 
> Did I miss anything while constructing the RowTypeInfo? Or is it caused by a 
> SQL validation issue? 
> = 
> The following are the detailed exceptions:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: SQL validation failed. From line 1, column 28 to line 1, 
> column 47: Column 'diagnostics.hostname' not found in table 'event'
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. From line 1, column 28 to line 1, column 47: Column 
> 'diagnostics.hostname' not found in table 'event'
> at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
> at 
> org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:746)
> at 
> com.pinterest.flink.samples.ThriftRowSerializerSample.main(ThriftRowSerializerSample.java:71)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> ... 9 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 28 to line 1, column 47: Column 'diagnostics.hostname' not found in 
> table 'event'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:783)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:768)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4764)
> at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5624)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5606)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:334)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5213)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:435)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSel
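
As an aside, a hypothetical sketch of how the nested row could be declared with 
the new type system (DataTypes), under which a projection like 
`event.diagnostics.hostname` is expected to resolve; the map key and value types 
are assumptions, since the generic parameters were lost in the formatting above:

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

class EventTypeSketch {
    // Declaring the nested ROW fields explicitly gives the SQL validator
    // the information it needs to resolve `diagnostics.hostname`.
    static final DataType EVENT_TYPE = DataTypes.ROW(
        DataTypes.FIELD("timestamp", DataTypes.BIGINT()),
        DataTypes.FIELD("userId", DataTypes.BIGINT()),
        DataTypes.FIELD("eventType", DataTypes.STRING()),
        DataTypes.FIELD("auxData",
            DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
        DataTypes.FIELD("diagnostics", DataTypes.ROW(
            DataTypes.FIELD("hostname", DataTypes.STRING()),
            DataTypes.FIELD("ipaddress", DataTypes.STRING()))));
}
{code}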

[GitHub] [flink] AHeise commented on a change in pull request #16345: [FLINK-18783] Load AkkaRpcSystem through separate classloader

2021-07-05 Thread GitBox


AHeise commented on a change in pull request #16345:
URL: https://github.com/apache/flink/pull/16345#discussion_r663943034



##
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
##
@@ -25,7 +25,7 @@
  * This interface serves as a factory interface for RPC services, with some 
additional utilities
  * that are reliant on implementation details of the RPC service.
  */
-public interface RpcSystem extends RpcSystemUtils {
+public interface RpcSystem extends RpcSystemUtils, AutoCloseable {

Review comment:
   I'd actually always go for `Closeable` unless you need to throw checked 
exceptions beyond `IOException`.
   
The practical implications are actually quite small, except that you can use 
Guava's `Closer` only with `Closeables`, but there may also be other code that 
still requires the more specific interface.
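
For illustration, a minimal sketch of the distinction (interface names hypothetical):

```java
import java.io.Closeable;
import java.io.IOException;

// Closeable narrows close() to `throws IOException`, so such resources
// can be handed to IO-oriented utilities like Guava's Closer.
interface NarrowResource extends Closeable {
    @Override
    void close() throws IOException;
}

// AutoCloseable allows close() to throw any checked exception.
interface WideResource extends AutoCloseable {
    @Override
    void close() throws Exception;
}
```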

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
##
@@ -84,23 +83,8 @@ public void 
testStartMetricActorSystemRespectsThreadPriority() throws Exception
 configuration, "localhost", RpcSystemLoader.load());
 
 try {
-// dirty reflection code to avoid ClassCastExceptions
-final Method getActorSystem = 
rpcService.getClass().getMethod("getActorSystem");
-final Object actorSystem = getActorSystem.invoke(rpcService);
-
-final Method settingsMethod = 
actorSystem.getClass().getMethod("settings");
-final Object settings = settingsMethod.invoke(actorSystem);
-
-final Method configMethod = 
settings.getClass().getMethod("config");
-final Object config = configMethod.invoke(settings);
-
-final Method getIntMethod = config.getClass().getMethod("getInt", 
String.class);
-getIntMethod.setAccessible(true);
 final int threadPriority =
-(int)
-getIntMethod.invoke(
-config, 
"akka.actor.default-dispatcher.thread-priority");
-
+rpcService.execute(() -> 
Thread.currentThread().getPriority()).get();

Review comment:
   👍 

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##
@@ -1069,7 +1069,7 @@ private void terminateMiniClusterServices() throws 
Exception {
 }
 
 try {
-rpcSystem.cleanup();
+rpcSystem.close();

Review comment:
   Here this whole block would be a few lines of code with Guava `Closer`.
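
A rough sketch of that idea, assuming the involved resources implement 
`Closeable` (method and parameter names hypothetical):

```java
import com.google.common.io.Closer;

import java.io.Closeable;
import java.io.IOException;

class ShutdownSketch {
    // Closer closes registered resources in reverse registration order,
    // rethrowing the first exception and suppressing the rest.
    static void terminateServices(Closeable metricRegistry, Closeable rpcSystem)
            throws IOException {
        Closer closer = Closer.create();
        closer.register(metricRegistry); // closed last
        closer.register(rpcSystem);      // closed first
        closer.close();
    }
}
```

The reverse close order matches the usual teardown pattern of shutting down 
dependents before the services they depend on.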




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663946418



##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] twalthr commented on pull request #8208: [FLINK-12252][TableSQL/Planner] Support not escaped table name for External Catalogs in INSERT SQL statement

2021-07-05 Thread GitBox


twalthr commented on pull request #8208:
URL: https://github.com/apache/flink/pull/8208#issuecomment-874129199


   Since the old planner has been removed, I will close this issue now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] twalthr closed pull request #8208: [FLINK-12252][TableSQL/Planner] Support not escaped table name for External Catalogs in INSERT SQL statement

2021-07-05 Thread GitBox


twalthr closed pull request #8208:
URL: https://github.com/apache/flink/pull/8208


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-12252) Support not escaped table name for External Catalogs in INSERT SQL statement

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-12252.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now.

> Support not escaped table name for External Catalogs in INSERT SQL statement
> 
>
> Key: FLINK-12252
> URL: https://issues.apache.org/jira/browse/FLINK-12252
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Artsem Semianenka
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If I want to write a streaming SQL query that inserts data into a sink 
> described in the external catalog, I have to escape the full table path in 
> quotes. Example:
> INSERT INTO `test.db3.tb3` SELECT a,b,c,d FROM test.db2.tb2
> I see a discrepancy in query semantics because the SELECT statement is 
> written without quotes.
> I propose to add support for queries without quotes in the INSERT statement, 
> like:
>  INSERT INTO test.db3.tb3 SELECT a,b,c,d FROM test.db2.tb2
> A pull request is available.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663947605



##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List sentMessages = new ArrayList<>();
+  private List sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+this.self = self;
+this.caller = caller;
+this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-12701) Column name alias causes exception when used with where and group-by

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-12701.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

> Column name alias causes exception when used with where and group-by
> 
>
> Key: FLINK-12701
> URL: https://issues.apache.org/jira/browse/FLINK-12701
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0
>Reporter: Josh Bradt
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Assigning a column an alias containing a space sometimes causes an exception 
> even though the docs suggest this is valid.
> Assume we have a table {{Groups}} that contains a string-typed column called 
> {{name}}. Then the query
> {code:sql}
> SELECT `Groups`.`name` AS `Group Name` FROM `Groups`
> {code}
> works as expected, but
> {code:sql}
> SELECT `Groups`.`name` AS `Group Name` 
> FROM `Groups` 
> WHERE `Groups`.`name` LIKE 'Treat%' 
> ORDER BY `Groups`.`name` ASC
> {code}
> produces the following exception
> {code:java}
> org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
>  Invalid tuple field reference "Group Name".
>   at 
> org.apache.flink.api.java.typeutils.RowTypeInfo.getFlatFields(RowTypeInfo.java:97)
>   at 
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:266)
>   at 
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:223)
>   at org.apache.flink.api.java.DataSet.partitionByRange(DataSet.java:1298)
>   at 
> org.apache.flink.table.plan.nodes.dataset.DataSetSort.translateToPlan(DataSetSort.scala:99)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:311)
>   at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>   at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
>   [...]
> {code}
> If you remove the {{WHERE}} clause or the {{ORDER BY}} clause, it works fine. 
> It only fails when both are included. Additionally, it works fine if the 
> column alias ({{AS `Group Name`}}) is removed or if it doesn't contain a 
> space ({{AS `GroupName`}}).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23255) Add JUnit 5 jupiter and vintage engine

2021-07-05 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-23255:
-

 Summary: Add JUnit 5 jupiter and vintage engine
 Key: FLINK-23255
 URL: https://issues.apache.org/jira/browse/FLINK-23255
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.14.0
Reporter: Qingsheng Ren
 Fix For: 1.14.0


Add dependencies for the JUnit 5 Jupiter engine to support JUnit 5 tests, and 
the vintage engine to support test cases written in JUnit 4 style.
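
For illustration, a hedged sketch of the two styles coexisting once both 
engines are on the test classpath (class and method names hypothetical):

{code:java}
// Executed by the jupiter engine.
class JupiterStyleTest {
    @org.junit.jupiter.api.Test
    void addsNumbers() {
        org.junit.jupiter.api.Assertions.assertEquals(4, 2 + 2);
    }
}

// Existing JUnit 4 style test, picked up by the vintage engine.
public class VintageStyleTest {
    @org.junit.Test
    public void addsNumbers() {
        org.junit.Assert.assertEquals(4, 2 + 2);
    }
}
{code}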



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13372) Timestamp conversion bug in non-blink Table/SQL runtime

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-13372.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Most 
issues should be solved in the new planner. See also FLIP-37 and FLIP-162.

> Timestamp conversion bug in non-blink Table/SQL runtime
> ---
>
> Key: FLINK-13372
> URL: https://issues.apache.org/jira/browse/FLINK-13372
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1, 1.9.0
>Reporter: Shuyi Chen
>Priority: Major
>  Labels: auto-deprioritized-critical, auto-unassigned
>
> Currently, in the non-blink table/SQL runtime, Flink uses 
> SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time 
> (in long) to java.sql.Timestamp.
> {code:java}
>  public static Timestamp internalToTimestamp(long v) { return new Timestamp(v 
> - (long)LOCAL_TZ.getOffset(v)); } {code}
> However, as discussed in the recent Calcite mailing list, 
> SqlFunctions.internalToTimestamp() assumes the input timestamp value is in 
> the current JVM’s default timezone (which is unusual), NOT milliseconds since 
> epoch. And SqlFunctions.internalToTimestamp() is used to convert a timestamp 
> value in the current JVM’s default timezone to milliseconds since epoch, 
> which the java.sql.Timestamp constructor takes. Therefore, the results will not 
> only be wrong, but will also change if the job runs on machines in different 
> timezones. (The only exception is when all your production machines use the 
> UTC timezone.)
> Here is an example: if the user input value is 0 (00:00:00 UTC on 1 January 
> 1970), and the table/SQL runtime runs on a machine in PST (UTC-8), the output 
> sql.Timestamp after SqlFunctions.internalToTimestamp() will become 28800000 
> millisec since epoch (08:00:00 UTC on 1 January 1970); and with the same 
> input, if the table/SQL runtime runs again on a different machine in EST 
> (UTC-5), the output sql.Timestamp after SqlFunctions.internalToTimestamp() 
> will become 18000000 millisec since epoch (05:00:00 UTC on 1 January 1970).
> Currently, there are unit tests for the table/SQL API event time 
> input/output (e.g., GroupWindowITCase.testEventTimeTumblingWindow() and 
> SqlITCase.testDistinctAggWithMergeOnEventTimeSessionGroupWindow()). They 
> all pass today because we compare the string format of the time, which 
> ignores the timezone. If you step into the code, the actual java.sql.Timestamp 
> value is wrong and changes as the tests run in different timezones (e.g., one 
> can use -Duser.timezone=PST to change the current JVM’s default timezone).
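
To make the arithmetic tangible, a small standalone sketch reproducing the 
first example above (plain JDK code that only mirrors the quoted Calcite 
snippet; it does not depend on Flink or Calcite):

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {
    public static void main(String[] args) {
        // Pin the JVM default timezone to PST (UTC-8) to reproduce the example.
        TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
        long v = 0L; // 00:00:00 UTC on 1 January 1970
        // Mirrors Calcite's SqlFunctions.internalToTimestamp(long v):
        Timestamp ts = new Timestamp(v - TimeZone.getDefault().getOffset(v));
        // Prints 28800000: the instant was shifted by 8 hours.
        System.out.println(ts.getTime());
    }
}
{code}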



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] CecilHarvey commented on pull request #15431: [FLINK-22042][flink-core] Fix reading bigint(20) UNSIGNED type field cannot be cast

2021-07-05 Thread GitBox


CecilHarvey commented on pull request #15431:
URL: https://github.com/apache/flink/pull/15431#issuecomment-874132220


   I adapted this patch manually to Flink 1.12.4, which I'm using, and it fixes 
the issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-13781) Use new Expression in RexNodeToExpressionConverter

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-13781.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now.

> Use new Expression in RexNodeToExpressionConverter
> --
>
> Key: FLINK-13781
> URL: https://issues.apache.org/jira/browse/FLINK-13781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Legacy Planner
>Reporter: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15040) Open function is not called in UDF AggregateFunction

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-15040.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

> Open function is not called in UDF AggregateFunction
> 
>
> Key: FLINK-15040
> URL: https://issues.apache.org/jira/browse/FLINK-15040
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Reporter: LI Guobao
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> I am trying to register a metric in an aggregate UDF by overriding the *open* 
> function. According to the documentation, the *open* function can be overridden 
> in order to retrieve the metric group and register the metric. But it 
> works only for ScalarFunction, not for AggregateFunction, since the *open* 
> function is never called for AggregateFunction.
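
For reference, a hedged sketch of the pattern the reporter describes (class 
and metric names hypothetical); with the current planner, open() is invoked 
and the registration below takes effect:

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;

// Counting aggregate that registers a metric when open() is called.
public class CountWithMetric extends AggregateFunction<Long, CountWithMetric.Acc> {

    public static class Acc {
        public long count;
    }

    private transient Counter accumulateCalls;

    @Override
    public void open(FunctionContext context) throws Exception {
        // The legacy planner never invoked open() for AggregateFunction,
        // so this registration silently never happened there.
        accumulateCalls = context.getMetricGroup().counter("accumulateCalls");
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String value) {
        accumulateCalls.inc();
        acc.count++;
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.count;
    }
}
{code}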



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663950748



##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List sentMessages = new ArrayList<>();
+  private List sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+this.self = self;
+this.caller = caller;
+this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+this(self, Optional.of(caller));

Review comment:
   Done

##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List sentMessages = new ArrayList<>();
+  private List sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+this.self = self;
+this.caller = caller;
+this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+this(self, Optional.of(caller));
+  }
+
+  @Override
+  public Address self() {
+return self;
+  }
+
+  @Override
+  public Optional<Address> caller() {
+return caller;
+  }
+
+  @Override
+  public void send(Message message) {
+sentMessages.add(new Envelope(Dur

[jira] [Closed] (FLINK-15212) PROCTIME attribute causes problems with timestamp times before 1900 ?

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-15212.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

>  PROCTIME attribute causes problems with timestamp times before 1900 ?
> --
>
> Key: FLINK-15212
> URL: https://issues.apache.org/jira/browse/FLINK-15212
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.9.1
> Environment: flink 1.9.1
> jdk1.8.0_211
> idea2019.3
>Reporter: Rockey Cui
>Priority: Minor
>  Labels: auto-deprioritized-major, easyfix
>
> A simple DataStreamSource with a timestamp field, registered as a table.
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> env.setParallelism(1);
> DataStreamSource<String> stringDataStreamSource = env.fromElements(
> "1001,1002,adc0,1900-01-01 00:00:00.0",
> "1002,1003,adc1,1910-01-01 00:00:00.0",
> "1003,1004,adc2,1920-01-01 00:00:00.0",
> "1004,1005,adc3,1930-01-01 00:00:00.0",
> "1005,1006,adc4,1970-01-01 00:00:00.0",
> ",,adc5,1971-01-01 00:00:00.0"
> );
> TypeInformation[] fieldTypes = new TypeInformation[]{Types.LONG, 
> Types.LONG, Types.STRING, Types.SQL_TIMESTAMP};
> String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"};
> RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
> DataStream<Row> stream = stringDataStreamSource.map((MapFunction<String, Row>) s -> {
> String[] split = s.split(",");
> Row row = new Row(split.length);
> for (int i = 0; i < split.length; i++) {
> Object value = null;
> if (fieldTypes[i].equals(Types.STRING)) {
> value = split[i];
> }
> if (fieldTypes[i].equals(Types.LONG)) {
> value = Long.valueOf(split[i]);
> }
> if (fieldTypes[i].equals(Types.INT)) {
> value = Integer.valueOf(split[i]);
> }
> if (fieldTypes[i].equals(Types.DOUBLE)) {
> value = Double.valueOf(split[i]);
> }
> if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) {
> value = Timestamp.valueOf(split[i]);
> }
> row.setField(i, value);
> }
> //System.out.println(row.toString());
> return row;
> }).returns(rowTypeInfo);
> tableEnv.registerDataStream("user_click_info", stream, String.join(",", 
> fieldNames) + ",www.proctime");
> String sql = "select * from user_click_info";
> Table table = tableEnv.sqlQuery(sql);
> DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
> result.print();
> table.printSchema();
> tableEnv.execute("Test");
> {code}
> result ==>
>  
> root
>  |-- id: BIGINT
>  |-- cityId: BIGINT
>  |-- url: STRING
>  |-- clickTime: TIMESTAMP(3)
>  |-- www: TIMESTAMP(3) *PROCTIME*
>  
> 1001,1002,adc0,{color:#FF0000}1899-12-31 23:54:17.0{color},2019-12-12 
> 03:37:18.036
> 1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196
> 1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196
> 1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196
> 1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196
> ,,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196
> 
> without the PROCTIME attribute it is OK ==>
>  
> root
>  |-- id: BIGINT
>  |-- cityId: BIGINT
>  |-- url: STRING
>  |-- clickTime: TIMESTAMP(3)
>  
> 1001,1002,adc0,1900-01-01 00:00:00.0
> 1002,1003,adc1,1910-01-01 00:00:00.0
> 1003,1004,adc2,1920-01-01 00:00:00.0
> 1004,1005,adc3,1930-01-01 00:00:00.0
> 1005,1006,adc4,1970-01-01 00:00:00.0
> ,,adc5,1971-01-01 00:00:00.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
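
The shifted value above (1899-12-31 23:54:17.0) is consistent with the local-mean-time offsets many time zones used before ~1900. A minimal sketch of the suspected mismatch, assuming the legacy conversion applies the modern offset to a pre-1900 instant:

{code:java}
import java.time.LocalDateTime;
import java.time.ZoneId;

public class Pre1900Offsets {
  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("Asia/Shanghai");
    // Before ~1901 the offset is the historical local mean time (+08:05:43),
    // while the modern offset is +08:00. Any conversion that mixes the two
    // shifts the wall-clock value by 5 minutes 43 seconds, which turns
    // 1900-01-01 00:00:00 into 1899-12-31 23:54:17.
    System.out.println(zone.getRules().getOffset(LocalDateTime.of(1899, 12, 31, 0, 0)));
    System.out.println(zone.getRules().getOffset(LocalDateTime.of(2000, 1, 1, 0, 0)));
  }
}
{code}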


[jira] [Updated] (FLINK-14297) Temporal Table Function Build Side does not accept a constant key

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-14297:
-
Component/s: (was: Table SQL / Legacy Planner)

> Temporal Table Function Build Side does not accept a constant key
> -
>
> Key: FLINK-14297
> URL: https://issues.apache.org/jira/browse/FLINK-14297
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
> Environment: Java 1.8, Scala 2.11, Flink 1.9 (pom.xml file attached)
>Reporter: Benoît Paris
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: flink-test-temporal-constant-key-build-side.zip
>
>
> When defining a table that will be used as the build side on a Temporal Table 
> Function, a constant key will not be accepted:
> In:
> {code:java}
> Table ratesHistory = tEnv.sqlQuery(sql);
> TemporalTableFunction rates = 
> ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
> {code}
>  This crashes: 
> {code:java}
> SELECT 
>  'Eur1' AS r_currency,
>  r_amount, 
>  r_proctime 
> FROM RatesHistory{code}
>  This makes a type verification in Calcite fail 
> (RelOptUtil.verifyTypeEquivalence) when trying to join the lateral table 
> function. It seems like this is a corner case in nullability; the error is:  
> {code:java}
> (Blink) 
> Apply rule [LogicalCorrelateToJoinFromTemporalTableFunctionRule] [...]
> (old planner) 
> Apply rule [LogicalCorrelateToTemporalTableJoinRule] [...]
> Exception in thread "main" java.lang.AssertionError: Cannot add expression of 
> different type to set:
> set type is RecordType(
>   [...] VARCHAR(65536) CHARACTER SET "UTF-16LE"  r_currency, 
> [...]) NOT NULL
> expression type is RecordType(
>   [...] CHAR(4)CHARACTER SET "UTF-16LE" NOT NULL r_currency, 
> [...]) NOT NULL{code}
>  (formatting and commenting mine)
> VARCHAR vs CHAR is not the problem, as using the following works: 
> {code:java}
> SELECT 
>  COALESCE('Eur1', r_currency) AS r_currency, 
>  r_amount, 
>  r_proctime 
> FROM RatesHistory{code}
>  The problem comes from nullable vs NOT NULL.
> Attached is Java reproduction code, pom.xml, and both blink and old planner 
> logs and stacktraces.
> 
> My speculation is that an earlier transformation infers and 
> normalizes the key type (or maybe gets it from the query side?), but the 
> decorrelation and the special temporal table function case happen later.
> Reordering the rules could help, but that may be too heavy-handed.
> Or do this 
> [rexBuilder.makeInputRef|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala#L145]
>  in a type-compatible way.
> 
> This seems to be related to another issue:
> https://issues.apache.org/jira/browse/FLINK-14173
> There, careful support of the nullability of the build-side key in a LEFT 
> JOIN will affect the output.
> 
> This might seem like a useless use case, but a constant key is the only way 
> to access, in SQL, a Temporal Table Function for a global value (like querying 
> a global current number).
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
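
For reference, the reporter's working COALESCE variant as a full Java call (an untested sketch; the tEnv and registration calls are taken from the attached reproduction):

{code:java}
// Assumes the tEnv and RatesHistory registration from the attached reproduction.
// COALESCE yields a nullable VARCHAR key, which avoids the CHAR(4) NOT NULL clash.
Table ratesHistory = tEnv.sqlQuery(
    "SELECT COALESCE('Eur1', r_currency) AS r_currency, r_amount, r_proctime " +
    "FROM RatesHistory");
TemporalTableFunction rates =
    ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
tEnv.registerFunction("Rates", rates);
{code}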


[jira] [Updated] (FLINK-14173) ANSI-style JOIN with Temporal Table Function fails

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-14173:
-
Component/s: (was: Table SQL / Legacy Planner)

> ANSI-style JOIN with Temporal Table Function fails
> --
>
> Key: FLINK-14173
> URL: https://issues.apache.org/jira/browse/FLINK-14173
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
> Environment: Java 1.8, Scala 2.11, Flink 1.9 (pom.xml file attached)
>Reporter: Benoît Paris
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: flink-test-temporal-tables-1.9.zip
>
>
> The planner fails to generate a plan for ANSI-style joins with Temporal Table 
> Functions. The Blink planners throws with a "Missing conversion is 
> LogicalTableFunctionScan[convention: NONE -> LOGICAL]" message (and some very 
> fancy graphviz stuff). The old planner does a "This exception indicates that 
> the query uses an unsupported SQL feature."
> This fails:
> {code:java}
>  SELECT 
>o_amount * r_amount AS amount 
>  FROM Orders 
>  JOIN LATERAL TABLE (Rates(o_proctime)) 
>ON r_currency = o_currency {code}
> This works:
> {code:java}
>  SELECT 
>o_amount * r_amount AS amount 
>  FROM Orders 
> , LATERAL TABLE (Rates(o_proctime)) 
>  WHERE r_currency = o_currency{code}
> Reproduction with the attached Java and pom.xml files. Also included: stack 
> traces for both Blink and the old planner.
> I think this is a regression. I remember using ANSI-style joins with a 
> temporal table function successfully in 1.8.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15563) When using ParquetTableSource, The program hangs until OOM

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-15563.

Resolution: Fixed

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

>  When using ParquetTableSource, The program hangs until OOM
> ---
>
> Key: FLINK-15563
> URL: https://issues.apache.org/jira/browse/FLINK-15563
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.1, 1.9.1
>Reporter: sujun
>Priority: Major
>  Labels: Parquet, auto-deprioritized-critical, auto-unassigned, 
> pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This is my code:
> {code:java}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tableEnv = StreamTableEnvironment.create(env)
> val schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> 
> val parquetTableSource: ParquetTableSource = ParquetTableSource
> .builder
> .forParquetSchema(new 
> org.apache.parquet.avro.AvroSchemaConverter().convert(
> org.apache.avro.Schema.parse(schema, true)))
> .path("/Users/sujun/Documents/tmp/login_data")
> .build
> tableEnv.registerTableSource("source",parquetTableSource)
> val t1 = tableEnv.sqlQuery("select log_id,city from source where city = 
> '274' ")
> tableEnv.registerTable("t1",t1)
> val t2 = tableEnv.sqlQuery("select * from t1 where 
> log_id='5927070661978133'")
> t2.toAppendStream[Row].print()
> env.execute()}
> {code}
>  
>  When the two SQL queries each have a WHERE condition, the main program hangs 
> until OOM. When the filter push-down code of ParquetTableSource is deleted, 
> the program runs normally.
>   
>  Through my debugging, I found that the program hangs in the 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method
>   
>  It may be a bug in the Calcite optimizer caused by the filter push-down code.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15563) When using ParquetTableSource, The program hangs until OOM

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-15563.

Resolution: Won't Fix

>  When using ParquetTableSource, The program hangs until OOM
> ---
>
> Key: FLINK-15563
> URL: https://issues.apache.org/jira/browse/FLINK-15563
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.1, 1.9.1
>Reporter: sujun
>Priority: Major
>  Labels: Parquet, auto-deprioritized-critical, auto-unassigned, 
> pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This is my code:
> {code:java}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tableEnv = StreamTableEnvironment.create(env)
> val schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> 
> val parquetTableSource: ParquetTableSource = ParquetTableSource
> .builder
> .forParquetSchema(new 
> org.apache.parquet.avro.AvroSchemaConverter().convert(
> org.apache.avro.Schema.parse(schema, true)))
> .path("/Users/sujun/Documents/tmp/login_data")
> .build
> tableEnv.registerTableSource("source",parquetTableSource)
> val t1 = tableEnv.sqlQuery("select log_id,city from source where city = 
> '274' ")
> tableEnv.registerTable("t1",t1)
> val t2 = tableEnv.sqlQuery("select * from t1 where 
> log_id='5927070661978133'")
> t2.toAppendStream[Row].print()
> env.execute()}
> {code}
>  
>  When the two SQL queries each have a WHERE condition, the main program hangs 
> until OOM. When the filter push-down code of ParquetTableSource is deleted, 
> the program runs normally.
>   
>  Through my debugging, I found that the program hangs in the 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method
>   
>  It may be a bug in the Calcite optimizer caused by the filter push-down code.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-15563) When using ParquetTableSource, The program hangs until OOM

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reopened FLINK-15563:
--

>  When using ParquetTableSource, The program hangs until OOM
> ---
>
> Key: FLINK-15563
> URL: https://issues.apache.org/jira/browse/FLINK-15563
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.1, 1.9.1
>Reporter: sujun
>Priority: Major
>  Labels: Parquet, auto-deprioritized-critical, auto-unassigned, 
> pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This is my code:
> {code:java}
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tableEnv = StreamTableEnvironment.create(env)
> val schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> 
> val parquetTableSource: ParquetTableSource = ParquetTableSource
> .builder
> .forParquetSchema(new 
> org.apache.parquet.avro.AvroSchemaConverter().convert(
> org.apache.avro.Schema.parse(schema, true)))
> .path("/Users/sujun/Documents/tmp/login_data")
> .build
> tableEnv.registerTableSource("source",parquetTableSource)
> val t1 = tableEnv.sqlQuery("select log_id,city from source where city = 
> '274' ")
> tableEnv.registerTable("t1",t1)
> val t2 = tableEnv.sqlQuery("select * from t1 where 
> log_id='5927070661978133'")
> t2.toAppendStream[Row].print()
> env.execute()}
> {code}
>  
>  When the two SQL queries each have a WHERE condition, the main program hangs 
> until OOM. When the filter push-down code of ParquetTableSource is deleted, 
> the program runs normally.
>   
>  Through my debugging, I found that the program hangs in the 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method
>   
>  It may be a bug in the Calcite optimizer caused by the filter push-down code.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15968) LegacyTypeInfoDataTypeConverter should support conversion from BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-15968.

Resolution: Won't Fix

LegacyTypeInfoDataTypeConverter is (as the name indicates) legacy. This utility 
will be removed soon.

> LegacyTypeInfoDataTypeConverter should support conversion from 
> BINARY/VARBINARY to BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> -
>
> Key: FLINK-15968
> URL: https://issues.apache.org/jira/browse/FLINK-15968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner, Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Zhenghua Gao
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently LegacyTypeInfoDataTypeConverter only supports conversion between 
> DataTypes.BYTES and BYTE_PRIMITIVE_ARRAY_TYPE_INFO. When we update connectors 
> to the new type system, we need to convert BINARY(p) or VARBINARY(p) to 
> BYTE_PRIMITIVE_ARRAY_TYPE_INFO.  Similar logic has already been implemented for 
> the CHAR/VARCHAR types.
> The Hive connector achieves this by depending on the Blink planner's conversion 
> logic, which is odd because a planner dependency shouldn't be necessary for 
> connectors.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
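
A minimal sketch of the conversion this ticket asks for, assuming it mirrors the existing CHAR/VARCHAR special case (the surrounding converter structure here is an assumption, not the actual class):

{code:java}
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public final class BinaryConversionSketch {

  // Map BINARY(p)/VARBINARY(p) to the byte[] TypeInformation, just like
  // CHAR/VARCHAR are mapped to the String TypeInformation today.
  static TypeInformation<?> toLegacyTypeInfo(LogicalType type) {
    if (type.getTypeRoot() == LogicalTypeRoot.BINARY
        || type.getTypeRoot() == LogicalTypeRoot.VARBINARY) {
      return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
    throw new UnsupportedOperationException("other types elided in this sketch");
  }
}
{code}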


[jira] [Updated] (FLINK-16119) Port base RelNode classes from Scala to Java

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-16119:
-
Component/s: (was: Table SQL / Legacy Planner)

> Port base RelNode classes from Scala to Java
> 
>
> Key: FLINK-16119
> URL: https://issues.apache.org/jira/browse/FLINK-16119
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Currently, when adding new Flink RelNodes, we have to write a Scala one due 
> to the problem that we can't use the implemented methods of a Scala trait 
> from Java([see 
> details|https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods]).
>  Take DataStreamCorrelate as an example, it extends both CommonCorrelate and 
> DataStreamRel and we can't convert DataStreamCorrelate to Java directly. 
> It would be great if we can convert these base RelNode 
> classes(CommonCorrelate, DataStreamRel, etc) from Scala to Java so that we 
> can add new Java RelNodes and convert the existing RelNodes to Java. 
> CC [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16119) Port base RelNode classes from Scala to Java

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-16119:
-
Labels:   (was: auto-deprioritized-major)

> Port base RelNode classes from Scala to Java
> 
>
> Key: FLINK-16119
> URL: https://issues.apache.org/jira/browse/FLINK-16119
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> Currently, when adding new Flink RelNodes, we have to write a Scala one due 
> to the problem that we can't use the implemented methods of a Scala trait 
> from Java([see 
> details|https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods]).
>  Take DataStreamCorrelate as an example, it extends both CommonCorrelate and 
> DataStreamRel and we can't convert DataStreamCorrelate to Java directly. 
> It would be great if we can convert these base RelNode 
> classes(CommonCorrelate, DataStreamRel, etc) from Scala to Java so that we 
> can add new Java RelNodes and convert the existing RelNodes to Java. 
> CC [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-16119) Port base RelNode classes from Scala to Java

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-16119:
-
Comment: was deleted

(was: This major issue is unassigned and itself and all of its Sub-Tasks have 
not been updated for 30 days. So, it has been labeled "stale-major". If this 
ticket is indeed "major", please either assign yourself or give an update. 
Afterwards, please remove the label. In 7 days the issue will be deprioritized.)

> Port base RelNode classes from Scala to Java
> 
>
> Key: FLINK-16119
> URL: https://issues.apache.org/jira/browse/FLINK-16119
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> Currently, when adding new Flink RelNodes, we have to write a Scala one due 
> to the problem that we can't use the implemented methods of a Scala trait 
> from Java([see 
> details|https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods]).
>  Take DataStreamCorrelate as an example, it extends both CommonCorrelate and 
> DataStreamRel and we can't convert DataStreamCorrelate to Java directly. 
> It would be great if we can convert these base RelNode 
> classes(CommonCorrelate, DataStreamRel, etc) from Scala to Java so that we 
> can add new Java RelNodes and convert the existing RelNodes to Java. 
> CC [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-16119) Port base RelNode classes from Scala to Java

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-16119:
-
Comment: was deleted

(was: This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.
)

> Port base RelNode classes from Scala to Java
> 
>
> Key: FLINK-16119
> URL: https://issues.apache.org/jira/browse/FLINK-16119
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Hequn Cheng
>Priority: Minor
>
> Currently, when adding new Flink RelNodes, we have to write a Scala one due 
> to the problem that we can't use the implemented methods of a Scala trait 
> from Java([see 
> details|https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods]).
>  Take DataStreamCorrelate as an example, it extends both CommonCorrelate and 
> DataStreamRel and we can't convert DataStreamCorrelate to Java directly. 
> It would be great if we can convert these base RelNode 
> classes(CommonCorrelate, DataStreamRel, etc) from Scala to Java so that we 
> can add new Java RelNodes and convert the existed RelNodes to Java.
> CC [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15944) Resolve the potential class conflict problem when depending on both planners

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-15944.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now.

> Resolve the potential class conflict problem when depending on both planners
> --
>
> Key: FLINK-15944
> URL: https://issues.apache.org/jira/browse/FLINK-15944
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Legacy Planner, Table SQL / Planner
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> FLINK-15935 raised the potential class conflict problem when the 
> project/application depends on the old planner and the Blink planner at the same time. 
> Currently, Calcite classes (copied to fix Calcite bugs) and 
> {{PlannerExpressionParserImpl}} have the same classpath and may lead to 
> problems when the copies differ. 
> Currently, we keep these classes in sync in both planners manually. However, 
> this is unsafe and error-prone. We should figure out a solution for this (we 
> can't remove the old planner in the near future).
> A viable solution is a {{flink-table-planner-common}} module that keeps 
> the commonly used classes shared by both planners.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #16351: [FLINK-22972][datastream] Remove StreamOperator#dispose in favour of close and finish

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16351:
URL: https://github.com/apache/flink/pull/16351#issuecomment-872790645


   
   ## CI report:
   
   * 68484a60cd14c0412ebed7b4d143e6ae14c342d5 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19896)
 
   * e6e93af590763a4828be5a385fce62ba0c138eb5 UNKNOWN
   * 05673474e842d6b5ca7ab3c7f4a702d37660322c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19921)
 
   * c134bf9ea698b23a3122a7f59c79f6b1c84512d6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19929)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16361: [FLINK-23201][streaming] Prints a warning about a possible problem wi…

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16361:
URL: https://github.com/apache/flink/pull/16361#issuecomment-873149764


   
   ## CI report:
   
   * 312bdaa1db791ba894ec16d3a563ed6244c85fff Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19909)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-16315) throw JsonMappingException when using BatchTableEnvironment#explain to get the plan of sql with constant string

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-16315.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now.

> throw JsonMappingException when using BatchTableEnvironment#explain to get 
> the plan of sql with constant string  
> -
>
> Key: FLINK-16315
> URL: https://issues.apache.org/jira/browse/FLINK-16315
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Reporter: godfrey he
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> {code:java}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
> tEnv.registerTableSource("MyTable", CommonTestData.getCsvTableSource());
> Table table = tEnv.sqlQuery("select * from MyTable where first = '274' ");
> System.out.println(tEnv.explain(table));
> {code}
> When executing the above code, the following exception occurs.
> {panel:title=exception}
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>  Unexpected character ('U' (code 85)): was expecting comma to separate Object 
> entries
>  at [Source: (String)"{
>   "nodes": [
>   {
>   "id": 2,
>   "type": "source",
>   "pact": "Data Source",
>   "contents": "CsvTableSource(read fields: first, id, score, 
> last)",
>   "parallelism": "8",
>   "global_properties": [
>   { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>   { "name": "Partitioning Order", "value": "(none)" },
>   { "name": "Uniqueness", "value": "not unique" }
>   ],
>   "local_properties": [
>   { "name": "Order", "value": "(none)" },
>   { "name": "Grouping", "value": "not grouped" },
>   { "name": "Uniq"[truncated 3501 chars]; line: 41, 
> column: 15] (through reference chain: 
> org.apache.flink.table.explain.PlanTree["nodes"]->java.util.ArrayList[1])
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:365)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:302)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:27)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:288)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3205)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3173)
>   at 
> org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:42)
>   at 
> org.apache.flink.table.api.internal.BatchTableEnvImpl.explain(BatchTableEnvImpl.scala:208)
>   at 
> org.apache.flink.table.api.internal.BatchTableEnvImpl.explain(BatchTableEnvImpl.scala:223)
> {panel}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21239) Upgrade Calcite version to 1.28

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-21239:
-
Component/s: (was: Table SQL / Legacy Planner)

> Upgrade Calcite version to 1.28
> ---
>
> Key: FLINK-21239
> URL: https://issues.apache.org/jira/browse/FLINK-21239
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> The following files should be removed from the Flink code base during an 
> upgrade:
> - org.apache.calcite.rex.RexLiteral



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #16377: [BP-1.13][FLINK-23182][connectors/rabbitmq] Fix connection leak in RMQSource

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16377:
URL: https://github.com/apache/flink/pull/16377#issuecomment-874121353


   
   ## CI report:
   
   * fb86bcdd3eae2cd138de6acffe74f69149cbc31c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19930)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16378: [BP-1.12][FLINK-23182][connectors/rabbitmq] Fix connection leak in RMQSource

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16378:
URL: https://github.com/apache/flink/pull/16378#issuecomment-874121463


   
   ## CI report:
   
   * 423bb06676e53788d3f12f523d510242ce82196a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19931)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #16379: [BP-1.13][FLINK-22722 / FLINK-22766][connector/kafka] Add docs and metrics for Kafka new source

2021-07-05 Thread GitBox


flinkbot commented on pull request #16379:
URL: https://github.com/apache/flink/pull/16379#issuecomment-874137469


   
   ## CI report:
   
   * ca0408634212c0c0ea1b4e7da66095fa10af2a04 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-20785) MapRelDataType for legacy planner has wrong digest

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-20785.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now.

> MapRelDataType for legacy planner has wrong digest
> --
>
> Key: FLINK-20785
> URL: https://issues.apache.org/jira/browse/FLINK-20785
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.12.0
>Reporter: Danny Chen
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> In the legacy planner, we create the {{MapRelDataType}} ignoring the key and 
> value nullability. If we implemented the digest correctly, it would conflict 
> with that logic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-19567) JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-19567.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now.

> JoinITCase.testInnerJoinOutputWithPk is unstable for POJO type inference
> 
>
> Key: FLINK-19567
> URL: https://issues.apache.org/jira/browse/FLINK-19567
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.11.2
>Reporter: Danny Chen
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The 
> {{org.apache.flink.table.runtime.stream.table.JoinITCase.testInnerJoinOutputWithPk}}
>  is unstable: it fails randomly in the Maven test run but succeeds in the IDEA local 
> runner.
> Here is the stacktrace:
> {code:xml}
> [ERROR] 
> testInnerJoinOutputWithPk(org.apache.flink.table.runtime.stream.table.JoinITCase)
>   Time elapsed: 0.044 s  <<< ERROR!
> org.apache.flink.table.codegen.CodeGenException: Incompatible types of 
> expression and result type. 
> Expression[GeneratedExpression(result$19166,isNull$19167,,GenericType,false)]
>  type is [GenericType], result type is 
> [GenericType]
>   at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:379)
>   at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:377)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:377)
>   at 
> org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:295)
>   at 
> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:115)
>   at 
> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
>   at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:46)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18717) reuse MiniCluster in table integration test class ?

2021-07-05 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374864#comment-17374864
 ] 

Timo Walther commented on FLINK-18717:
--

[~godfreyhe] is this still a valid issue or can we close it?

> reuse MiniCluster in table integration test class ? 
> 
>
> Key: FLINK-18717
> URL: https://issues.apache.org/jira/browse/FLINK-18717
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: godfrey he
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Before 1.11, {{MiniCluster}} could be reused across each integration test class 
> (see TestStreamEnvironment#setAsContext). 
> In 1.11, after we corrected the execution behavior of TableEnvironment, 
> StreamTableEnvironment and BatchTableEnvironment (see 
> [FLINK-16363|https://issues.apache.org/jira/browse/FLINK-16363], 
> [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126]), a MiniCluster 
> will be created for each test case even within the same test class (see 
> {{org.apache.flink.client.deployment.executors.LocalExecutor}}). It would be better 
> if we could reuse the {{MiniCluster}} as before. One approach is to provide a new 
> kind of MiniCluster factory (such as a SessionMiniClusterFactory) instead of 
> using {{PerJobMiniClusterFactory}}. WDYT?
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18717) reuse MiniCluster in table integration test class ?

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-18717:
-
Component/s: (was: Table SQL / Legacy Planner)

> reuse MiniCluster in table integration test class ? 
> 
>
> Key: FLINK-18717
> URL: https://issues.apache.org/jira/browse/FLINK-18717
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: godfrey he
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Before 1.11, {{MiniCluster}} could be reused across each integration test class 
> (see TestStreamEnvironment#setAsContext). 
> In 1.11, after we corrected the execution behavior of TableEnvironment, 
> StreamTableEnvironment and BatchTableEnvironment (see 
> [FLINK-16363|https://issues.apache.org/jira/browse/FLINK-16363], 
> [FLINK-17126|https://issues.apache.org/jira/browse/FLINK-17126]), a MiniCluster 
> will be created for each test case even within the same test class (see 
> {{org.apache.flink.client.deployment.executors.LocalExecutor}}). It would be better 
> if we could reuse the {{MiniCluster}} as before. One approach is to provide a new 
> kind of MiniCluster factory (such as a SessionMiniClusterFactory) instead of 
> using {{PerJobMiniClusterFactory}}. WDYT?
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16320) Can not use sub-queries in the VALUES clause

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-16320:
-
Component/s: (was: Table SQL / Legacy Planner)

> Can not use sub-queries in the VALUES clause 
> -
>
> Key: FLINK-16320
> URL: https://issues.apache.org/jira/browse/FLINK-16320
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Dawid Wysakowicz
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> {code}
> StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
>   sEnv,
>   EnvironmentSettings.newInstance().useBlinkPlanner().build());
> Table table = tableEnvironment.sqlQuery("SELECT * FROM (VALUES(1), (SELECT 
> 1))");
> tableEnvironment.toRetractStream(table, Row.class).print();
> System.out.println(tableEnvironment.explain(table));
> {code}
> Produces:
> {code}
> == Optimized Logical Plan ==
> Union(all=[true], union=[EXPR$0])
> :- Calc(select=[CAST(1) AS EXPR$0])
> :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
> +- Calc(select=[$f0 AS EXPR$0])
>+- Join(joinType=[LeftOuterJoin], where=[true], select=[ZERO, $f0], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>   :- Exchange(distribution=[single])
>   :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
> reuse_id=[1])
>   +- Exchange(distribution=[single])
>  +- GroupAggregate(select=[SINGLE_VALUE(EXPR$0) AS $f0])
> +- Exchange(distribution=[single])
>+- Calc(select=[1 AS EXPR$0])
>   +- Reused(reference_id=[1])
> {code}
> which is wrong.
> Legacy planner fails with:
> {code}
> validated type:
> RecordType(INTEGER EXPR$0) NOT NULL
> converted type:
> RecordType(INTEGER NOT NULL EXPR$0) NOT NULL
> rel:
> LogicalProject(EXPR$0=[$0])
>   LogicalUnion(all=[true])
> LogicalProject(EXPR$0=[1])
>   LogicalValues(tuples=[[{ 0 }]])
> LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
>   LogicalProject(EXPR$0=[1])
> LogicalValues(tuples=[[{ 0 }]])
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
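
Until this is supported, an untested rewrite sketch that produces the same rows without a sub-query inside VALUES (assuming SELECT without FROM is supported by the planner in use):

{code:java}
// Equivalent rows via UNION ALL, avoiding the sub-query inside VALUES.
Table table = tableEnvironment.sqlQuery(
    "SELECT * FROM (VALUES (1)) AS t(v) UNION ALL SELECT 1");
{code}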


[jira] [Closed] (FLINK-16552) Cannot include Option fields in any Table join

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-16552.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. The new 
planner only allows a certain set of classes as input and output. Please reopen 
if you think that this topic has not been addressed.

> Cannot include Option fields in any Table join
> --
>
> Key: FLINK-16552
> URL: https://issues.apache.org/jira/browse/FLINK-16552
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.7.2
>Reporter: Jason Sinn
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The Table API currently fails joins where one of the tables has an Option-typed 
> field, even though it is not part of the join condition. A reproducible test case:
>  
> {code:java}
> object TestJoinWithOption {
>   case class JoinOne(joinKeyOne: String, otherFieldOne: Option[Int])
>   case class JoinTwo(joinKeyTwo: String, otherFieldTwo: Option[Int])
>   def main(args: Array[String]): Unit = {
> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(sEnv)
> val testStream1 = sEnv.fromCollection(Seq(JoinOne("key", Some(1))))
> val testStream2 = sEnv.fromCollection(Seq(JoinTwo("key", Some(2))))
> val t1 = tEnv.fromDataStream(testStream1)
> val t2 = tEnv.fromDataStream(testStream2)
> val result = t1.join(t2, "joinKeyOne = joinKeyTwo")
> result.toAppendStream[Row].print()
> sEnv.execute()
>   }
> }
> {code}
> Result:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type 'scala.Option' cannot be used in a join operation because it does not 
> implement a proper hashCode() method.Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be 
> used in a join operation because it does not implement a proper hashCode() 
> method. at 
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:174)
>  at 
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
>  at 
> org.apache.flink.table.typeutils.TypeCheckUtils$$anonfun$validateEqualsHashCode$1.apply$mcVI$sp(TypeCheckUtils.scala:149)
>  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at 
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
>  at 
> org.apache.flink.table.runtime.join.NonWindowJoin.(NonWindowJoin.scala:56)
>  at 
> org.apache.flink.table.runtime.join.NonWindowInnerJoin.(NonWindowInnerJoin.scala:45)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:112)
> {code}
> It seems as though this issue has been brought up before in the streams API 
> here: https://issues.apache.org/jira/browse/FLINK-2673
> Expected behaviour: Since the join condition does not contain the option, the 
> resulting schema should look like this (Actually, this was created by 
> result.printSchema)
> {code:java}
> root
>  |-- joinKeyOne: String
>  |-- otherFieldOne: Option[Integer]
>  |-- joinKeyTwo: String
>  |-- otherFieldTwo: Option[Integer] {code}
> Actual behaviour: Runtime exception is thrown above.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18019) The configuration specified in TableConfig may not take effect in certain cases

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-18019:
-
Component/s: (was: Table SQL / Legacy Planner)

> The configuration specified in TableConfig may not take effect in certain 
> cases
> ---
>
> Key: FLINK-18019
> URL: https://issues.apache.org/jira/browse/FLINK-18019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Dian Fu
>Priority: Major
>
> Currently, if the following configuration is configured in flink-conf.yaml:
> {code:java}
> state.backend: rocksdb
> state.checkpoints.dir: file:///tmp/flink-checkpoints
> {code}
> and the following configuration is configured via TableConfig:
> {code:java}
> tableConfig.getConfiguration().setString("state.backend.rocksdb.memory.fixed-per-slot",
>  "200MB")
> tableConfig.getConfiguration().setString("taskmanager.memory.task.off-heap.size",
>  "200MB")
> {code}
> then, when users submit the job via CliFrontend, the configuration set via 
> TableConfig will not take effect.
> Intuitively, the configuration specified via TableConfig (which has higher 
> priority) and the configuration specified via flink-conf.yaml should together 
> determine the configuration of a job. However, this doesn't hold in all cases.
> The root cause is that only the configuration specified in TableConfig is 
> passed to *StreamExecutionEnvironment* during translation to a plan. For the 
> above case, *state.backend* is not specified in TableConfig, so the 
> configuration *state.backend.rocksdb.memory.fixed-per-slot* will not take 
> effect. Please note that in the above example, the state backend actually used 
> will be RocksDB, but without the configurations 
> *state.backend.rocksdb.memory.fixed-per-slot* and 
> *taskmanager.memory.task.off-heap.size*.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
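
A hedged workaround sketch implied by the analysis above (assumption: dependent options only take effect when the base option also travels through TableConfig):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableConfigWorkaround {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().build());
    // Repeat the base option in TableConfig so that the dependent RocksDB
    // options are shipped to the execution environment together with it.
    tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb");
    tEnv.getConfig().getConfiguration()
        .setString("state.backend.rocksdb.memory.fixed-per-slot", "200MB");
  }
}
{code}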


[jira] [Closed] (FLINK-16693) Legacy planner incompatible with Timestamp backed by LocalDateTime

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-16693.

Resolution: Won't Fix

Since the old planner has been removed, I will close this issue now. Please 
reopen if you think that this topic has not been addressed.

> Legacy planner incompatible with Timestamp backed by LocalDateTime
> --
>
> Key: FLINK-16693
> URL: https://issues.apache.org/jira/browse/FLINK-16693
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.10.0
>Reporter: Paul Lin
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Recently I upgraded a simple application that inserts static data into a 
> table from 1.9.0 to 1.10.0, and 
> encountered a timestamp type incompatibility problem during the table sink 
> validation.
> The SQL is like:
> ```
> insert into kafka.test.tbl_a # schema: (user_name STRING, user_id INT, 
> login_time TIMESTAMP)
> select ("ann", 1000, TIMESTAMP "2019-12-30 00:00:00")
> ```
> And the error thrown:
> ```
> Field types of query result and registered TableSink `kafka`.`test`.`tbl_a` 
> do not match.
>   Query result schema: [EXPR$0: String, EXPR$1: Integer, EXPR$2: 
> Timestamp]
>   TableSink schema:[user_name: String, user_id: Integer, login_time: 
> LocalDateTime]
> ```
> After some digging, I found the root cause might be that, since FLINK-14645, 
> timestamp fields defined via TableFactory have been bridged to LocalDateTime, 
> while timestamp functions are still backed by java.sql.Timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
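
For comparison, the new type system makes the conversion class explicit. A minimal sketch (this is the new-planner approach, not a fix for the legacy planner):

{code:java}
import java.sql.Timestamp;
import java.time.LocalDateTime;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class TimestampBridging {
  // Declare which JVM class a TIMESTAMP(3) column bridges to, so query results
  // and the sink schema agree on Timestamp vs. LocalDateTime.
  static final DataType AS_LOCAL_DATE_TIME =
      DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class);
  static final DataType AS_SQL_TIMESTAMP =
      DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class);
}
{code}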


[GitHub] [flink] wangyang0918 commented on a change in pull request #16286: [FLINK-21445] Refactors the PackagedProgramRetriever implementation and adds configuration to PackagedProgram

2021-07-05 Thread GitBox


wangyang0918 commented on a change in pull request #16286:
URL: https://github.com/apache/flink/pull/16286#discussion_r663959462



##
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/FromJarEntryClassInformationProvider.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Optional;
+
+/**
+ * {@code FromJarEntryClassInformationProvider} is used for cases where the 
Jar archive is
+ * explicitly specified.
+ */
+public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider {
+
+    private final File jarFile;
+    private final String jobClassName;
+
+    /**
+     * Creates a {@code FromJarEntryClassInformationProvider} for a custom Jar archive. At least
+     * the {@code jarFile} or the {@code jobClassName} has to be set.
+     *
+     * @param jarFile The Jar archive.
+     * @param jobClassName The name of the job class.
+     * @return The {@code FromJarEntryClassInformationProvider} referring to the passed information.
+     */
+    public static FromJarEntryClassInformationProvider createFromCustomJar(
+            File jarFile, @Nullable String jobClassName) {
+        return new FromJarEntryClassInformationProvider(jarFile, jobClassName);

Review comment:
   Make sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2021-07-05 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374868#comment-17374868
 ] 

Till Rohrmann commented on FLINK-23190:
---

Thanks for reporting this issue [~loyi]. Have you checked how your solution 
would work together with declarative resource management? With this feature we 
no longer send an {{AllocationID}} to the {{ResourceManager}} and, hence, there 
is no longer the information available to spread tasks out evenly. Also with 
the {{AdaptiveScheduler}} you might not know upfront with what parallelism you 
can run your job. How would this work together with your proposal?

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.3
>Reporter: loyi
>Priority: Major
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and jobs with smaller source parallelism often cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering slots to the SlotManager, the current 
> processing logic chooses the first pendingSlot which meets its resource 
> requirements. This "random" strategy usually causes uneven task allocation 
> when the source operator's parallelism is significantly below the 
> processing operators'. A simple feasible idea is to 
> {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (e.g., let the 
> AllocationID carry the detail), then allocate the slots proportionally to 
> each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A、B、C、D represent  {color:#de350b}jobVertexId{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every TaskManager will provide 4 slots at a time, and each group will get 1 
> slot according to its proportion (1/4). The final allocation result is below:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3. The patched version has {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there 
> other points that have not been considered, or does it conflict with future 
> plans? Sorry for my poor English.
>  
>  
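
To make the proposed grouping concrete, here is a minimal, self-contained 
sketch (names such as partition/allocate are hypothetical, not actual Flink 
internals). Each TaskManager offer of 4 slots draws one slot from every vertex 
group, reproducing the fully even layout from the example above:

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed allocation, not actual Flink internals.
public class EvenSlotAllocationSketch {

    // Partition pending slots by their vertex group, e.g. "ABCD", "BCD", "CD", "D".
    static Map<String, Deque<String>> partition(List<String> pendingGroups) {
        Map<String, Deque<String>> groups = new LinkedHashMap<>();
        for (String group : pendingGroups) {
            groups.computeIfAbsent(group, g -> new ArrayDeque<>()).add(group);
        }
        return groups;
    }

    // Serve one TaskManager offer of n slots by drawing round-robin from each group.
    static List<String> allocate(Map<String, Deque<String>> groups, int n) {
        List<String> chosen = new ArrayList<>();
        boolean progress = true;
        while (chosen.size() < n && progress) {
            progress = false;
            for (Deque<String> group : groups.values()) {
                if (chosen.size() == n) {
                    break;
                }
                String slot = group.poll();
                if (slot != null) {
                    chosen.add(slot);
                    progress = true;
                }
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>();
        for (String group : Arrays.asList("ABCD", "BCD", "CD", "D")) {
            for (int i = 0; i < 10; i++) {
                pending.add(group);
            }
        }
        Map<String, Deque<String>> groups = partition(pending);
        // Ten TaskManagers, 4 slots each: every offer receives one slot per group.
        for (int tm = 1; tm <= 10; tm++) {
            System.out.println("tm" + tm + " -> " + allocate(groups, 4));
        }
    }
}
{code}

Run as-is, this prints ten lines, one per TaskManager, each containing one 
slot from each of the four groups.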



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zentol commented on a change in pull request #16332: [FLINK-22386] Introduce cache for docker images used during testing

2021-07-05 Thread GitBox


zentol commented on a change in pull request #16332:
URL: https://github.com/apache/flink/pull/16332#discussion_r663962444



##
File path: tools/azure-pipelines/jobs-template.yml
##
@@ -243,3 +278,9 @@ jobs:
   inputs:
 targetPath: $(DEBUG_FILES_OUTPUT_DIR)
 artifact: logs-${{parameters.stage_name}}-$(DEBUG_FILES_NAME)
+- script: |
+mkdir -p $(TESTCONTAINER_CACHE_FOLDER)
+docker save $(docker image ls --format "{{.Repository}}:{{.Tag}}" | 
grep -E -- "testcontainers|kafka|elasticsearch|postgres|mysql") -o 
$(TESTCONTAINER_CACHE_FOLDER)/cache.tar || true

Review comment:
   that's not necessarily the case for other tests though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663963392



##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+this.self = self;
+this.caller = caller;
+this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+this(self, Optional.of(caller));
+  }
+
+  @Override
+  public Address self() {
+return self;
+  }
+
+  @Override
+  public Optional<Address> caller() {
+return caller;
+  }
+
+  @Override
+  public void send(Message message) {
+sentMessages.add(new Envelope(Duration.ofMillis(0), message));
+  }
+
+  @Override
+  public void sendAfter(Duration duration, Message message) {
+sentMessages.add(new Envelope(duration, message));
+  }
+
+  @Override
+  public void send(EgressMessage message) {
+sentEgressMessages.add(message);
+  }
+
+  @Override
+  public AddressScopedStorage storage() {
+return storage;
+  }
+
+  /**
+   * This method returns a list of all messages sent by this function via 
{@link
+   * Context#send(Message)} or {@link Context#sendAfter(Duration, Message)}.
+   *
+   * Messages are wrapped in an {@link Envelope} that contains the message 
itself and the
+   * duration after which the message was sent. The Duration is {@link 
Duration#ZERO} for messages
+   * sent via {@link Context#send(Message)}.
+   *
+   * @return the list of sent messages wrapped in {@link Envelope}s
+   */
+  public List<Envelope> getSentMessages() {
+return sentMessages;

Review comment:
   Done.
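   
   (For context, a hypothetical usage sketch of the harness quoted above; 
`TestContext` and `getSentMessages()` come from this diff, the message-builder 
calls are standard statefun-sdk-java API, and the function instance is assumed 
to exist:)
   
   ```java
   import org.apache.flink.statefun.sdk.java.Address;
   import org.apache.flink.statefun.sdk.java.StatefulFunction;
   import org.apache.flink.statefun.sdk.java.TypeName;
   import org.apache.flink.statefun.sdk.java.message.Message;
   import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
   import org.apache.flink.statefun.sdk.java.testing.TestContext;
   
   class GreeterFnTestSketch {
   
       void exercise(StatefulFunction fn) throws Throwable {
           Address self =
                   new Address(TypeName.typeNameFromString("com.example.fns/greeter"), "bob");
           TestContext context = new TestContext(self);
   
           Message message = MessageBuilder.forAddress(self).withValue("hello").build();
           fn.apply(context, message);
   
           // Every send()/sendAfter() call made by the function lands in the
           // envelope list, ready for assertions.
           System.out.println("sent " + context.getSentMessages().size() + " message(s)");
       }
   }
   ```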




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663963573



##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663963787



##
File path: 
statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/testing/TestContextIntegrationTest.java
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.*;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+import org.junit.Test;
+
+public class TestContextIntegrationTest {
+
+  private static class SimpleFunctionUnderTest implements StatefulFunction {
+
+static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/simple-fn");
+
+static final TypeName ANOTHER_TYPE = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+static final TypeName SOME_EGRESS = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+static final ValueSpec<Integer> NUM_INVOCATIONS = 
ValueSpec.named("seen").withIntType();
+
+@Override
+public CompletableFuture<Void> apply(Context context, Message message) 
throws Throwable {
+
+  String name = message.asUtf8String();

Review comment:
   Removed.

##
File path: 
statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/testing/TestContextIntegrationTest.java
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.*;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+import org.junit.Test;
+
+public class TestContextIntegrationTest {
+
+  private static class SimpleFunctionUnderTest implements StatefulFunction {
+
+static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/simple-fn");
+
+static final TypeName ANOTHER_TYPE = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+static final TypeName SOME_EGRESS = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+static final ValueSpec<Integer> NUM_INVOCATIONS = 
ValueSpec.named("seen").withIntType();
+
+@Override
+public CompletableFuture<Void> apply(Context context, Message message) 
throws Throwable {
+
+  String name = message.asUtf8String();
+
+  AddressScopedStorage storage = context.storage();
+  int numInvocations = storage.get(NUM_INVOCATIONS).orElse(0);
+  storage.set(NUM_INVOCATIONS, numInvocations 

[jira] [Commented] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-05 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374869#comment-17374869
 ] 

Till Rohrmann commented on FLINK-23218:
---

Thanks for creating this proposal [~Thesharing]. Are there any known drawbacks 
for low scale jobs caused by the compression step?

> Distribute the ShuffleDescriptors via blob server
> -
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover and partitions releasing. 
> However, the task deployment is still slow. For a job with two vertices, each 
> vertex has 8k parallelism and they are connected with the all-to-all edge. It 
> takes 32.611s to deploy all the tasks and make them transition to running. If 
> the parallelisms are 16k, it may take more than 2 minutes.
> As the creation of TaskDeploymentDescriptors runs in the main thread of the 
> jobmanager, the jobmanager cannot deal with other akka messages like 
> heartbeats, task status updates, etc., for more than two minutes.
>  
> All in all, currently there are two issues in the deployment of tasks for 
> large scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeout may happen during or after the procedure of task 
> deployments. For the streaming job, it would cause the failover of the entire 
> region. The job may never transition to running since there would be another 
> heartbeat timeout during the procedure of new task deployments.
> h3. Proposal
> Task deployment involves the following procedures:
>  # Jobmanager creates TaskDeploymentDescriptor for each task in the main 
> thread
>  # TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
> IntermediateResultPartitions that a task consumes. For the downstream 
> vertices connected with the all-to-all edge that has _N_ parallelism, we need 
> to calculate _N_ ShuffleDescriptors _N_ times. However, these 
> vertices share the same ShuffleDescriptors since they all consume the 
> same IntermediateResultPartitions. We don't need to calculate 
> ShuffleDescriptors for each downstream vertex individually. We can just cache 
> them. This will decrease the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can just cache the serialized value of ShuffleDescriptors 
> instead of the original object. To decrease the size of akka messages and 
> reduce the transmission of replicated data over the network, this serialized 
> value can be compressed.
> *2. Distribute the ShuffleDescriptors via blob server*
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
> serialized value is more than 700 Kilobytes. After the compression, it would 
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need 
> to keep all the copies in heap memory until the TaskDeploymentDescriptors 
> are all sent. There will be only one copy on the blob server. Like the 
> JobInformation, we can just distribute the cached ShuffleDescriptors via the 
> blob server if their overall size has exceeded the threshold.
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the perf
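
To make the proposed decision concrete, here is a minimal sketch (illustrative 
only, not the actual implementation; the threshold parameter stands in for 
{{blob.offload.minsize}}):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Illustrative only: serialize the shared ShuffleDescriptors once, compress the
// bytes, and pick the distribution path based on the compressed size.
final class ShuffleDescriptorCacheSketch {

    enum Distribution { INLINE, BLOB_SERVER }

    static byte[] serializeAndCompress(Serializable sharedDescriptors) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out =
                new ObjectOutputStream(
                        new DeflaterOutputStream(bytes, new Deflater(Deflater.BEST_SPEED)))) {
            // One serialization, shared by every consumer of the same partitions.
            out.writeObject(sharedDescriptors);
        }
        return bytes.toByteArray();
    }

    static Distribution chooseDistribution(byte[] compressed, int offloadMinSizeBytes) {
        return compressed.length >= offloadMinSizeBytes
                ? Distribution.BLOB_SERVER // one copy on the blob server, fetched on demand
                : Distribution.INLINE;     // small enough to ride along in the RPC message
    }
}
{code}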

[GitHub] [flink] zentol commented on a change in pull request #16332: [FLINK-22386] Introduce cache for docker images used during testing

2021-07-05 Thread GitBox


zentol commented on a change in pull request #16332:
URL: https://github.com/apache/flink/pull/16332#discussion_r663964704



##
File path: tools/azure-pipelines/jobs-template.yml
##
@@ -243,3 +278,9 @@ jobs:
   inputs:
 targetPath: $(DEBUG_FILES_OUTPUT_DIR)
 artifact: logs-${{parameters.stage_name}}-$(DEBUG_FILES_NAME)
+- script: |
+mkdir -p $(TESTCONTAINER_CACHE_FOLDER)
+docker save $(docker image ls --format "{{.Repository}}:{{.Tag}}" | 
grep -E -- "testcontainers|kafka|elasticsearch|postgres|mysql") -o 
$(TESTCONTAINER_CACHE_FOLDER)/cache.tar || true

Review comment:
   I won't object to using the pom files, but it's not a long-term solution.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-23230) Cannot compile Flink on MacOS with M1 chip

2021-07-05 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374871#comment-17374871
 ] 

Till Rohrmann commented on FLINK-23230:
---

Thanks for reporting the issue [~osaman88]. Can you attach some more details 
about what exactly is not working and how it is failing? Could you try 
disabling the web-ui build via {{-Dskip-webui-build}} when running {{mvn}}?

> Cannot compile Flink on MacOS with M1 chip
> --
>
> Key: FLINK-23230
> URL: https://issues.apache.org/jira/browse/FLINK-23230
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.13.1
>Reporter: Osama Neiroukh
>Priority: Minor
>
> Flink doesn't currently compile on MacOS with M1 silicon.
> This is true for all recent versions (1.13.X) as well as master.
> Some of the problems have potentially easy fixes, such as installing node 
> separately or updating the relevant pom.xml to use a newer version of node. I 
> am getting some errors about deprecated features being used which are not 
> supported by newer node, but on the surface they seem easy to resolve. 
> I've had less success with complex dependencies such as protobuf.
> My long term objective is to use and contribute to Flink. If I can get some 
> help with the above issues, I am willing to make the modifications, submit 
> the changes as a pull request, and shepherd them to release. If compilation 
> on MacOS/M1 is not a priority, I can look for a virtual machine solution 
> instead. Feedback appreciated. 
>  
> Thanks
>  
> Osama



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann commented on pull request #16256: [hotfix][release-notes] Update release notes for 1.11, 1.12, 1.13 with accumulators semantics change in MiniClusterJobClient

2021-07-05 Thread GitBox


tillrohrmann commented on pull request #16256:
URL: https://github.com/apache/flink/pull/16256#issuecomment-874154398


   Hmm, @rmetzger only linked a single merge commit and set the fixVersion to 
`1.13.0`. If this is indeed the case, could you update the JIRA ticket 
@echauchot?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-841943851


   
   ## CI report:
   
   * c95109768facc0535e3ca1b9da56cf4197fb4ba9 UNKNOWN
   * 1b2cc77ae6ec783bad99accd53467be8c9eb3d91 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19910)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16163: [FLINK-22994][Table SQL / Planner] improve the performace of invoking…

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16163:
URL: https://github.com/apache/flink/pull/16163#issuecomment-861284972


   
   ## CI report:
   
   * 00f7cd34fc503db8cc116dd201bcc7f54a4b6f20 UNKNOWN
   * da303c24e513b0678794526c6351003b6e76cb6d UNKNOWN
   * 4133ee287c23af5da472db1098c9d7ab51b81cb9 UNKNOWN
   * 902e3874f3d6b223fc36daeeb94ade1262ab376c Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19911)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16373: [FLINK-23248][datastream] Close SinkWriter when calling dispose

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16373:
URL: https://github.com/apache/flink/pull/16373#issuecomment-874006333


   
   ## CI report:
   
   * 17b0a4b4ab8019860939249f3533187d53be6b85 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19914)
 
   * cd8d063735b6e23a31b2069bfb8bf3e28cf52bfc Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19923)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16369: [FLINK-22443][streaming-java] Fix overflow in MultipleInputSelectionHandler

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16369:
URL: https://github.com/apache/flink/pull/16369#issuecomment-873893547


   
   ## CI report:
   
   * 3bb34b002d5a8952f045088b16f6a4c6a0532527 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19917)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16379: [BP-1.13][FLINK-22722 / FLINK-22766][connector/kafka] Add docs and metrics for Kafka new source

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16379:
URL: https://github.com/apache/flink/pull/16379#issuecomment-874137469


   
   ## CI report:
   
   * ca0408634212c0c0ea1b4e7da66095fa10af2a04 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19932)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663981249



##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestAddressScopedStorage.java
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.util.HashMap;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+
+class TestAddressScopedStorage implements AddressScopedStorage {

Review comment:
   Do you suggest using `ConcurrentAddressScopedStorage` instead of the 
`TestAddressScopedStorage`? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-statefun] knaufk commented on a change in pull request #236: [FLINK-22771] Add TestContext to Java SDK

2021-07-05 Thread GitBox


knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663981529



##
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/Envelope.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * A utility class that wraps a {@link Message} and the {@link Duration} after 
which it was sent by
+ * a {@link org.apache.flink.statefun.sdk.java.StatefulFunction}. It is used 
by the {@link
+ * TestContext}.
+ */
+public class Envelope {

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] fapaul commented on a change in pull request #15972: Add common source and operator metrics.

2021-07-05 Thread GitBox


fapaul commented on a change in pull request #15972:
URL: https://github.com/apache/flink/pull/15972#discussion_r663972500



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceMetricGroup.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+import org.apache.flink.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.clock.Clock;
+
+/** Special {@link org.apache.flink.metrics.MetricGroup} representing an 
Operator. */
+@Internal
+public class InternalSourceMetricGroup extends 
ProxyMetricGroup<OperatorMetricGroup>
+implements SourceMetricGroup {
+
+public static final long ACTIVE = Long.MAX_VALUE;
+private final Clock clock;
+private final Counter numRecordsInErrors;
+// only if source emits at least one watermark
+private SettableGauge<Long> watermarkGauge;
+// only if records with event timestamp are emitted
+private SettableGauge<Long> eventTimeGauge;
+private long idleStartTime = ACTIVE;
+
+public InternalSourceMetricGroup(OperatorMetricGroup parentMetricGroup, 
Clock clock) {
+super(parentMetricGroup);
+numRecordsInErrors = 
parentMetricGroup.counter(MetricNames.NUM_RECORDS_IN_ERRORS);
+this.clock = clock;
+parentMetricGroup.gauge(
+MetricNames.SOURCE_IDLE_TIME_GAUGE,
+() -> isIdling() ? 0 : this.clock.absoluteTimeMillis() - 
idleStartTime);
+}
+
+private boolean isIdling() {
+return idleStartTime == ACTIVE;

Review comment:
   Do we really provide much value with the source-specific idle value in 
comparison to the overall `idleTimeMs`?

##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
##
@@ -29,6 +33,17 @@
  * The interface for a source reader which is responsible for reading the 
records from the source
  * splits assigned by {@link SplitEnumerator}.
  *
+ * Implementations should can provide the following metrics:

Review comment:
   ```suggestion
* Implementations can provide the following metrics:
   ```
   ?

##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
##
@@ -18,12 +18,22 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+
 import javax.annotation.Nullable;
 
 import java.util.Set;
 
 /** An interface for the elements passed from the fetchers to the source 
reader. */
 public interface RecordsWithSplitIds {
+/**
+ * Returns the timestamp of the last fetch. Will be used to automatically 
set {@link
+ * SourceMetricGroup#addLastFetchTimeGauge(Gauge)}.
+ */
+@Nullable
+default Long lastFetchTime() {

Review comment:
   Is there some guidance when deciding between returning `null` by default 
or using `Optional`?
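   
   (For comparison, a small illustrative sketch of the two API shapes, with 
made-up names:)
   
   ```java
   import java.util.Optional;
   
   interface FetchTimeStylesSketch {
       // Nullable boxed value, as in this PR: no extra allocation, but callers
       // must remember to null-check.
       default Long lastFetchTime() {
           return null;
       }
   
       // Optional variant: "no value yet" is explicit in the signature, at the
       // cost of an Optional allocation on a potentially hot per-fetch path.
       default Optional<Long> lastFetchTimeOpt() {
           return Optional.empty();
       }
   }
   ```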

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##
@@ -291,13 +300,23 @@ public InputStatus emitNext(DataOutput<OUT> output) 
throws Exception {
 
 // short circuit the common case (every invocation except the first)
 if (currentMainOutput != null) {
-return sourceReader.pollNext(currentMainOutput);
+return pollNext();
 }
 
 // this creates a batch or streaming output based on the runtime mode
-currentMainOutput = eventTimeLogic.createMainOutput(output);
+currentMainOutput =
+eventTimeLogic.createMainOutput(
+new MetricTrackingOutput<>(output, sourceMetricGroup));
 lastInvokedOutput = output;
-return sourceRe

[jira] [Commented] (FLINK-23237) Add log to print data that failed to deserialize when ignore-parse-error=true and there is an NPE issue when using `new String(message)` if message = null

2021-07-05 Thread GongZhongQiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374883#comment-17374883
 ] 

GongZhongQiang commented on FLINK-23237:


Please read this code carefully. There is no problem.

> Add log to print data that failed to deserialize when  
> ignore-parse-error=true and there is an NPE issue when using `new 
> String(message)` if message = null  
> -
>
> Key: FLINK-23237
> URL: https://issues.apache.org/jira/browse/FLINK-23237
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: hehuiyuan
>Priority: Minor
>
> (1) Add a log to print the error data that failed to deserialize when 
> `ignore-parse-error` = `true` is set.
> (2) There is an NPE risk when using `new String(message)` if message is 
> null, which causes the IOException info not to be displayed.
> {code:java}
> public RowData deserialize(@Nullable byte[] message) throws IOException {
> if (message == null) {
> return null;
> }
> try {
> final JsonNode root = objectReader.readValue(message);
> return (RowData) runtimeConverter.convert(root);
> } catch (Throwable t) {
> if (ignoreParseErrors) {
> return null;
> }
> throw new IOException(
> String.format("Failed to deserialize CSV row '%s'.", new 
> String(message)), t);
> }
> }
> {code}
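
A sketch of the requested logging improvement (1), assuming an SLF4J logger; 
the names are illustrative and {{parse}} stands in for the real row converter:

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of improvement (1): log the offending bytes when parse errors are ignored.
final class IgnoreParseErrorLoggingSketch {

    private static final Logger LOG =
            LoggerFactory.getLogger(IgnoreParseErrorLoggingSketch.class);

    static String deserialize(byte[] message, boolean ignoreParseErrors) throws IOException {
        if (message == null) {
            return null; // early return: new String(message) below never sees null
        }
        try {
            return parse(message); // stand-in for the real converter
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                // The requested improvement: surface the skipped record in the logs.
                LOG.warn(
                        "Failed to deserialize row '{}', skipping it because"
                                + " ignore-parse-errors is enabled.",
                        new String(message, StandardCharsets.UTF_8),
                        t);
                return null;
            }
            throw new IOException(
                    String.format(
                            "Failed to deserialize row '%s'.",
                            new String(message, StandardCharsets.UTF_8)),
                    t);
        }
    }

    private static String parse(byte[] message) {
        throw new IllegalArgumentException("simulated parse failure");
    }
}
{code}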



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #16173: [FLINK-15031][runtime] Calculate required shuffle memory before allocating slots if resources are specified

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16173:
URL: https://github.com/apache/flink/pull/16173#issuecomment-862370157


   
   ## CI report:
   
   * ed948a42825ed6511e31c778c727328fc4150c72 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19912)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16363: [FLINK-23225][doc-zh]Fixed some mismatches in the translation pages

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16363:
URL: https://github.com/apache/flink/pull/16363#issuecomment-873435468


   
   ## CI report:
   
   * 4c7d15553969e2f13b3dc3e1a1a23f840acf4860 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=19926)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zck573693104 commented on a change in pull request #16295: [FLINK-11622][chinese-translation,Documentation]Translate the "Command-Line Interface" page into Chinese

2021-07-05 Thread GitBox


zck573693104 commented on a change in pull request #16295:
URL: https://github.com/apache/flink/pull/16295#discussion_r663961148



##
File path: docs/content.zh/docs/deployment/cli.md
##
@@ -25,37 +25,31 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+ 
 # 命令行界面
 
-Flink provides a Command-Line Interface (CLI) `bin/flink` to run programs that 
-are packaged as JAR files and to control their execution. The CLI is part of 
any 
-Flink setup, available in local single node setups and in distributed setups. 
-It connects to the running JobManager specified in `conf/flink-config.yaml`.
+Flink提供了命令界面(CLI)`bin/flink` 来运行 JAR 格式的程序,同时控制其执行。该 CLI 作为所有 Flink 
安装配置的一部分,在单节点或分布式安装的方式中都可以使用。命令行程序与运行中的 JobManager 建立连接来通信,JobManager 
的连接信息可以通过`conf/flink-config.yaml`指定。

Review comment:
   “Flink提供了命令界面”
   Wouldn't it read better as “Flink提供了命令行界面”, i.e. adding “行”?
   “该 CLI 作为所有 Flink 安装配置的一部分”
   Wouldn't it be better to drop “所有”?

##
File path: docs/content.zh/docs/deployment/cli.md
##
@@ -139,26 +134,25 @@ Waiting for response...
 Savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab' disposed.
 ```
 
-If you use custom state instances (for example custom reducing state or 
RocksDB state), you have to 
-specify the path to the program JAR with which the savepoint was triggered. 
Otherwise, you will run 
-into a `ClassNotFoundException`:
+如果使用自定义状态实例(例如自定义 reducing 状态或 RocksDB 状态),则必须指定触发 savepoint 的  JAR 
程序路径。否则,会遇到 `ClassNotFoundException` 异常 :
+
 ```bash
 $ ./bin/flink savepoint \
   --dispose  \ 
   --jarfile 
 ```
 
-Triggering the savepoint disposal through the `savepoint` action does not only 
remove the data from 
-the storage but makes Flink clean up the savepoint-related metadata as well.
+通过 `savepoint` 操作触发 savepoint 废弃,不仅会将数据从存储中删除,还会使 Flink 清理与 savepoint 相关的元数据。
+
+ 
 
-### Terminating a Job
+### 终止作业
 
- Stopping a Job Gracefully Creating a Final Savepoint
+ 
 
-Another action for stopping a job is `stop`. It is a more graceful way of 
stopping a running streaming 
-job as the `stop`  flows from source to sink. When the user requests to stop a 
job, all sources will 
-be requested to send the last checkpoint barrier that will trigger a 
savepoint, and after the successful 
-completion of that savepoint, they will finish by calling their `cancel()` 
method. 
+ 优雅地终止作业并创建最终 Savepoint
+
+终止作业运行的操作是 `stop`。`stop` 操作停止从 source 到 sink 的作业流,是一种更优雅的终止方式。当用户请求终止一项作业时,所有的 
sources 将被要求发送最后的 checkpoint 障碍,这会触发创建 checkpoint ,在成功完成 checkpoint 的创建后,Flink 
会调用 `cancel()` 方法来终止作业。

Review comment:
   Wouldn't it be better to change “checkpoint 障碍” to “checkpoint 屏障”?

##
File path: docs/content.zh/docs/deployment/cli.md
##
@@ -226,191 +220,179 @@ Using standalone source with error rate 0.00 and 
sleep delay 1 millis
 Job has been submitted with JobID 97b20a0a8ffd5c1d656328b0cd6436a6
 ```
 
-See how the command is equal to the [initial run command](#submitting-a-job) 
except for the 
-`--fromSavepoint` parameter which is used to refer to the state of the 
-[previously stopped 
job](#stopping-a-job-gracefully-creating-a-final-savepoint). A new JobID is 
-generated that can be used to maintain the job.
+请注意,该命令除了使用 `-fromSavepoint` 
参数关联[之前停止作业](#stopping-a-job-gracefully-creating-a-final-savepoint)的状态外,其它参数都与[初始
 run 命令](#submitting-a-job)相同。该操作会生成一个新的 JobID,用于维护作业的运行。
+
 
-By default, we try to match the whole savepoint state to the job being 
submitted. If you want to 
-allow to skip savepoint state that cannot be restored with the new job you can 
set the 
-`--allowNonRestoredState` flag. You need to allow this if you removed an 
operator from your program 
-that was part of the program when the savepoint was triggered and you still 
want to use the savepoint.
+默认情况下,Flink 尝试将新提交的作业恢复到完整的 savepoint 状态。如果你想忽略不能随新作业恢复的 savepoint 状态,可以设置 
`--allowNonRestoredState` 标志。当你删除了程序的某个操作,同时该操作是创建 savepoint 
时对应程序的一部分,这种情况下,如果你仍想使用 savepoint,就需要设置此参数。
 
 ```bash
 $ ./bin/flink run \
   --fromSavepoint  \
   --allowNonRestoredState ...
 ```
-This is useful if your program dropped an operator that was part of the 
savepoint.
+如果你的程序删除了相应 savepoint 的部分运算操作,使用该选项将很有帮助。
 
 {{< top >}}
 
-## CLI Actions
+ 
+
+## CLI 操作
+
+以下是 Flink CLI 工具支持操作的概览:
 
-Here's an overview of actions supported by Flink's CLI tool:
 
 
 
-  Action
-  Purpose
+  操作
+  目的
 
 
 
 
 run
 
-This action executes jobs. It requires at least the jar 
containing the job. Flink-
-or job-related arguments can be passed if necessary.
+该操作用于执行作业。必须指定包含作业的 jar 包。如有必要,可以传递与 Flink 或作业相关的参数。
 
 
 
 run-application
 
-This action executes jobs in }}#application-mode">
-Application Mode. Other than that, it requires the same 
parameters as the 
-run action.
+该操作用于在 }}#applic

[GitHub] [flink] zentol commented on a change in pull request #15972: Add common source and operator metrics.

2021-07-05 Thread GitBox


zentol commented on a change in pull request #15972:
URL: https://github.com/apache/flink/pull/15972#discussion_r664011654



##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
##
@@ -29,6 +33,17 @@
  * The interface for a source reader which is responsible for reading the 
records from the source
  * splits assigned by {@link SplitEnumerator}.
  *
+ * Implementations should can provide the following metrics:

Review comment:
   ```suggestion
* Implementations should provide the following metrics:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-11448) Clean-up and prepare Table API to be uncoupled from table core

2021-07-05 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-11448.

Fix Version/s: 1.9.0
   Resolution: Resolved

Resolved in Flink 1.9.0.

> Clean-up and prepare Table API to be uncoupled from table core
> --
>
> Key: FLINK-11448
> URL: https://issues.apache.org/jira/browse/FLINK-11448
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>  Labels: auto-unassigned
> Fix For: 1.9.0
>
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> This step aims to provide a clean API that is preferably implemented in Java 
> and uncoupled from the table core. Ideally, the API consists of 
> well-documented interfaces. A planner can provide an implementation for those.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #16134: [FLINK-21906][hive] Support computed column syntax for Hive DDL dialect

2021-07-05 Thread GitBox


flinkbot edited a comment on pull request #16134:
URL: https://github.com/apache/flink/pull/16134#issuecomment-858596907


   
   ## CI report:
   
   * f055c47f3860d96a7e06544944ebe995d386d6a7 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18877)
 
   * b198b330d6695013698d01af4b5090f5ff8ab0e4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



