[jira] [Commented] (FLINK-20913) Improve new HiveConf(jobConf, HiveConf.class)

2021-01-20 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268466#comment-17268466
 ] 

Xingxing Di commented on FLINK-20913:
-

Thanks [~lirui], I've submitted the PRs.

> Improve new HiveConf(jobConf, HiveConf.class)
> -
>
> Key: FLINK-20913
> URL: https://issues.apache.org/jira/browse/FLINK-20913
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Environment: Hive 2.0.1
> Flink 1.12.0
> Query with SQL client
>Reporter: Xingxing Di
>Assignee: Xingxing Di
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.11.4, 1.12.2
>
>
> When we query Hive tables, we get an exception in 
> org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions
> Exception:
>  
> {code:java}
> org.apache.thrift.transport.TTransportException
> {code}
>  
> SQL: 
> {code:java}
> select * from dxx1 limit 1;
> {code}
>  
> After debugging, we found that new HiveConf overrides the configurations in 
> jobConf; in my case `hive.metastore.sasl.enabled` was reset to `false`, which 
> is unexpected.
> {code:java}
> // org.apache.flink.connectors.hive.util.HivePartitionUtils
> new HiveConf(jobConf, HiveConf.class){code}
>  
> *I think we should add a HiveConfUtils to create the HiveConf, which would 
> look like this:*
>  
> {code:java}
> HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
> hiveConf.addResource(jobConf);{code}
> The above code fixes the error; I will make a PR if this improvement is 
> acceptable.
>  
> Here is the detailed error stack:
> {code:java}
> 2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient     
>              [] - Could not execute SQL statement.2021-01-10 17:27:11,995 
> WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could 
> not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalid SQL query. at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
> partitions from hive metaStore at 
> org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.ta

[jira] [Commented] (FLINK-20913) Improve new HiveConf(jobConf, HiveConf.class)

2021-01-19 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267966#comment-17267966
 ] 

Xingxing Di commented on FLINK-20913:
-

[~lirui], I would like to, and I'm working on it.

> Improve new HiveConf(jobConf, HiveConf.class)
> -
>
> Key: FLINK-20913
> URL: https://issues.apache.org/jira/browse/FLINK-20913
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Environment: Hive 2.0.1
> Flink 1.12.0
> Query with SQL client
>Reporter: Xingxing Di
>Assignee: Xingxing Di
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.11.4, 1.12.2
>
>
> When we query Hive tables, we get an exception in 
> org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions
> Exception:
>  
> {code:java}
> org.apache.thrift.transport.TTransportException
> {code}
>  
> SQL: 
> {code:java}
> select * from dxx1 limit 1;
> {code}
>  
> After debugging, we found that new HiveConf overrides the configurations in 
> jobConf; in my case `hive.metastore.sasl.enabled` was reset to `false`, which 
> is unexpected.
> {code:java}
> // org.apache.flink.connectors.hive.util.HivePartitionUtils
> new HiveConf(jobConf, HiveConf.class){code}
>  
> *I think we should add a HiveConfUtils to create the HiveConf, which would 
> look like this:*
>  
> {code:java}
> HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
> hiveConf.addResource(jobConf);{code}
> The above code fixes the error; I will make a PR if this improvement is 
> acceptable.
>  
> Here is the detailed error stack:
> {code:java}
> 2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient     
>              [] - Could not execute SQL statement.2021-01-10 17:27:11,995 
> WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could 
> not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalid SQL query. at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
> partitions from hive metaStore at 
> org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.

[jira] [Issue Comment Deleted] (FLINK-20913) Improve new HiveConf(jobConf, HiveConf.class)

2021-01-15 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-20913:

Comment: was deleted

(was: [~lirui] could you assign it to me, i'll submit a PR.)

> Improve new HiveConf(jobConf, HiveConf.class)
> -
>
> Key: FLINK-20913
> URL: https://issues.apache.org/jira/browse/FLINK-20913
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Environment: Hive 2.0.1
> Flink 1.12.0
> Query with SQL client
>Reporter: Xingxing Di
>Assignee: Xingxing Di
>Priority: Major
>  Labels: pull-request-available
>
> When we query Hive tables, we get an exception in 
> org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions
> Exception:
>  
> {code:java}
> org.apache.thrift.transport.TTransportException
> {code}
>  
> SQL: 
> {code:java}
> select * from dxx1 limit 1;
> {code}
>  
> After debugging, we found that new HiveConf overrides the configurations in 
> jobConf; in my case `hive.metastore.sasl.enabled` was reset to `false`, which 
> is unexpected.
> {code:java}
> // org.apache.flink.connectors.hive.util.HivePartitionUtils
> new HiveConf(jobConf, HiveConf.class){code}
>  
> *I think we should add a HiveConfUtils to create the HiveConf, which would 
> look like this:*
>  
> {code:java}
> HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
> hiveConf.addResource(jobConf);{code}
> The above code fixes the error; I will make a PR if this improvement is 
> acceptable.
>  
> Here is the detailed error stack:
> {code:java}
> 2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient     
>              [] - Could not execute SQL statement.2021-01-10 17:27:11,995 
> WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could 
> not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalid SQL query. at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
> partitions from hive metaStore at 
> org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.trans

[jira] [Commented] (FLINK-20913) Improve new HiveConf(jobConf, HiveConf.class)

2021-01-13 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264242#comment-17264242
 ] 

Xingxing Di commented on FLINK-20913:
-

[~lirui], could you assign it to me? I'll submit a PR.

> Improve new HiveConf(jobConf, HiveConf.class)
> -
>
> Key: FLINK-20913
> URL: https://issues.apache.org/jira/browse/FLINK-20913
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Environment: Hive 2.0.1
> Flink 1.12.0
> Query with SQL client
>Reporter: Xingxing Di
>Priority: Major
>  Labels: pull-request-available
>
> When we query Hive tables, we get an exception in 
> org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions
> Exception:
>  
> {code:java}
> org.apache.thrift.transport.TTransportException
> {code}
>  
> SQL: 
> {code:java}
> select * from dxx1 limit 1;
> {code}
>  
> After debugging, we found that new HiveConf overrides the configurations in 
> jobConf; in my case `hive.metastore.sasl.enabled` was reset to `false`, which 
> is unexpected.
> {code:java}
> // org.apache.flink.connectors.hive.util.HivePartitionUtils
> new HiveConf(jobConf, HiveConf.class){code}
>  
> *I think we should add a HiveConfUtils to create the HiveConf, which would 
> look like this:*
>  
> {code:java}
> HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
> hiveConf.addResource(jobConf);{code}
> The above code fixes the error; I will make a PR if this improvement is 
> acceptable.
>  
> Here is the detailed error stack:
> {code:java}
> 2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient     
>              [] - Could not execute SQL statement.2021-01-10 17:27:11,995 
> WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could 
> not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalid SQL query. at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
> partitions from hive metaStore at 
> org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.transla

[jira] [Commented] (FLINK-20913) Improve new HiveConf(jobConf, HiveConf.class)

2021-01-12 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263838#comment-17263838
 ] 

Xingxing Di commented on FLINK-20913:
-

Hi [~lirui], I looked into the code and found that configurations whose default 
value is 'null' or an empty string will not be reset; since the default value of 
'_hive.metastore.uris_' is an empty string, it is not reset.

[More 
details|https://docs.google.com/document/d/1i0us6Czk28He844MAj4s0r6-wDaQNiUp-BEhye92E2U/edit?usp=sharing]

I will submit a PR to fix it.
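
To make the behavior concrete, here is a minimal sketch (the property values and 
the class name are made up for illustration; only the two properties discussed in 
this issue are shown):
{code:java}
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class HiveConfResetDemo {
    public static void main(String[] args) {
        // Illustrative values, not taken from a real cluster.
        JobConf jobConf = new JobConf();
        jobConf.set("hive.metastore.sasl.enabled", "true");
        jobConf.set("hive.metastore.uris", "thrift://metastore-host:9083");

        HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
        // hive.metastore.sasl.enabled has a non-empty HiveConf default ("false"),
        // so it can be reset here, while hive.metastore.uris (empty default)
        // keeps the value carried over from jobConf.
        System.out.println(hiveConf.get("hive.metastore.sasl.enabled"));
        System.out.println(hiveConf.get("hive.metastore.uris"));

        // Re-adding the jobConf as a resource restores the caller's settings.
        hiveConf.addResource(jobConf);
        System.out.println(hiveConf.get("hive.metastore.sasl.enabled"));
    }
}{code}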

> Improve new HiveConf(jobConf, HiveConf.class)
> -
>
> Key: FLINK-20913
> URL: https://issues.apache.org/jira/browse/FLINK-20913
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Environment: Hive 2.0.1
> Flink 1.12.0
> Query with SQL client
>Reporter: Xingxing Di
>Priority: Major
>
> When we query Hive tables, we get an exception in 
> org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions
> Exception:
>  
> {code:java}
> org.apache.thrift.transport.TTransportException
> {code}
>  
> SQL: 
> {code:java}
> select * from dxx1 limit 1;
> {code}
>  
> After debugging, we found that new HiveConf overrides the configurations in 
> jobConf; in my case `hive.metastore.sasl.enabled` was reset to `false`, which 
> is unexpected.
> {code:java}
> // org.apache.flink.connectors.hive.util.HivePartitionUtils
> new HiveConf(jobConf, HiveConf.class){code}
>  
> *I think we should add a HiveConfUtils to create the HiveConf, which would 
> look like this:*
>  
> {code:java}
> HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
> hiveConf.addResource(jobConf);{code}
> The above code fixes the error; I will make a PR if this improvement is 
> acceptable.
>  
> Here is the detailed error stack:
> {code:java}
> 2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient     
>              [] - Could not execute SQL statement.2021-01-10 17:27:11,995 
> WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could 
> not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalid SQL query. at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
> partitions from hive metaStore at 
> org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1

[jira] [Updated] (FLINK-20913) Improve new HiveConf(jobConf, HiveConf.class)

2021-01-10 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-20913:

Summary: Improve new HiveConf(jobConf, HiveConf.class)  (was: Improve new 
HiveConf)

> Improve new HiveConf(jobConf, HiveConf.class)
> -
>
> Key: FLINK-20913
> URL: https://issues.apache.org/jira/browse/FLINK-20913
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Environment: Hive 2.0.1
> Flink 1.12.0
> Query with SQL client
>Reporter: Xingxing Di
>Priority: Major
>
> When we query Hive tables, we get an exception in 
> org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions
> Exception:
>  
> {code:java}
> org.apache.thrift.transport.TTransportException
> {code}
>  
> SQL: 
> {code:java}
> select * from dxx1 limit 1;
> {code}
>  
> After debugging, we found that new HiveConf overrides the configurations in 
> jobConf; in my case `hive.metastore.sasl.enabled` was reset to `false`, which 
> is unexpected.
> {code:java}
> // org.apache.flink.connectors.hive.util.HivePartitionUtils
> new HiveConf(jobConf, HiveConf.class){code}
>  
> *I think we should add a HiveConfUtils to create the HiveConf, which would 
> look like this:*
>  
> {code:java}
> HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
> hiveConf.addResource(jobConf);{code}
> The above code fixes the error; I will make a PR if this improvement is 
> acceptable.
>  
> Here is the detailed error stack:
> {code:java}
> 2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient     
>              [] - Could not execute SQL statement.2021-01-10 17:27:11,995 
> WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could 
> not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalid SQL query. at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
> partitions from hive metaStore at 
> org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:47)
>  ~[flink-table-blink_2.11

[jira] [Updated] (FLINK-20913) Improve new HiveConf

2021-01-10 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-20913:

Description: 
When we query Hive tables, we get an exception in 
org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions

Exception:

 
{code:java}
org.apache.thrift.transport.TTransportException
{code}
 

SQL: 
{code:java}
select * from dxx1 limit 1;
{code}
 

After debugging, we found that new HiveConf overrides the configurations in 
jobConf; in my case `hive.metastore.sasl.enabled` was reset to `false`, which is 
unexpected.
{code:java}
// org.apache.flink.connectors.hive.util.HivePartitionUtils
new HiveConf(jobConf, HiveConf.class){code}
 

*I think we should add a HiveConfUtils to create the HiveConf, which would look 
like this:*

 
{code:java}
HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
hiveConf.addResource(jobConf);{code}
The above code fixes the error; I will make a PR if this improvement is 
acceptable.
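
For reference, a minimal sketch of what such a utility might look like (the 
class and method names below are illustrative, not existing Flink code):
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

/** Sketch of the proposed helper; the body is illustrative only. */
public final class HiveConfUtils {

    private HiveConfUtils() {
    }

    /**
     * Creates a HiveConf from the given jobConf and re-applies the jobConf as a
     * resource, so that values the caller set in jobConf take precedence over
     * HiveConf's built-in defaults.
     */
    public static HiveConf create(Configuration jobConf) {
        HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
        hiveConf.addResource(jobConf);
        return hiveConf;
    }
}{code}
Call sites such as HivePartitionUtils#getAllPartitions could then use 
HiveConfUtils.create(jobConf) instead of invoking the HiveConf constructor 
directly.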

 

Here is the detailed error stack:
{code:java}
2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient       
           [] - Could not execute SQL statement.2021-01-10 17:27:11,995 WARN  
org.apache.flink.table.client.cli.CliClient                  [] - Could not 
execute SQL 
statement.org.apache.flink.table.client.gateway.SqlExecutionException: Invalid 
SQL query. at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
partitions from hive metaStore at 
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlan(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:141)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:52)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.plan

[jira] [Updated] (FLINK-20913) Improve new HiveConf

2021-01-10 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-20913:

Summary: Improve new HiveConf  (was: Got TTransportException while querying 
Hive tables)

> Improve new HiveConf
> 
>
> Key: FLINK-20913
> URL: https://issues.apache.org/jira/browse/FLINK-20913
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Hive
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Environment: Hive 2.0.1
> Flink 1.12.0
> Query with SQL client
>Reporter: Xingxing Di
>Priority: Major
>
> When we query Hive tables, we get an exception in 
> org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions
> Exception:
>  
> {code:java}
> org.apache.thrift.transport.TTransportException
> {code}
>  
> SQL: 
> {code:java}
> select * from dxx1 limit 1;
> {code}
>  
> After debugging, we found that new HiveConf overrides the configurations in 
> jobConf (`hive.metastore.sasl.enabled` was set to `false`, which is 
> unexpected).
> {code:java}
> // org.apache.flink.connectors.hive.util.HivePartitionUtils
> new HiveConf(jobConf, HiveConf.class){code}
> *It works after I use the jobConf properties to override the hiveConf 
> properties; if this is the right way to fix this error, I will make a PR.*
> Here is the detailed error stack:
> {code:java}
> 2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient     
>              [] - Could not execute SQL statement.2021-01-10 17:27:11,995 
> WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could 
> not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: 
> Invalid SQL query. at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
>  ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
> ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
> [flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
> org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
> partitions from hive metaStore at 
> org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
>  ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:47)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
> org.apache.flink.table.planner.plan.nodes.p

[jira] [Updated] (FLINK-20913) Got TTransportException while querying Hive tables

2021-01-10 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-20913:

Description: 
When we query Hive tables, we get an exception in 
org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions

Exception:

 
{code:java}
org.apache.thrift.transport.TTransportException
{code}
 

SQL: 
{code:java}
select * from dxx1 limit 1;
{code}
 

After debugging, we found that new HiveConf overrides the configurations in 
jobConf (`hive.metastore.sasl.enabled` was set to `false`, which is unexpected).
{code:java}
// org.apache.flink.connectors.hive.util.HivePartitionUtils
new HiveConf(jobConf, HiveConf.class){code}
*It works after I use the jobConf properties to override the hiveConf 
properties; if this is the right way to fix this error, I will make a PR.*

Here is the detailed error stack:
{code:java}
2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient       
           [] - Could not execute SQL statement.2021-01-10 17:27:11,995 WARN  
org.apache.flink.table.client.cli.CliClient                  [] - Could not 
execute SQL 
statement.org.apache.flink.table.client.gateway.SqlExecutionException: Invalid 
SQL query. at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
partitions from hive metaStore at 
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlan(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:141)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:52)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.p

[jira] [Updated] (FLINK-20913) Got TTransportException while querying Hive tables

2021-01-10 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-20913:

Description: 
When we query Hive tables, we get an exception in 
org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions

Exception:

 
{code:java}
org.apache.thrift.transport.TTransportException
{code}
 

SQL: 
{code:java}
select * from dxx1 limit 1;
{code}
 

After debugging, we found that new HiveConf overrides the configurations in 
jobConf (`hive.metastore.sasl.enabled` was set to `false`, which is unexpected).
{code:java}
// org.apache.flink.connectors.hive.util.HivePartitionUtils
new HiveConf(jobConf, HiveConf.class){code}
*It works after I use the jobConf properties to override the hiveConf 
properties; if this is the right way to fix this error, I will make a PR.*

Here is the detailed error stack:
{code:java}
2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient       
           [] - Could not execute SQL statement.2021-01-10 17:27:11,995 WARN  
org.apache.flink.table.client.cli.CliClient                  [] - Could not 
execute SQL 
statement.org.apache.flink.table.client.gateway.SqlExecutionException: Invalid 
SQL query. at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
partitions from hive metaStore at 
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlan(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:141)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:52)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.no

[jira] [Created] (FLINK-20913) Got TTransportException while querying Hive tables

2021-01-10 Thread Xingxing Di (Jira)
Xingxing Di created FLINK-20913:
---

 Summary: Got TTransportException while querying Hive tables
 Key: FLINK-20913
 URL: https://issues.apache.org/jira/browse/FLINK-20913
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Hive
Affects Versions: 1.12.0, 1.12.1, 1.12.2
 Environment: Hive 2.0.1

Flink 1.12.0

Query with SQL client
Reporter: Xingxing Di


When we query Hive tables, we get an exception in 
org.apache.flink.connectors.hive.util.HivePartitionUtils#getAllPartitions

Exception:

 
{code:java}
org.apache.thrift.transport.TTransportException
{code}
 

SQL: 
{code:java}
select * from dxx1 limit 1;
{code}
 

After debugging, we found that new HiveConf overrides the configurations in 
jobConf (it overrides `hive.metastore.sasl.enabled` to false).
{code:java}

// org.apache.flink.connectors.hive.util.HivePartitionUtils
new HiveConf(jobConf, HiveConf.class){code}
 

Here is the detailed error stack:

 
{code:java}
2021-01-10 17:27:11,995 WARN  org.apache.flink.table.client.cli.CliClient       
           [] - Could not execute SQL statement.2021-01-10 17:27:11,995 WARN  
org.apache.flink.table.client.cli.CliClient                  [] - Could not 
execute SQL 
statement.org.apache.flink.table.client.gateway.SqlExecutionException: Invalid 
SQL query. at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:365)
 ~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:634) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:324) 
~[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_202] at 
org.apache.flink.table.client.cli.CliClient.open(CliClient.java:216) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:141) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) 
[flink-sql-client_2.11-1.12.0.jar:1.12.0]Caused by: 
org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
partitions from hive metaStore at 
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions(HivePartitionUtils.java:142)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:133)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:119)
 ~[flink-connector-hive_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:94)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:105)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlanInternal(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecLimit.translateToPlan(BatchExecLimit.scala:47)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:141)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.scala:52)
 ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.table.planner.plan.nodes.exec.Exe

[jira] [Commented] (FLINK-19947) Support sink parallelism configuration to Print connector

2020-11-03 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17225798#comment-17225798
 ] 

Xingxing Di commented on FLINK-19947:
-

Is anyone working on this? I would like to take it on.

> Support sink parallelism configuration to Print connector
> -
>
> Key: FLINK-19947
> URL: https://issues.apache.org/jira/browse/FLINK-19947
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: CloseRiver
>Priority: Major
>






[jira] [Commented] (FLINK-10674) Fix handling of retractions after clean up

2020-07-26 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17165401#comment-17165401
 ] 

Xingxing Di commented on FLINK-10674:
-

Hi Timo, I see this issue was fixed in 1.7.0, but today we still got the 
exception under Flink 1.7.2. Should we reopen this issue?
 !screenshot-1.png! 

> Fix handling of retractions after clean up
> --
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.6.1
> Environment: Flink 1.6.0
>Reporter: ambition
>Assignee: Timo Walther
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
> Attachments: image-2018-10-25-14-46-03-373.png, screenshot-1.png
>
>
> Our online Flink job has been running for about a week; the job contains this SQL:
> {code:java}
> select  `time`,  
> lower(trim(os_type)) as os_type, 
> count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
> Then an NPE occurred:
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> View DistinctAccumulator.remove
>  !image-2018-10-25-14-46-03-373.png!
>  
> this NPE should currentCnt = null lead to, so we simple handle like :
> {code:java}
> def remove(params: Row): Boolean = {
>   if(!distinctValueMap.contains(params)){
> true
>   }else{
> val currentCnt = distinctValueMap.get(params)
> // 
> if (currentCnt == null || currentCnt == 1) {
>   distinctValueMap.remove(params)
>   true
> } else {
>   var value = currentCnt - 1L
>   if(value < 0){
> value = 1
>   }
>   distinctValueMap.put(params, value)
>   false
> }
>   }
> }{code}
>  
> Update:
> Because state clean up happens in processing time, it might be
>  the case that retractions are arriving after the state has
>  been cleaned up. Before these changes, a new accumulator was
>  created and invalid retraction messages were emitted. This
>  change drops retraction messages for which no accumulator
>  exists. 
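To make the quoted fix above concrete, here is a small, self-contained sketch of the guard it describes, written in plain Java with an in-memory map standing in for Flink's keyed state (illustrative only, not the actual GroupAggProcessFunction code):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative only: drop a retraction when no accumulator exists for the key,
// instead of creating a fresh accumulator and emitting an invalid retraction.
final class RetractionGuardSketch {

    // Stand-in for the per-key aggregation state.
    private final Map<String, Long> countPerKey = new HashMap<>();

    void onInsert(String key) {
        countPerKey.merge(key, 1L, Long::sum);
    }

    void onRetraction(String key) {
        Long count = countPerKey.get(key);
        if (count == null) {
            // State was already cleaned up: silently ignore the late retraction.
            return;
        }
        if (count <= 1L) {
            countPerKey.remove(key);
        } else {
            countPerKey.put(key, count - 1L);
        }
    }
}
{code}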



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-10674) Fix handling of retractions after clean up

2020-07-26 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-10674:

Attachment: screenshot-1.png

> Fix handling of retractions after clean up
> --
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.6.1
> Environment: Flink 1.6.0
>Reporter: ambition
>Assignee: Timo Walther
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
> Attachments: image-2018-10-25-14-46-03-373.png, screenshot-1.png
>
>
> Our online Flink Job run about a week,job contain sql :
> {code:java}
> select  `time`,  
> lower(trim(os_type)) as os_type, 
> count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
>   then occur NPE: 
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> View DistinctAccumulator.remove
>  !image-2018-10-25-14-46-03-373.png!
>  
> this NPE should currentCnt = null lead to, so we simple handle like :
> {code:java}
> def remove(params: Row): Boolean = {
>   if(!distinctValueMap.contains(params)){
> true
>   }else{
> val currentCnt = distinctValueMap.get(params)
> // 
> if (currentCnt == null || currentCnt == 1) {
>   distinctValueMap.remove(params)
>   true
> } else {
>   var value = currentCnt - 1L
>   if(value < 0){
> value = 1
>   }
>   distinctValueMap.put(params, value)
>   false
> }
>   }
> }{code}
>  
> Update:
> Because state clean up happens in processing time, it might be
>  the case that retractions are arriving after the state has
>  been cleaned up. Before these changes, a new accumulator was
>  created and invalid retraction messages were emitted. This
>  change drops retraction messages for which no accumulator
>  exists. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-07-22 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162494#comment-17162494
 ] 

Xingxing Di edited comment on FLINK-16478 at 7/22/20, 12:51 PM:


Hi [~chesnay], we run Flink jobs in "per job" mode on YARN. First, I don't see 
a way to change the configuration files through the REST API; secondly, I think 
using an API is a better way.
About the SLF4J API part, I totally agree with you. Would it be acceptable to add 
a configuration option indicating whether we are using log4j2 or logback, so we 
can stay away from the SLF4J API?
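For what it's worth, a purely illustrative sketch of what such a switch could look like, assuming Flink's ConfigOptions builder; the option key "web.log-level.backend" is made up here and is not an existing Flink option:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Hypothetical only: an explicit option telling Flink which logging backend is in
// use, so the dynamic log level feature never has to go through the SLF4J API.
public final class LoggingBackendOption {

    public static final ConfigOption<String> LOGGING_BACKEND =
            ConfigOptions.key("web.log-level.backend")   // made-up key
                    .stringType()
                    .defaultValue("log4j2");              // "log4j", "log4j2" or "logback"

    public static String loggingBackend(Configuration conf) {
        return conf.getString(LOGGING_BACKEND);
    }
}
{code}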



was (Author: dixingx...@yeah.net):
Hi [~chesnay], we run flink jobs in "per job" mode on YARN,  first i don't see 
a way to change the configuration files through  REST API, secondly I think use 
API is a better way.
About the SDF4J API part,I totaly agree with you. Is it acceptable if we add an 
configuration to show we are using log4j2 or logback, so we can stay away from 
the SLF4J API. 


> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-07-22 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162494#comment-17162494
 ] 

Xingxing Di edited comment on FLINK-16478 at 7/22/20, 12:51 PM:


Hi [~chesnay], we run flink jobs in "per job" mode on YARN,  first i don't see 
a way to change the configuration files through  REST API, secondly I think use 
API is a better way.
About the SDF4J API part,I totaly agree with you. Is it acceptable if we add an 
configuration to show we are using log4j2 or logback, so we can stay away from 
the SLF4J API. 



was (Author: dixingx...@yeah.net):
Hi [~chesnay], we run flink jobs in "per job" mode on YARN,  first i don't see 
a way to change the configuration files through  REST API, secondly I think use 
API is an better way.
About the SDF4J API part,I totaly agree with you. Is it acceptable if we add an 
configuration to show we are using log4j2 or logback, so we can stay away from 
the SLF4J API. 


> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-07-21 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162494#comment-17162494
 ] 

Xingxing Di commented on FLINK-16478:
-

Hi [~chesnay], we run flink jobs in "per job" mode on YARN,  first i don't see 
a way to change the configuration files through  REST API, secondly I think use 
API is an better way.
About the SDF4J API part,I totaly agree with you. Is it acceptable if we add an 
configuration to show we are using log4j2 or logback, so we can stay away from 
the SLF4J API. 


> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-13 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106217#comment-17106217
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/13/20, 2:56 PM:
---

Hi [~trohrmann], thanks for the comments.
I updated the design as you suggested [Flink should support dynamic log level 
setting|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY]:
* Removed the timer part
* Deleted the List and Cancel REST API

Now we will introduce just one REST API; the design looks much simpler now.



was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.
I updated the design as you suggested [Flink should support dynamic log level 
setting|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY]:
* removed the timer part
* deleted the List and Cancel REST API

Now we will intruduce just one REST API, the design looks much simple now.


> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-13 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106217#comment-17106217
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/13/20, 11:56 AM:


Hi [~trohrmann], thanks for the comments.
I updated the design as you suggested [Flink should support dynamic log level 
setting|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY]:
* removed the timer part
* deleted the List and Cancel REST API

Now we will intruduce just one REST API, the design looks much simple now.



was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.
I updated the design as you suggested:
* removed the timer part
* deleted the List and Cancel REST API

Now we will intruduce just one REST API, the design looks much simple now.


> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-05-13 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106217#comment-17106217
 ] 

Xingxing Di commented on FLINK-16478:
-

Hi [~trohrmann], thanks for the comments.
I updated the design as you suggested:
* removed the timer part
* deleted the List and Cancel REST API

Now we will intruduce just one REST API, the design looks much simple now.


> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-12 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/12/20, 11:53 AM:


Hi [~trohrmann], thanks for the comments. The earlier class design was too 
complicated, so I have updated it to simplify the design: [Flink should support 
dynamic log level 
setting|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY]

1. About the scope, I totally agree with you; we can start with the cluster-wide 
log level, and I've edited the google doc accordingly.
 About the timer, I think a timer for resetting the log level would be very 
helpful: users can configure it once for a short debugging session and not worry 
about forgetting to change it back. This follows the design of Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since Flink has already migrated to log4j2, I think we should at least support 
log4j2 and log4j. (According to the current design, we can easily support 
logback as well.)
 As you said before, log4j2 does have different means of configuring the log level 
depending on the log4j2 version, but the way of configuring the log level 
described in your shared link supports all log4j2 versions as far as I know. Here 
is the 
[Log4j2ConfigManager|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.gpmra3ql0940],
 which is similar to Storm's 
[LogConfigManager|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java].
 I also added the 
[LogConfigManagerFactory|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.d9siisf8rvjt]
 to show how to detect the logging backend.
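For illustration only (this is not the LogConfigManagerFactory from the design doc): a minimal sketch of how the logging backend could be detected through the SLF4J ILoggerFactory binding; the class-name prefixes checked below are assumptions about the standard log4j2, log4j 1.x, and logback bindings.
{code:java}
import org.slf4j.LoggerFactory;

// Hypothetical sketch: guess the logging backend from the class of the
// ILoggerFactory that the SLF4J binding installed.
public final class LoggingBackendDetector {

    public enum Backend { LOG4J2, LOG4J1, LOGBACK, UNKNOWN }

    public static Backend detect() {
        String factory = LoggerFactory.getILoggerFactory().getClass().getName();
        if (factory.startsWith("org.apache.logging.slf4j")) {
            return Backend.LOG4J2;   // log4j-slf4j-impl binding (assumed prefix)
        } else if (factory.startsWith("org.slf4j.impl.Log4jLoggerFactory")) {
            return Backend.LOG4J1;   // slf4j-log4j12 binding (assumed prefix)
        } else if (factory.startsWith("ch.qos.logback")) {
            return Backend.LOGBACK;  // logback-classic binding (assumed prefix)
        }
        return Backend.UNKNOWN;      // unsupported backend: feature stays disabled
    }
}
{code}
A manager chosen through such a detection step could then call the backend-specific API, e.g. org.apache.logging.log4j.core.config.Configurator.setLevel for log4j2.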

Compatibility:
 * For an unsupported logging backend, dynamic log level setting will not work, 
but the cluster will work fine, since we do not depend on a specific 
implementation directly unless we detect a supported logging backend.
 * For an incompatible 
version ([Log4j2ConfigManager|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.gpmra3ql0940]
 should work for all log4j2 versions; here we assume there is an unexpected 
case), dynamic log level setting may not work properly, and the Flink logging 
system may not work properly either.

3. As above, I looked into the design of Apache Storm; it seems Storm only 
supports log4j2. 

 

 


was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments, The earlier design was  too 
complicated, I have updated it to simplify the design: [Flink should support 
dynamic log level 
setting|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY]

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
 About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.(According to the current design, we can easily support 
logback as well.)
 As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigManager|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.gpmra3ql0940]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigManagerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.d9siisf8rvjt]
 to show how to detect the logging backend.

Compatibility:
 * For a unsupported logging backend, dynamic log level setting will not work, 
but cluster will work fine, since we do not depend on a specific implementation 
directly unless we detect an supported logging backend.
 * For an incompatible 
version([Log4j2ConfigManager|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.gpmra3ql0940]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. 

 

 

> add restApi to modify lo

[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-12 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/12/20, 11:42 AM:


Hi [~trohrmann], thanks for the comments, The earlier design was  too 
complicated, I have updated it to simplify the design: [Flink should support 
dynamic log level 
setting|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY]

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
 About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.(According to the current design, we can easily support 
logback as well.)
 As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigManager|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.gpmra3ql0940]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigManagerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.d9siisf8rvjt]
 to show how to detect the logging backend.

Compatibility:
 * For a unsupported logging backend, dynamic log level setting will not work, 
but cluster will work fine, since we do not depend on a specific implementation 
directly unless we detect an supported logging backend.
 * For an incompatible 
version([Log4j2ConfigManager|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.gpmra3ql0940]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. 

 

 


was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.(According to the current design, we can easily support 
logback as well.)
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

Compatibility:
 * For a unsupported logging backend, dynamic log level setting will not work, 
but cluster will work fine, since we do not depend on a specific implementation 
directly unless we detect an supported logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
> 

[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-06 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/7/20, 3:33 AM:
--

Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.(According to the current design, we can easily support 
logback as well.)
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

Compatibility:
 * For a unsupported logging backend, dynamic log level setting will not work, 
but cluster will work fine, since we do not depend on a specific implementation 
directly unless we detect an supported logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 


was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.(According to the current design, we can easily support 
logback as well.)
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

Compatibility:
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to sto

[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-06 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/7/20, 3:29 AM:
--

Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.(According to the current design, we can easily support 
logback as well.)
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

Compatibility:
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 


was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.
* According to the current design, I think we can easily support logback as 
well although i am not familiar with it yet.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to 

[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-05 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/5/20, 3:38 PM:
--

Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.
* According to the current design, I think we can easily support logback as 
well although i am not familiar with it yet.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 


was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.
And under the current design, I think we can easily support logback as well 
although i am not familiar with it yet.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more informat

[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-05 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/5/20, 10:45 AM:
---

Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.
About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.
As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.
And under the current design, I think we can easily support logback as well 
although i am not familiar with it yet.

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 


was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.

About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.

As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

And under the current design, I think we can easily support logback as well 
although i am not familiar with it yet.

 

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more infor

[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-05-05 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di edited comment on FLINK-16478 at 5/5/20, 10:12 AM:
---

Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.

About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.

As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is the 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the [LogConfigWorkerFactory 
|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.jlgb5jpklf9k]
 to show how to detect the logging backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

And under the current design, I think we can easily support logback as well 
although i am not familiar with it yet.

 

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 


was (Author: dixingx...@yeah.net):
Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.

About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.

As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the LogConfigWorkerFactory to show how to detect the logging 
backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorke|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

And under the current design, I think we can easily support logback as well 
although i am not familiar with it yet.

 

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think 

[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-05-05 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099746#comment-17099746
 ] 

Xingxing Di commented on FLINK-16478:
-

Hi [~trohrmann], thanks for the comments.

1. About the scope thing, i totaly agree with you, we can start with the 
cluster wide log level, I've edit the google doc as well.

About the timer thing, i think a timer for reseting log level would be very 
helpful, user can simply config one time for a short time debug, no need to 
worry about forgetting to change it back. This referenced to the design of 
Apache Storm: 
[https://github.com/apache/storm/blob/master/docs/dynamic-log-level-settings.md]

2. Since flink already migrate to log4j2 , I think we should at least support 
log4j2 and log4j.

As you said before, log4j2 do have different means to configure the log level 
depending on the log4j2 version, but I found the way to configure log level 
which in your shared link will support all the log4j2 version as i known. Here 
is 
[Log4j2ConfigWorker|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 which similar to storm's [LogConfigManager . 
|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java]I
 also added the LogConfigWorkerFactory to show how to detect the logging 
backend.

In general
 * For a unsupported logging backend, Cluster will work as usual, since we do 
not depend on a specific implementation directly unless we detect an supported 
logging backend.
 * For an incompatible 
version([Log4j2ConfigWorke|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY/edit#heading=h.fd0rccx9k6u7]
 should work for all log4j2 versions, here we assume there is an unexpected 
case.), dynamic log level setting may not work properly, also flink logging 
system may not work properly either.

And under the current design, I think we can easily support logback as well 
although i am not familiar with it yet.

 

3. As above, i looked into the design of apache storm, seems storm only support 
log4j2. I will continue to do the research.

 

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-05-01 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097714#comment-17097714
 ] 

Xingxing Di commented on FLINK-16478:
-

{quote}In general I think such a feature would be helpful for our users. What I 
would be interested in is how exactly it should be implemented. I guess a 
proper design with an implementation plan could help here.
{quote}
[~trohrmann] [~felixzheng] [~xiaodao], I wrote a new design doc with more 
details. Could you help me review it? I will improve it ASAP.

[Flink should support dynamic log level 
setting|https://docs.google.com/document/d/1Q02VSSBzlZaZzvxuChIo1uinw8KDQsyTZUut6_IDErY]
 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17485) Add a thread dump REST API

2020-04-30 Thread Xingxing Di (Jira)
Xingxing Di created FLINK-17485:
---

 Summary: Add a thread dump REST API
 Key: FLINK-17485
 URL: https://issues.apache.org/jira/browse/FLINK-17485
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Xingxing Di


My team builds a streaming computing platform based on Flink for internal use in 
our company.

As jobs and users grow, we spend a lot of time helping users with 
troubleshooting.

Currently we must log on to the server running the task manager, find the right 
process through netstat -anp | grep "the flink data port", and then run the 
jstack command.

We think it would be very convenient if Flink provided a REST API for thread 
dumps; web UI support would be even better.
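As a rough illustration of what such an endpoint could return, here is a minimal, framework-agnostic sketch that builds a jstack-like dump of the current JVM via ThreadMXBean (not Flink's actual handler; names are made up):
{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Illustrative only: a jstack-like dump of all live threads in the current JVM.
public final class ThreadDumpSketch {

    public static String dumpThreads() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        StringBuilder dump = new StringBuilder();
        // true, true: include locked monitors and locked synchronizers, like jstack -l.
        for (ThreadInfo info : threadBean.dumpAllThreads(true, true)) {
            // Note: ThreadInfo.toString() truncates each stack trace after a few frames.
            dump.append(info.toString());
        }
        return dump.toString();
    }
}
{code}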

So we want to know:
 * Is the community already working on this?
 * Would this be an appropriate feature (adding a REST API to dump threads), given 
that, on the other hand, thread dumps may be "expensive"?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-04-29 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095890#comment-17095890
 ] 

Xingxing Di commented on FLINK-16478:
-

 
{quote}[~xiaodao] [~dixingx...@yeah.net] Great work! Is it possible that users 
can change the log level in the PROCESS scope as well?
{quote}
[~felixzheng] Yes, we can also add a REST API for the task manager, which will 
work on just that one task manager.

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> sometimes we may need to change loglevel to get more information to resolved 
> bug, now we need to stop it and modify conf/log4j.properties and resubmit it 
> ,i think it's better to add rest api to modify loglevel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-04-27 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092471#comment-17092471
 ] 

Xingxing Di edited comment on FLINK-16478 at 4/27/20, 1:11 PM:
---

Hi [~xiaodao], [~felixzheng], I am glad to see this issue; I created a duplicate 
issue 
[FLINK-17382|https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17382] 
earlier. 

We've already added a REST API to support *log4j* dynamic log level setting in 
Flink 1.9, and I'm also interested in working on this issue. 

The query parameter is:
{code:java}
?params=jobManager|org.apache.flink|debug|30|info{code}
*Which means:* Change the job manager's "org.apache.flink" logger level to 
"debug", and after 30 seconds set it back to "info".

*More details:*
 type : jobManager or taskManager
 loggerName : the logger name; can be root, a package name, or a fully qualified 
class name
 level : ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF
 expireSecs : defines a timer that sets the log level back to resetLevel after 
the given number of seconds
 resetLevel : the log level to set back to; default is 'info'

Here is the code 
[1.9-log4j-level-setting|https://github.com/dixingxing0/flink/tree/1.9-log4j-level-setting].

It's only a simple version supporting log4j; following the design by [~xiaodao], 
it should also work with logback and log4j2.
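For readers who don't want to dig through the branch, here is a minimal sketch of roughly what the log4j 1.x side of such a handler boils down to (names and structure are illustrative assumptions, not the code in the linked branch):
{code:java}
import java.util.Timer;
import java.util.TimerTask;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

// Illustrative only: set a log4j 1.x logger to `level`, then schedule a reset
// back to `resetLevel` after `expireSecs` seconds.
public final class Log4jLevelSetterSketch {

    public static void setTemporarily(String loggerName, String level,
                                      long expireSecs, String resetLevel) {
        final Logger logger = "root".equalsIgnoreCase(loggerName)
                ? LogManager.getRootLogger()
                : LogManager.getLogger(loggerName);
        logger.setLevel(Level.toLevel(level));

        Timer resetTimer = new Timer("log-level-reset", true); // daemon timer
        resetTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                logger.setLevel(Level.toLevel(resetLevel));
            }
        }, expireSecs * 1000L);
    }
}
{code}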

 


was (Author: dixingx...@yeah.net):
Hi [~xiaodao] , [~felixzheng], i am glad to see this issue, and i created an 
duplicated  issue 

[FLINK-17382|https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17382] 
earlier. 

We've already added a rest api to support *log4j* dynamic log level setting in 
flink 1.9 , i'm also interested in working on this issue. 

The query parameter is :
{code:java}
?params=jobManager|root|debug|30|info{code}
*Which means:* Change job manager's "root" logger's level to "debug", and after 
30 seconds set it back to "info".

*More details:*
 type : jobManager or taskManager
 loggerName : the logger name
 level : ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL ,OFF
 expireSecs : define a timer to set log level to the resetLevel after specific 
seconds
 resetLevel : the log level to set back, default is 'info'

Here is the code 
[1.9-log4j-level-setting|https://github.com/dixingxing0/flink/tree/1.9-log4j-level-setting].

It's only a simple version to support log4j, as the design by [~xiaodao], it 
should also work on logback and log4j2.

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> Sometimes we need to change the log level to get more information to resolve a 
> bug. Currently we have to stop the job, modify conf/log4j.properties, and 
> resubmit it. I think it's better to add a REST API to modify the log level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-04-27 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093496#comment-17093496
 ] 

Xingxing Di commented on FLINK-16478:
-

[~fly_in_gis] Yes, you can use a fully qualified class name, e.g.:

params=jobManager|org.apache.flink.runtime.resourcemanager.ResourceManager|debug|30|info

I will also edit my earlier comment to avoid misunderstanding.

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> Sometimes we need to change the log level to get more information to resolve a 
> bug. Currently we have to stop the job, modify conf/log4j.properties, and 
> resubmit it. I think it's better to add a REST API to modify the log level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-04-25 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092478#comment-17092478
 ] 

Xingxing Di edited comment on FLINK-16478 at 4/26/20, 6:35 AM:
---

[~felixzheng] Yes, that depends on how the logger name is defined, which is 
usually the fully qualified class name.

So we just need to set the loggerName part to "xxx.xxx.your.package/class".


was (Author: dixingx...@yeah.net):
[~felixzheng] Yes, that depends on how the logger name is defined, which is 
usually the fully qualified class name.

So we just need to set the loggerName part to "xxx.xxx.your.package/class".

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> Sometimes we need to change the log level to get more information to resolve a 
> bug. Currently we have to stop the job, modify conf/log4j.properties, and 
> resubmit it. I think it's better to add a REST API to modify the log level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-04-25 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092478#comment-17092478
 ] 

Xingxing Di commented on FLINK-16478:
-

[~felixzheng] Yes, that depends on how the logger name is defined, which is 
usually the fully qualified class name.

So we just need to set the loggerName part to "xxx.xxx.your.package/class".

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> Sometimes we need to change the log level to get more information to resolve a 
> bug. Currently we have to stop the job, modify conf/log4j.properties, and 
> resubmit it. I think it's better to add a REST API to modify the log level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16478) add restApi to modify loglevel

2020-04-25 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092471#comment-17092471
 ] 

Xingxing Di edited comment on FLINK-16478 at 4/26/20, 4:13 AM:
---

Hi [~xiaodao], [~felixzheng], I am glad to see this issue; I created a duplicate 
issue, 
[FLINK-17382|https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17382], 
earlier.

We've already added a REST API to support *log4j* dynamic log level setting in 
Flink 1.9, and I'm also interested in working on this issue.

The query parameter is:
{code:java}
?params=jobManager|root|debug|30|info{code}
*Which means:* Change the job manager's "root" logger level to "debug", and 
after 30 seconds set it back to "info".

*More details:*
 type : jobManager or taskManager
 loggerName : the logger name
 level : ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF
 expireSecs : defines a timer that sets the log level back to resetLevel after 
the given number of seconds
 resetLevel : the log level to set back to; default is 'info'

Here is the code: 
[1.9-log4j-level-setting|https://github.com/dixingxing0/flink/tree/1.9-log4j-level-setting].

It's only a simple version that supports log4j; following [~xiaodao]'s design, it 
should also work with logback and log4j2.

 


was (Author: dixingx...@yeah.net):
Hi [~xiaodao], [~felixzheng], I am glad to see this issue; I created a duplicate 
issue earlier 
([https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17382]).

We've already added a REST API to support *log4j* dynamic log level setting in 
Flink 1.9, and I'm also interested in working on this issue.

The query parameter is:
{code:java}
?params=jobManager|root|debug|30|info{code}
*Which means:* Change the job manager's "root" logger level to "debug", and 
after 30 seconds set it back to "info".

*More details:*
 type : jobManager or taskManager
 loggerName : the logger name
 level : ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF
 expireSecs : defines a timer that sets the log level back to resetLevel after 
the given number of seconds
 resetLevel : the log level to set back to; default is 'info'

Here is the code: 
[https://github.com/dixingxing0/flink/tree/1.9-log4j-level-setting]

It's only a simple version that supports log4j; following [~xiaodao]'s design, it 
should also work with logback and log4j2.

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> Sometimes we need to change the log level to get more information to resolve a 
> bug. Currently we have to stop the job, modify conf/log4j.properties, and 
> resubmit it. I think it's better to add a REST API to modify the log level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2020-04-25 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092471#comment-17092471
 ] 

Xingxing Di commented on FLINK-16478:
-

Hi [~xiaodao], [~felixzheng], I am glad to see this issue; I created a duplicate 
issue earlier 
([https://issues.apache.org/jira/projects/FLINK/issues/FLINK-17382]).

We've already added a REST API to support *log4j* dynamic log level setting in 
Flink 1.9, and I'm also interested in working on this issue.

The query parameter is:
{code:java}
?params=jobManager|root|debug|30|info{code}
*Which means:* Change the job manager's "root" logger level to "debug", and 
after 30 seconds set it back to "info".

*More details:*
 type : jobManager or taskManager
 loggerName : the logger name
 level : ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF
 expireSecs : defines a timer that sets the log level back to resetLevel after 
the given number of seconds
 resetLevel : the log level to set back to; default is 'info'

Here is the code: 
[https://github.com/dixingxing0/flink/tree/1.9-log4j-level-setting]

It's only a simple version that supports log4j; following [~xiaodao]'s design, it 
should also work with logback and log4j2.

 

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>
> Sometimes we need to change the log level to get more information to resolve a 
> bug. Currently we have to stop the job, modify conf/log4j.properties, and 
> resubmit it. I think it's better to add a REST API to modify the log level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-17382) Flink should support dynamic log level setting

2020-04-25 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di closed FLINK-17382.
---
Resolution: Duplicate

> Flink should support dynamic log level setting
> --
>
> Key: FLINK-17382
> URL: https://issues.apache.org/jira/browse/FLINK-17382
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / REST, Runtime / Web Frontend
>Affects Versions: 1.7.2, 1.9.2
>Reporter: Xingxing Di
>Priority: Major
>
> Flink should support dynamic log level setting; currently we cannot do that 
> through the Flink UI.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17382) Flink should support dynamic log level setting

2020-04-25 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092456#comment-17092456
 ] 

Xingxing Di commented on FLINK-17382:
-

[~felixzheng] Thank you, it is a duplicate.

> Flink should support dynamic log level setting
> --
>
> Key: FLINK-17382
> URL: https://issues.apache.org/jira/browse/FLINK-17382
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / REST, Runtime / Web Frontend
>Affects Versions: 1.7.2, 1.9.2
>Reporter: Xingxing Di
>Priority: Major
>
> Flink should support dynamic log level setting; currently we cannot do that 
> through the Flink UI.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17382) Flink should support dynamic log level setting

2020-04-25 Thread Xingxing Di (Jira)
Xingxing Di created FLINK-17382:
---

 Summary: Flink should support dynamic log level setting
 Key: FLINK-17382
 URL: https://issues.apache.org/jira/browse/FLINK-17382
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / REST, Runtime / Web Frontend
Affects Versions: 1.9.2, 1.7.2
Reporter: Xingxing Di


Flink should support dynamic log level setting; currently we cannot do that 
through the Flink UI.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di closed FLINK-16819.
---
Resolution: Fixed

The problem was resolved by upgrading to 1.9.2.

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
> Fix For: 1.9.2
>
>
> Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
> Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 
> We found that a UDAF will trigger this exception; btw, we are using the blink 
> planner.
> *Here is the full stack trace:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-16819:

Fix Version/s: 1.9.2

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
> Fix For: 1.9.2
>
>
> Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
> Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 
> We found that a UDAF will trigger this exception; btw, we are using the blink 
> planner.
> *Here is the full stack trace:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-04-18 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086465#comment-17086465
 ] 

Xingxing Di commented on FLINK-16819:
-

Hi [~jark], the problem has been resolved by upgrading to 1.9.2, thanks a lot.

> Got KryoException while using UDAF in flink1.9
> --
>
> Key: FLINK-16819
> URL: https://issues.apache.org/jira/browse/FLINK-16819
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Table SQL / Planner
>Affects Versions: 1.9.1
> Environment: Flink1.9.1
> Apache hadoop 2.7.2
>Reporter: Xingxing Di
>Priority: Major
>
> Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
> Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 
> We found that a UDAF will trigger this exception; btw, we are using the blink 
> planner.
> *Here is the full stack trace:*
>  2020-03-27 11:46:55
>  com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  Serialization trace:
>  seed (java.util.Random)
>  gen (com.tdunning.math.stats.AVLTreeDigest)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
>  at 
> org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
>  at 
> org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
>  at GroupAggsHandler$71.setAccumulators(Unknown Source)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>  at java.util.ArrayList.get(ArrayList.java:433)
>  at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>  at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>  ... 26 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17228) Streaming sql with nested GROUP BY got wrong results

2020-04-18 Thread Xingxing Di (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086464#comment-17086464
 ] 

Xingxing Di commented on FLINK-17228:
-

[~jark], Thanks a lot 

> Streaming sql with nested GROUP BY got wrong results
> 
>
> Key: FLINK-17228
> URL: https://issues.apache.org/jira/browse/FLINK-17228
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Runtime
>Affects Versions: 1.7.2
> Environment: Flink 1.7.2
> Parallelism is 1
>Reporter: Xingxing Di
>Priority: Blocker
>
> We are facing a special scenario, *we want to know if this feature is 
> supported*:
>  First count distinct deviceid for the A,B dimensions, then sum up for just 
> the A dimension.
>  Here is the SQL:
> {code:java}
> SELECT dt, SUM(a.uv) AS uv
> FROM (
>  SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
>  FROM streaming_log_event
>  WHERE action IN ('action1')
>  AND pvareaid NOT IN ('pv1', 'pv2')
>  AND pvareaid IS NOT NULL
>  GROUP BY dt, pvareaid
> ) a
> GROUP BY dt;{code}
> The question is that the data emitted to the sink was wrong: the sink 
> periodically got a smaller result ({color:#ff}86{color}). Here is the log:
> {code:java}
> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,86,20200417)
> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(true,0,130,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,130,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,86,20200417)
> 2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(true,0,131,20200417)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17228) Streaming sql with nested GROUP BY got wrong results

2020-04-17 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-17228:

Description: 
We are facing a special scenario, *we want to know if this feature is 
supported*:
 First count distinct deviceid for the A,B dimensions, then sum up for just 
the A dimension.
 Here is the SQL:
{code:java}
SELECT dt, SUM(a.uv) AS uv
FROM (
 SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
 FROM streaming_log_event
 WHERE action IN ('action1')
 AND pvareaid NOT IN ('pv1', 'pv2')
 AND pvareaid IS NOT NULL
 GROUP BY dt, pvareaid
) a
GROUP BY dt;{code}
The question is that the data emitted to the sink was wrong: the sink 
periodically got a smaller result ({color:#ff}86{color}). Here is the log:
{code:java}
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417)
{code}
 

  was:
We are facing a special scenario, *we want to know if this feature is 
supported*:
First count distinct deviceid for the A,B dimensions, then sum up for just 
the A dimension.
Here is the SQL:
{code:java}
SELECT dt, SUM(a.uv) AS uv
FROM (
 SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
 FROM streaming_log_event
 WHERE action IN ('action1')
 AND pvareaid NOT IN ('pv1', 'pv2')
 AND pvareaid IS NOT NULL
 GROUP BY dt, pvareaid
) a
GROUP BY dt;{code}
The question is that the data emitted to the sink was wrong: the sink 
periodically got a smaller result ({color:#FF}86{color}). Here is the log:
{code:java}

2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417)
{code}
 


> Streaming sql with nested GROUP BY got wrong results
> 
>
> Key: FLINK-17228
> URL: https://issues.apache.org/jira/browse/FLINK-17228
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Runtime
>Affects Versions: 1.7.2
> Environment: Flink 1.7.2
> Parallelism is 1
>Reporter: Xingxing Di
>Priority: Blocker
>
> We are facing a special scenario, *we want to know if this feature is 
> supported*:
>  First count distinct deviceid for the A,B dimensions, then sum up for just 
> the A dimension.
>  Here is the SQL:
> {code:java}
> SELECT dt, SUM(a.uv) AS uv
> FROM (
>  SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
>  FROM streaming_log_event
>  WHERE action IN ('action1')
>  AND pvareaid NOT IN ('pv1', 'pv2')
>  AND pvareaid IS NOT NULL
>  GROUP BY dt, pvareaid
> ) a
> GROUP BY dt;{code}
> The question is that the data emitted to the sink was wrong: the sink 
> periodically got a smaller result ({color:#ff}86{color}). Here is the log:
> {code:java}
> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,86,20200417)
> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(true,0,130,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,130,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: 

[jira] [Created] (FLINK-17228) Streaming sql with nested GROUP BY got wrong results

2020-04-17 Thread Xingxing Di (Jira)
Xingxing Di created FLINK-17228:
---

 Summary: Streaming sql with nested GROUP BY got wrong results
 Key: FLINK-17228
 URL: https://issues.apache.org/jira/browse/FLINK-17228
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Runtime
Affects Versions: 1.7.2
 Environment: Flink 1.7.2

Parallelism is 1
Reporter: Xingxing Di


We are facing a special scenario, *we want to know if this feature is 
supported*:
First count distinct deviceid for the A,B dimensions, then sum up for just 
the A dimension.
Here is the SQL:
{code:java}
SELECT dt, SUM(a.uv) AS uv
FROM (
 SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
 FROM streaming_log_event
 WHERE action IN ('action1')
 AND pvareaid NOT IN ('pv1', 'pv2')
 AND pvareaid IS NOT NULL
 GROUP BY dt, pvareaid
) a
GROUP BY dt;{code}
The question is that the data emitted to the sink was wrong: the sink 
periodically got a smaller result ({color:#FF}86{color}). Here is the log:
{code:java}

2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417)
{code}
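
One way to read the boolean flag in the log above: a minimal sketch, assuming 
the sink consumes a retract stream (Tuple2<Boolean, Row>, where f0 = true is an 
accumulate message and f0 = false retracts the previously emitted value). The 
sink class below is hypothetical and is not the GeneralRedisSinkFunction from 
the log:
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Hypothetical retract-aware sink: only accumulate messages (f0 == true) carry
// the latest value for a key; retract messages (f0 == false) withdraw the
// previously emitted value.
public class RetractAwareSink extends RichSinkFunction<Tuple2<Boolean, Row>> {

    @Override
    public void invoke(Tuple2<Boolean, Row> record, Context context) {
        if (record.f0) {
            // accumulate: write/overwrite the value for this key
            System.out.println("upsert " + record.f1);
        } else {
            // retract: the previous value is withdrawn; an idempotent key-value
            // sink that always keeps the last accumulate message can ignore it
            System.out.println("retract " + record.f1);
        }
    }
}
{code}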
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-03-26 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-16819:

Description: 
Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 

We found that a UDAF will trigger this exception; btw, we are using the blink 
planner.

*Here is the full stack trace:*
 2020-03-27 11:46:55
 com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
Index: 104, Size: 2
 Serialization trace:
 seed (java.util.Random)
 gen (com.tdunning.math.stats.AVLTreeDigest)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
 at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
 at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
 at 
org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
 at 
org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
 at GroupAggsHandler$71.setAccumulators(Unknown Source)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
 at java.util.ArrayList.rangeCheck(ArrayList.java:657)
 at java.util.ArrayList.get(ArrayList.java:433)
 at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
 at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
 ... 26 more

  was:
Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 

We found that a UDAF will trigger this exception; btw, we are using the blink 
planner.

Here is the full stack trace:

```
 2020-03-27 11:46:55
 com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
Index: 104, Size: 2
 Serialization trace:
 seed (java.util.Random)
 gen (com.tdunning.math.stats.AVLTreeDigest)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
 at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
 at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
 at 
org.apa

[jira] [Updated] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-03-26 Thread Xingxing Di (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingxing Di updated FLINK-16819:

Description: 
Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 

We found that a UDAF will trigger this exception; btw, we are using the blink 
planner.

Here is the full stack trace:

```
 2020-03-27 11:46:55
 com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
Index: 104, Size: 2
 Serialization trace:
 seed (java.util.Random)
 gen (com.tdunning.math.stats.AVLTreeDigest)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
 at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
 at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
 at 
org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
 at 
org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
 at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
 at GroupAggsHandler$71.setAccumulators(Unknown Source)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
 at java.util.ArrayList.rangeCheck(ArrayList.java:657)
 at java.util.ArrayList.get(ArrayList.java:433)
 at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
 at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
 at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
 ... 26 more
  ```

  was:
Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 

We found that a UDAF will trigger this exception; btw, we are using the blink 
planner.

Here is the full stack trace:
2020-03-27 11:46:55
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
Index: 104, Size: 2
Serialization trace:
seed (java.util.Random)
gen (com.tdunning.math.stats.AVLTreeDigest)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at 
org.apache.flink.

[jira] [Created] (FLINK-16819) Got KryoException while using UDAF in flink1.9

2020-03-26 Thread Xingxing Di (Jira)
Xingxing Di created FLINK-16819:
---

 Summary: Got KryoException while using UDAF in flink1.9
 Key: FLINK-16819
 URL: https://issues.apache.org/jira/browse/FLINK-16819
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System, Table SQL / Planner
Affects Versions: 1.9.1
 Environment: Flink1.9.1

Apache hadoop 2.7.2
Reporter: Xingxing Di


Recently, we have been trying to upgrade online *sql jobs* from Flink 1.7 to 
Flink 1.9. Most jobs work fine, but some jobs got KryoExceptions. 

We found that a UDAF will trigger this exception; btw, we are using the blink 
planner.

Here is the full stack trace:
2020-03-27 11:46:55
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
Index: 104, Size: 2
Serialization trace:
seed (java.util.Random)
gen (com.tdunning.math.stats.AVLTreeDigest)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at 
org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
at 
org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
at 
org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
at 
org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
at 
org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
at 
org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
at GroupAggsHandler$71.setAccumulators(Unknown Source)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 104, Size: 2
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 26 more
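
For context only, a minimal sketch of the kind of UDAF that hits this path: the 
stack trace shows a com.tdunning.math.stats.AVLTreeDigest inside the 
accumulator, and the BinaryGeneric/KryoSerializer frames indicate the 
accumulator is (de)serialized via Kryo as a generic type. The function below is 
hypothetical and is not the job's actual UDAF:
{code:java}
import com.tdunning.math.stats.AVLTreeDigest;
import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical percentile UDAF whose accumulator is a t-digest. Flink has no
// dedicated serializer for AVLTreeDigest, so the accumulator falls back to
// Kryo, which is the (de)serialization path seen in the stack trace above.
public class TDigestPercentile extends AggregateFunction<Double, AVLTreeDigest> {

    @Override
    public AVLTreeDigest createAccumulator() {
        return new AVLTreeDigest(100.0); // compression factor
    }

    public void accumulate(AVLTreeDigest acc, Double value) {
        if (value != null) {
            acc.add(value);
        }
    }

    @Override
    public Double getValue(AVLTreeDigest acc) {
        return acc.quantile(0.95); // e.g. the 95th percentile
    }
}
{code}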
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)