[jira] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3

2024-03-19 Thread shizhengchao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34651 ]


shizhengchao deleted comment on FLINK-34651:
--

was (Author: tinny):
a

> The HiveTableSink of Flink does not support writing to S3
> -
>
> Key: FLINK-34651
> URL: https://issues.apache.org/jira/browse/FLINK-34651
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6, 1.14.6, 1.15.4, 1.16.3, 1.17.2, 1.18.1
>Reporter: shizhengchao
>Priority: Blocker
>
> My Hive table is located on S3. When I try to write to Hive using Flink 
> Streaming SQL, I find that it does not support writing to S3. Furthermore, 
> this issue has not been fixed in the latest version. The error I got is as 
> follows:
> {code:java}
> // code placeholder
> java.io.IOException: No FileSystem for scheme: s3
>     at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>     at 
> org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
>     at 
> org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
>     at 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}
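
The stack trace shows Hadoop's FileSystem.getFileSystemClass failing to resolve 
the s3 scheme: HadoopFileSystemFactory hands the path straight to Hadoop, so the 
lookup never goes through Flink's pluggable filesystems (flink-s3-fs-hadoop and 
friends). A minimal sketch of the failing code path, assuming a Hadoop 
configuration with no s3 implementation registered (bucket and path are 
placeholders):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3SchemeRepro {
    public static void main(String[] args) throws Exception {
        // Hadoop resolves the scheme from its own configuration and service
        // loader, not from Flink's plugin mechanism, so with no s3
        // implementation on the Hadoop side this throws
        // "java.io.IOException: No FileSystem for scheme: s3".
        Configuration conf = new Configuration();
        FileSystem fs = new Path("s3://my-bucket/warehouse/t").getFileSystem(conf);
        System.out.println(fs.getUri());
    }
}
{code}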



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3

2024-03-19 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828226#comment-17828226
 ] 

shizhengchao commented on FLINK-34651:
--

a

> The HiveTableSink of Flink does not support writing to S3
> -
>
> Key: FLINK-34651
> URL: https://issues.apache.org/jira/browse/FLINK-34651
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6, 1.14.6, 1.15.4, 1.16.3, 1.17.2, 1.18.1
>Reporter: shizhengchao
>Priority: Blocker
>
> My Hive table is located on S3. When I try to write to Hive using Flink 
> Streaming SQL, I find that it does not support writing to S3. Furthermore, 
> this issue has not been fixed in the latest version. The error I got is as 
> follows:
> {code:java}
> // code placeholder
> java.io.IOException: No FileSystem for scheme: s3
>     at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>     at 
> org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
>     at 
> org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
>     at 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3

2024-03-19 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828225#comment-17828225
 ] 

shizhengchao commented on FLINK-34651:
--

[~easonqin] It won't take effect because the HiveTableSink directly uses the 
HadoopFileSystemFactory.
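
A possible workaround sketch (an untested assumption, not a confirmed fix): since 
the lookup is delegated to Hadoop, the s3 scheme would have to be registered on 
the Hadoop side, e.g. by putting hadoop-aws and its AWS SDK on the classpath and 
mapping the scheme to S3A in the Hadoop configuration the Hive connector reads. 
The keys below are standard hadoop-aws options; the credentials are placeholders:

{code:java}
import org.apache.hadoop.conf.Configuration;

public class S3ASchemeMapping {
    public static void main(String[] args) {
        // The same entries can equivalently live in the cluster's
        // core-site.xml, so that every Hadoop FileSystem lookup sees them.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        hadoopConf.set("fs.s3a.access.key", "<access-key>");
        hadoopConf.set("fs.s3a.secret.key", "<secret-key>");
    }
}
{code}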

> The HiveTableSink of Flink does not support writing to S3
> -
>
> Key: FLINK-34651
> URL: https://issues.apache.org/jira/browse/FLINK-34651
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6, 1.14.6, 1.15.4, 1.16.3, 1.17.2, 1.18.1
>Reporter: shizhengchao
>Priority: Blocker
>
> My Hive table is located on S3. When I try to write to Hive using Flink 
> Streaming SQL, I find that it does not support writing to S3. Furthermore, 
> this issue has not been fixed in the latest version. The error I got is as 
> follows:
> {code:java}
> // code placeholder
> java.io.IOException: No FileSystem for scheme: s3
>     at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>     at 
> org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
>     at 
> org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
>     at 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3

2024-03-13 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-34651:
-
Priority: Blocker  (was: Major)

> The HiveTableSink of Flink does not support writing to S3
> -
>
> Key: FLINK-34651
> URL: https://issues.apache.org/jira/browse/FLINK-34651
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6, 1.14.6, 1.15.4, 1.16.3, 1.17.2, 1.18.1
>Reporter: shizhengchao
>Priority: Blocker
>
> My Hive table is located on S3. When I try to write to Hive using Flink 
> Streaming SQL, I find that it does not support writing to S3. Furthermore, 
> this issue has not been fixed in the latest version. The error I got is as 
> follows:
> {code:java}
> // code placeholder
> java.io.IOException: No FileSystem for scheme: s3
>     at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>     at 
> org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
>     at 
> org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
>     at 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3

2024-03-13 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-34651:
-
Affects Version/s: 1.18.1
   1.17.2
   1.15.4
   1.14.6
   1.13.6

> The HiveTableSink of Flink does not support writing to S3
> -
>
> Key: FLINK-34651
> URL: https://issues.apache.org/jira/browse/FLINK-34651
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6, 1.14.6, 1.15.4, 1.16.3, 1.17.2, 1.18.1
>Reporter: shizhengchao
>Priority: Major
>
> My Hive table is located on S3. When I try to write to Hive using Flink 
> Streaming SQL, I find that it does not support writing to S3. Furthermore, 
> this issue has not been fixed in the latest version. The error I got is as 
> follows:
> {code:java}
> // code placeholder
> java.io.IOException: No FileSystem for scheme: s3
>     at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>     at 
> org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
>     at 
> org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
>     at 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3

2024-03-13 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-34651:
-
Affects Version/s: 1.16.3
   (was: 1.13.6)

> The HiveTableSink of Flink does not support writing to S3
> -
>
> Key: FLINK-34651
> URL: https://issues.apache.org/jira/browse/FLINK-34651
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.3
>Reporter: shizhengchao
>Priority: Major
>
> My Hive table is located on S3. When I try to write to Hive using Flink 
> Streaming SQL, I find that it does not support writing to S3. Furthermore, 
> this issue has not been fixed in the latest version. The error I got is as 
> follows:
> {code:java}
> // code placeholder
> java.io.IOException: No FileSystem for scheme: s3
>     at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>     at 
> org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
>     at 
> org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
>     at 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
>     at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3

2024-03-11 Thread shizhengchao (Jira)
shizhengchao created FLINK-34651:


 Summary: The HiveTableSink of Flink does not support writing to S3
 Key: FLINK-34651
 URL: https://issues.apache.org/jira/browse/FLINK-34651
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.13.6
Reporter: shizhengchao


My Hive table is located on S3. When I try to write to Hive using Flink 
Streaming SQL, I find that it does not support writing to S3. Furthermore, this 
issue has not been fixed in the latest version. The error I got is as follows:
{code:java}
// code placeholder
java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at 
org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
    at 
org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
    at 
org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:750)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-26541) SQL Client should support submitting SQL jobs in application mode

2023-07-28 Thread shizhengchao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-26541 ]


shizhengchao deleted comment on FLINK-26541:
--

was (Author: tinny):
Why not use session mode? If application mode supports SQL, then I think 
there is no difference from session mode.

> SQL Client should support submitting SQL jobs in application mode
> -
>
> Key: FLINK-26541
> URL: https://issues.apache.org/jira/browse/FLINK-26541
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Deployment / YARN, Table SQL / 
> Client
>Reporter: Jark Wu
>Priority: Major
>
> Currently, the SQL Client only supports submitting jobs in session mode and 
> per-job mode. As the community is going to drop per-job mode (FLINK-26000), 
> the SQL Client should support application mode as well. Otherwise, the SQL 
> Client would only be able to submit SQL in session mode, but streaming jobs 
> should be submitted in per-job or application mode to have better resource 
> isolation.
> Discussions: https://lists.apache.org/thread/2yq351nb721x23rz1q8qlyf2tqrk147r



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26541) SQL Client should support submitting SQL jobs in application mode

2023-07-26 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747428#comment-17747428
 ] 

shizhengchao commented on FLINK-26541:
--

Why not use session mode? If application mode supports SQL, then I think 
there is no difference from session mode.

> SQL Client should support submitting SQL jobs in application mode
> -
>
> Key: FLINK-26541
> URL: https://issues.apache.org/jira/browse/FLINK-26541
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Deployment / YARN, Table SQL / 
> Client
>Reporter: Jark Wu
>Priority: Major
>
> Currently, the SQL Client only supports submitting jobs in session mode and 
> per-job mode. As the community is going to drop per-job mode (FLINK-26000), 
> the SQL Client should support application mode as well. Otherwise, the SQL 
> Client would only be able to submit SQL in session mode, but streaming jobs 
> should be submitted in per-job or application mode to have better resource 
> isolation.
> Discussions: https://lists.apache.org/thread/2yq351nb721x23rz1q8qlyf2tqrk147r



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624836#comment-17624836
 ] 

shizhengchao commented on FLINK-29772:
--

My case is a Kafka interval join; the deadlock occurs when using RocksDB.
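
For context, a sketch of the job shape being described (table and column names 
are assumptions, not taken from the report): a Kafka-to-Kafka interval join 
running on the RocksDB state backend.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IntervalJoinShape {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // send_stream and callback_stream stand in for Kafka tables with
        // rowtime watermarks; their DDL is omitted here.
        tableEnv.executeSql(
                "SELECT s.msg_id, s.send_cnt, c.reach_cnt "
                        + "FROM send_stream s JOIN callback_stream c "
                        + "ON s.msg_id = c.msg_id "
                        + "AND c.rowtime BETWEEN s.rowtime "
                        + "AND s.rowtime + INTERVAL '1' HOUR");
    }
}
{code}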

> Kafka table source scan blocked
> ---
>
> Key: FLINK-29772
> URL: https://issues.apache.org/jira/browse/FLINK-29772
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.2
>Reporter: shizhengchao
>Priority: Major
>
> {code:java}
> //
> "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 
> BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: 
> TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -blocked on java.lang.Object@4aa3fe44
> at 

[jira] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29772 ]


shizhengchao deleted comment on FLINK-29772:
--

was (Author: tinny):
Kafka interval join; the deadlock occurs when using RocksDB.

> Kafka table source scan blocked
> ---
>
> Key: FLINK-29772
> URL: https://issues.apache.org/jira/browse/FLINK-29772
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.2
>Reporter: shizhengchao
>Priority: Major
>
> {code:java}
> //
> "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 
> BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: 
> TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -blocked on java.lang.Object@4aa3fe44
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> 

[jira] [Commented] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624834#comment-17624834
 ] 

shizhengchao commented on FLINK-29772:
--

Kafka interval join; the deadlock occurs when using RocksDB.

> Kafka table source scan blocked
> ---
>
> Key: FLINK-29772
> URL: https://issues.apache.org/jira/browse/FLINK-29772
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.2
>Reporter: shizhengchao
>Priority: Major
>
> {code:java}
> //
> "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 
> BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: 
> TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
> watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
> request_id, owner_id, service_id, content_id, sign_id, receiver_type, 
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, 
> array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
> content_type, android_version, apk_version]) -> Calc(select=[data_type, 
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
> 10) AS client_time, IF(((data_type = 
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER 
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -blocked on java.lang.Object@4aa3fe44
> at 

[jira] [Updated] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-29772:
-
Description: 
{code:java}
//
"Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 BLOCKED on 
java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: 
TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
-blocked on java.lang.Object@4aa3fe44
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at 

[jira] [Updated] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-29772:
-
Description: 
{code:java}
//
"Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 
{color:red}BLOCKED on java.lang.Object@4aa3fe44 owned by{color} "Legacy Source 
Thread - Source: TableSourceScan(table=[[ostream, user_mart, 
dwd_ads_isms_msgmiddle, watermark=[-(toTimeStamps($2), 1:INTERVAL 
SECOND)]]], fields=[data_type, cluster_name, server_time, server_time_s, 
client_time, client_time_s, imei, request_id, owner_id, service_id, content_id, 
sign_id, receiver_type, msg_type, handle_type, reach_type, source_type, 
create_time, msg_id, imsi, array_info_imei, phone, channel_id, process_time, 
code, msg, receiver, content_type, android_version, apk_version]) -> 
Calc(select=[data_type, server_time, client_time, msg_id, array_info_imei, 
code, PROCTIME() AS proctime, Reinterpret(toTimeStamps(server_time)) AS 
rowtime]) -> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, 
_UTF-16LE'#', _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, 
Sarg[(-∞.._UTF-16LE'NULL'), (_UTF-16LE'NULL'.._UTF-16LE'null'), 
(_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER SET "UTF-16LE") AND server_time IS NOT 
NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
-{color:red} blocked on java.lang.Object@4aa3fe44{color}
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at 

[jira] [Created] (FLINK-29772) Kafka table source scan blocked

2022-10-26 Thread shizhengchao (Jira)
shizhengchao created FLINK-29772:


 Summary: Kafka table source scan blocked
 Key: FLINK-29772
 URL: https://issues.apache.org/jira/browse/FLINK-29772
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.13.2
Reporter: shizhengchao


{code:java}
//
"Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 BLOCKED on 
java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: 
TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, 
watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, 
cluster_name, server_time, server_time_s, client_time, client_time_s, imei, 
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, 
handle_type, reach_type, source_type, create_time, msg_id, imsi, 
array_info_imei, phone, channel_id, process_time, code, msg, receiver, 
content_type, android_version, apk_version]) -> Calc(select=[data_type, 
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS 
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> 
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', 
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND server_time IS NOT NULL), server_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), 
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER 
SET "UTF-16LE") AND client_time IS NOT NULL), client_time, 
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 
10) AS client_time, IF(((data_type = 
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS 
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], 
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET 
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
- blocked on java.lang.Object@4aa3fe44
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at 

[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-29 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514050#comment-17514050
 ] 

shizhengchao edited comment on FLINK-26033 at 3/30/22, 3:06 AM:


[~renqs] ([https://github.com/PatrickRen]) I combined multiple commits into 
one, and created another two PRs to cherry-pick the commit onto release-1.13 and 
release-1.14.


was (Author: tinny):
master: 2049f849a130e738759001c5a6b9d85834da08d0
1.13: 3e77dc114261cc91cc80159c95fa27e74b25b8ed
1.14: 6ae8ee89898f3fbaca435a599595d065e71cbab8

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --
>
> Key: FLINK-26033
> URL: https://issues.apache.org/jira/browse/FLINK-26033
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Major
>  Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> // code placeholder
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both fall back to Kafka's default partitioner, but DefaultPartitioner actually 
> covers two partitioning scenarios:
> 1. When there is no key, a sticky (effectively random) partition is chosen.
> 2. When there is a key, the partition is the key hash modulo the partition count.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // Random when there is no key
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, the KafkaConnector does not actually have a round-robin strategy, 
> but we can borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> // code placeholder
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> } {code}

[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-29 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514050#comment-17514050
 ] 

shizhengchao edited comment on FLINK-26033 at 3/30/22, 2:24 AM:


master: 2049f849a130e738759001c5a6b9d85834da08d0
1.13: 3e77dc114261cc91cc80159c95fa27e74b25b8ed
1.14: 6ae8ee89898f3fbaca435a599595d065e71cbab8


was (Author: tinny):
master: 2049f849a130e738759001c5a6b9d85834da08d0
1.13: 8b18e74cec71c9c480bb747fb2f77a55f675ca2a
1.14: d4f374237b21aab93f5d185cb2d1e15a29c9c537


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-29 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514050#comment-17514050
 ] 

shizhengchao commented on FLINK-26033:
--

master: 2049f849a130e738759001c5a6b9d85834da08d0
1.13: 8b18e74cec71c9c480bb747fb2f77a55f675ca2a
1.14: d4f374237b21aab93f5d185cb2d1e15a29c9c537

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --
>
> Key: FLINK-26033
> URL: https://issues.apache.org/jira/browse/FLINK-26033
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Major
>  Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both options therefore use Kafka's default partitioner, but there are actually
> two partitioning scenarios in DefaultPartitioner:
> 1. When there is no key, the partition is chosen randomly ("sticky").
> 2. When there is a key, the key is hashed and the modulo is taken.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // random ("sticky") partition when there is no key
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, the KafkaConnector does not have a real round-robin strategy. But we
> can borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>             byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext

2022-03-08 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502790#comment-17502790
 ] 

shizhengchao commented on FLINK-26527:
--

[~martijnvisser] Thanks, I've got it; it's not a problem.

The user classloader should not include Flink-related dependencies, so I will 
separate the user classloader from the application (parent) classloader to 
solve this problem.
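
A minimal sketch of that separation, mirroring the snippet from the issue 
(getUserJarUrls() is a hypothetical helper that returns only user-code jars, 
without the FLINK_HOME/lib jars):
{code:java}
// Sketch only, not the actual fix: keep flink/janino jars out of the
// child-first user classloader, so ICompilerFactory is always resolved
// by the parent (application) classloader.
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List<URL> userJars = getUserJarUrls(); // user code only, no flink-* jars
URLClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(
        userJars,
        Collections.emptyList(),
        // the application classloader stays the parent and owns Flink classes
        SessionContext.class.getClassLoader(),
        configuration);
try (TemporaryClassLoaderContext ignored =
        TemporaryClassLoaderContext.of(userClassLoader)) {
    tableEnv.explainSql(sql);
}
{code}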

> ClassCastException in TemporaryClassLoaderContext
> -
>
> Key: FLINK-26527
> URL: https://issues.apache.org/jira/browse/FLINK-26527
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.13.5, 1.14.3
>Reporter: shizhengchao
>Priority: Major
>
> When I try to run SQL using Flink's classloader, I get the following 
> exception:
> {code:java}
> Exception in thread "main" java.lang.ClassCastException: 
> org.codehaus.janino.CompilerFactory cannot be cast to 
> org.codehaus.commons.compiler.ICompilerFactory
>     at 
> org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
> …{code}
> My code is like this:
> {code:java}
> Configuration configuration = new Configuration();
> configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
> List<URL> dependencies = FlinkClassLoader.getFlinkDependencies(FLINK_HOME/lib);
> URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
>         dependencies,
>         Collections.emptyList(),
>         SessionContext.class.getClassLoader(),
>         configuration);
> try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
>     tableEnv.explainSql(sql);
>     // CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
> } {code}
> But if you change `classloader.resolve-order` to `parent-first`, everything 
> works fine.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext

2022-03-07 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-26527:
-
Description: 
When I try to run sql using flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.codehaus.janino.CompilerFactory cannot be cast to 
org.codehaus.commons.compiler.ICompilerFactory
    at 
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
 
……{code}
my code is like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List dependencies = FlinkClassLoader.getFlinkDependencies(FLINK_HOME/lib);
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
dependencies,
Collections.emptyList(),
SessionContext.class.getClassLoader(),
configuration);
try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {     
   tableEnv.explainSql(sql);
 
//CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But, if you change `classloader.resolve-order` to `parent-first`, everything 
works fine

  was:
When I try to run sql using flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.codehaus.janino.CompilerFactory cannot be cast to 
org.codehaus.commons.compiler.ICompilerFactory
    at 
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
 
……{code}
my code is like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List dependencies = FlinkClassLoader.getFlinkDependencies(
{code}
{color:#91}${FLINK_HOME}/lib{color}
{code:java}
);
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
dependencies,
Collections.emptyList(),
SessionContext.class.getClassLoader(),
configuration);
try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {     
   tableEnv.explainSql(sql);
 
//CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But, if you change `classloader.resolve-order` to `parent-first`, everything 
works fine





--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext

2022-03-07 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-26527:
-
Description: 
When I try to run sql using flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.codehaus.janino.CompilerFactory cannot be cast to 
org.codehaus.commons.compiler.ICompilerFactory
    at 
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
 
……{code}
my code is like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List dependencies = FlinkClassLoader.getFlinkDependencies(
{code}
{color:#91}${FLINK_HOME}/lib{color}
{code:java}
);
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
dependencies,
Collections.emptyList(),
SessionContext.class.getClassLoader(),
configuration);
try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {     
   tableEnv.explainSql(sql);
 
//CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But, if you change `classloader.resolve-order` to `parent-first`, everything 
works fine

  was:
When I try to run sql using flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.codehaus.janino.CompilerFactory cannot be cast to 
org.codehaus.commons.compiler.ICompilerFactory
    at 
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
 
……{code}
my code is like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List dependencies = 
FlinkClassLoader.getFlinkDependencies(System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR));
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
dependencies,
Collections.emptyList(),
SessionContext.class.getClassLoader(),
configuration);
try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {     
   tableEnv.explainSql(sql);
 
//CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But, if you change `classloader.resolve-order` to `parent-first`, everything 
works fine





--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext

2022-03-07 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-26527:
-
Description: 
When I try to run sql using flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.codehaus.janino.CompilerFactory cannot be cast to 
org.codehaus.commons.compiler.ICompilerFactory
    at 
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
 
……{code}
my code is like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List dependencies = 
FlinkClassLoader.getFlinkDependencies(System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR));
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
dependencies,
Collections.emptyList(),
SessionContext.class.getClassLoader(),
configuration);
try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {     
   tableEnv.explainSql(sql);
 
//CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But, if you change `classloader.resolve-order` to `parent-first`, everything 
works fine

  was:
When I try to run sql using flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.codehaus.janino.CompilerFactory cannot be cast to 
org.codehaus.commons.compiler.ICompilerFactory
    at 
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
 
……{code}
my code is like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List dependencies = 
FlinkClassLoader.getFlinkDependencies(System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR));
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
dependencies,
Collections.emptyList(),
SessionContext.class.getClassLoader(),
configuration);
try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {     tableEnv.explainSql(sql);
 
//CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But, if you change `classloader.resolve-order` to `parent-first`, everything 
works fine





--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext

2022-03-07 Thread shizhengchao (Jira)
shizhengchao created FLINK-26527:


 Summary: ClassCastException in TemporaryClassLoaderContext
 Key: FLINK-26527
 URL: https://issues.apache.org/jira/browse/FLINK-26527
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.14.3, 1.13.5
Reporter: shizhengchao


When I try to run SQL using Flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.codehaus.janino.CompilerFactory cannot be cast to 
org.codehaus.commons.compiler.ICompilerFactory
    at 
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
…{code}
My code is like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List<URL> dependencies =
        FlinkClassLoader.getFlinkDependencies(System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR));
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
        dependencies,
        Collections.emptyList(),
        SessionContext.class.getClassLoader(),
        configuration);
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
    tableEnv.explainSql(sql);
    // CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But if you change `classloader.resolve-order` to `parent-first`, everything 
works fine.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-02 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500479#comment-17500479
 ] 

shizhengchao edited comment on FLINK-26033 at 3/3/22, 3:36 AM:
---

[~MartijnVisser] [~renqs] I found through testing that the RoundRobinPartitioner 
built into Kafka does not work either; that is, it cannot distribute writes to 
all partitions equally, due to abortForNewBatch. For example, with 10 
partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only 
sends data to the even partitions because abortForNewBatch is true. So we 
should implement a round-robin partitioner in Flink, and the design needs to 
discover partitions automatically.


was (Author: tinny):
[~MartijnVisser] [~renqs]   I got through testing that the 
RoundRobinPartitioner built into kafka does not work either, that is it can't 
distribute the writes to all partitions equally, due to abortForNewBatch. For 
example, there are 10 partitions, `

org.apache.kafka.clients.producer.RoundRobinPartitioner` only send data to even 
partitions, due to abortForNewBatch is true.  So we should implement a 
round-robin partitioner in flink, and need to automatically discover the 
partition design


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-02 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500479#comment-17500479
 ] 

shizhengchao commented on FLINK-26033:
--

[~MartijnVisser] [~renqs] I found through testing that the RoundRobinPartitioner 
built into Kafka does not work either; that is, it cannot distribute writes to 
all partitions equally, due to abortForNewBatch. For example, with 10 
partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only 
sends data to the even partitions because abortForNewBatch is true. So we 
should implement a round-robin partitioner in Flink, and the design needs to 
discover partitions automatically.
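
To make the skipping concrete, here is a minimal, self-contained simulation of 
the mechanism as I understand it (an assumption for illustration, not Kafka 
source code): when a new batch is created, the producer re-invokes partition() 
for the same record, so the counter advances twice per record and only every 
other partition receives data.
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSkipDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        int numPartitions = 10;
        for (int record = 0; record < 5; record++) {
            // first call to partition() for this record
            int aborted = counter.getAndIncrement() % numPartitions;
            // creating a new batch aborts the first choice and calls partition() again
            int chosen = counter.getAndIncrement() % numPartitions;
            System.out.printf("record %d: aborted partition %d, wrote to %d%n",
                    record, aborted, chosen);
        }
        // All records land on every other partition; half are never used.
    }
}
{code}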


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-17 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17494327#comment-17494327
 ] 

shizhengchao commented on FLINK-26033:
--

[~renqs] I plan to add a round-robin class that implements the 
FlinkKafkaPartitioner interface.
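
For reference, a minimal sketch of what such a class could look like, built on 
Flink's FlinkKafkaPartitioner base class (the class name and the choice to 
stagger the start by subtask index are my assumptions, not the final 
implementation):
{code:java}
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class RoundRobinFlinkPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int nextIndex;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // stagger the starting partition across sink subtasks
        nextIndex = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // cycle through the partitions Flink knows for the target topic
        int partition = partitions[nextIndex % partitions.length];
        nextIndex = (nextIndex + 1) % partitions.length;
        return partition;
    }
}
{code}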




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-17 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17494323#comment-17494323
 ] 

shizhengchao commented on FLINK-26033:
--

[~renqs] I'm interested in fixing this bug; please assign it to me.




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-09 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489345#comment-17489345
 ] 

shizhengchao commented on FLINK-26033:
--

[~MartijnVisser] This is not a problem there, but in higher versions of 
kafka-clients, such as 2.4.1, there will be problems.




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-09 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489342#comment-17489342
 ] 

shizhengchao commented on FLINK-26033:
--

[~MartijnVisser] I made a mistake. I re-checked: the documentation only applies 
to older Kafka versions, such as kafka-client 2.0.1, and the problem I'm 
talking about is with kafka-client 2.4.1.
{code:java}
// kafka-clients 2.0.1: DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value,
        byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

// kafka-clients 2.4.1: DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value,
        byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

// kafka-clients 2.4.1: StickyPartitionCache#nextPartition
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that
    // the partition that triggered the new batch matches the sticky partition that
    // needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches
        // the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-08 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489298#comment-17489298
 ] 

shizhengchao commented on FLINK-26033:
--

[~jark] [~libenchao] Could you take a look at this issue?




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-08 Thread shizhengchao (Jira)
shizhengchao created FLINK-26033:


 Summary: In KafkaConnector, when 'sink.partitioner' is configured 
as 'round-robin', it does not take effect
 Key: FLINK-26033
 URL: https://issues.apache.org/jira/browse/FLINK-26033
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.3, 1.13.3
Reporter: shizhengchao


In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
does not take effect. Flink treats 'default' and 'round-robin' as the same 
strategy.
{code:java}
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
        ReadableConfig tableOptions, ClassLoader classLoader) {
    return tableOptions
            .getOptional(SINK_PARTITIONER)
            .flatMap(
                    (String partitioner) -> {
                        switch (partitioner) {
                            case SINK_PARTITIONER_VALUE_FIXED:
                                return Optional.of(new FlinkFixedPartitioner<>());
                            case SINK_PARTITIONER_VALUE_DEFAULT:
                            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                return Optional.empty();
                            // Default fallback to full class name of the partitioner.
                            default:
                                return Optional.of(
                                        initializePartitioner(partitioner, classLoader));
                        }
                    });
} {code}
Both options therefore use Kafka's default partitioner, but there are actually
two partitioning scenarios in DefaultPartitioner:
1. When there is no key, the partition is chosen randomly ("sticky").
2. When there is a key, the key is hashed and the modulo is taken.
{code:java}
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value,
        byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        // random ("sticky") partition when there is no key
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
} {code}
Therefore, the KafkaConnector does not have a real round-robin strategy. But we
can borrow from Kafka's RoundRobinPartitioner:
{code:java}
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
            byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    public void close() {}
} {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25703) In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames

2022-01-19 Thread shizhengchao (Jira)
shizhengchao created FLINK-25703:


 Summary: In Batch, I think .stagingPath should be created when the 
task is running to prevent permission issues caused by inconsistency of hadoop 
usernames
 Key: FLINK-25703
 URL: https://issues.apache.org/jira/browse/FLINK-25703
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.14.3
Reporter: shizhengchao


If HADOOP_USER_NAME is set to user_a on the client side, but the task runs in the 
TaskManager as user_b (this is achievable), then the batch task will hit permission 
problems, as follows:

 
{code:java}
Caused by: org.apache.hadoop.security.AccessControlException: Permission 
denied: user=user_b, access=WRITE, 
inode="/where/to/whatever/.staging__1642575264185":user_a:supergroup:drwxr-xr-x{code}
because the .staging__1642575264185 directory was created by user_a.

 

So we should move the code that creates the .staging directory into the 
FileSystemOutputFormat#open method:
{code:java}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        // check the temp path and create it if missing
        checkOrCreateTmpPath();
        PartitionTempFileManager fileManager =
                new PartitionTempFileManager(
                        fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
        PartitionWriter.Context<T> context =
                new PartitionWriter.Context<>(parameters, formatFactory);
        writer =
                PartitionWriterFactory.get(
                                partitionColumns.length - staticPartitions.size() > 0,
                                dynamicGrouped,
                                staticPartitions)
                        .create(context, fileManager, computer);
    } catch (Exception e) {
        throw new TableException("Exception in open", e);
    }
}

private void checkOrCreateTmpPath() throws Exception {
    FileSystem fs = tmpPath.getFileSystem();
    Preconditions.checkState(
            fs.exists(tmpPath) || fs.mkdirs(tmpPath),
            "Failed to create staging dir " + tmpPath);
}
 {code}
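For context: today the staging directory is created on the client side when the sink is 
built. A condensed sketch, from memory, of that logic (I am assuming a 
FileSystemTableSink#toStagingPath method of roughly this shape; treat the name and details 
as unverified):
{code:java}
// Assumed shape of FileSystemTableSink#toStagingPath -- this runs on the
// client, so the directory ends up owned by the client-side user.
private Path toStagingPath() {
    Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
    try {
        FileSystem fs = stagingDir.getFileSystem();
        Preconditions.checkState(
                fs.exists(stagingDir) || fs.mkdirs(stagingDir),
                "Failed to create staging dir " + stagingDir);
        return stagingDir;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
} {code}
Moving the mkdirs call into open() means the directory is created in the TaskManager, under 
the same user that later writes into it.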
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25124) A deadlock occurs when the jdbc sink uses two consecutive dimension tables to associate

2021-11-30 Thread shizhengchao (Jira)
shizhengchao created FLINK-25124:


 Summary: A deadlock occurs when the jdbc sink uses two consecutive 
dimension tables to associate
 Key: FLINK-25124
 URL: https://issues.apache.org/jira/browse/FLINK-25124
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.13.1
Reporter: shizhengchao


 

The sql statement is as follows:
{code:java}
INSERT INTO imei_phone_domestic_realtime
  SELECT
  t.data.imei AS imei,
  CAST(t.data.register_date_key AS bigint) AS register_date_key,
  c.agent_type AS channel_name,
  c.agent_short_name,
  c.agent_name,
  c.agent_chinese_name,
  c.isforeign AS agent_market_type,
  p.seriename AS series_name,
  p.salename AS sale_name,
  p.devname AS dev_name,
  p.devnamesource AS dev_name_source,
  p.color,
  p.isforeign AS product_market_type,
  p.carrier,
  p.lcname AS life_cycle,
  IFNULL(p.shipping_price,0) AS shipping_price,
  IFNULL(p.retail_price,0) AS  retail_price
  FROM kafka_imei_phone_domestic_realtime AS t
  LEFT JOIN dim_product FOR SYSTEM_TIME AS OF t.proctime AS p ON 
p.pn=t.item_code
  LEFT JOIN dim_customer FOR SYSTEM_TIME AS OF t.proctime AS c ON 
c.customer_code=t.customer_code
  where t.eventType='update'; {code}
There is a chance of a deadlock:
{code:java}
"jdbc-upsert-output-format-thread-1" Id=84 BLOCKED on 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af 
owned by "Legacy Source Thread - Source: 
TableSourceScan(table=[[default_catalog, default_database, 
kafka_imei_phone_domestic_realtime]], fields=[data, eventType]) -> 
Calc(select=[data, data.item_code AS $f3], where=[(eventType = 
_UTF-16LE'update':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> 
LookupJoin(table=[default_catalog.default_database.dim_product], 
joinType=[LeftOuterJoin], async=[false], lookup=[pn=$f3], select=[data, $f3, 
pn, color, isforeign, devname, salename, seriename, lcname, carrier, 
devnamesource, shipping_price, retail_price]) -> Calc(select=[data, color, 
isforeign, devname, salename, seriename, lcname, carrier, devnamesource, 
shipping_price, retail_price, data.customer_code AS $f31]) -> 
LookupJoin(table=[default_catalog.default_database.dim_customer], 
joinType=[LeftOuterJoin], async=[false], lookup=[customer_code=$f31], 
select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, 
devnamesource, shipping_price, retail_price, $f31, customer_code, 
agent_short_name, agent_name, isforeign, agent_type, agent_chinese_name]) -> 
Calc(select=[data.imei AS imei, CAST(data.register_date_key) AS 
register_date_key, agent_type AS channel_name, agent_short_name, agent_name, 
agent_chinese_name, isforeign0 AS agent_market_type, seriename AS series_name, 
salename AS sale_name, devname AS dev_name, devnamesource AS dev_name_source, 
color, isforeign AS product_market_type, carrier, lcname AS life_cycle, 
IFNULL(shipping_price, 0:DECIMAL(10, 0)) AS shipping_price, 
IFNULL(retail_price, 0:DECIMAL(10, 0)) AS retail_price]) -> 
NotNullEnforcer(fields=[imei]) -> Sink: 
Sink(table=[default_catalog.default_database.imei_phone_domestic_realtime], 
fields=[imei, register_date_key, channel_name, agent_short_name, agent_name, 
agent_chinese_name, agent_market_type, series_name, sale_name, dev_name, 
dev_name_source, color, product_market_type, carrier, life_cycle, 
shipping_price, retail_price]) (6/12)#0" Id=82
    at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:124)
    -  blocked on 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af
    at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat$$Lambda$344/21845506.run(Unknown
 Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ...    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@325612a2 {code}
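The thread dump boils down to the following locking pattern: JdbcBatchingOutputFormat 
schedules a background flusher that synchronizes on the same monitor as writeRecord(). A 
minimal, self-contained sketch of that pattern (illustrative names and shape, not the actual 
Flink internals):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the contention pattern, not Flink source code.
public class BatchingSinkSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void open() {
        // Background flusher: takes the same monitor as writeRecord().
        scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) {  // BLOCKED while a writer holds the monitor
                flush();
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    public synchronized void writeRecord(Object record) {
        buffer(record);
        // If this thread stalls here (e.g. on a slow JDBC lookup in the same
        // operator chain), the scheduled flusher above cannot make progress.
    }

    private void buffer(Object record) { /* append to the batch */ }

    private void flush() { /* execute the JDBC batch */ }
} {code}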
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-23 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367969#comment-17367969
 ] 

shizhengchao commented on FLINK-23096:
--

The *clearSessionState* method catches IOException, which causes the root cause to be 
masked. When I caught Exception instead, I got the real root cause:
{code:java}
Caused by: org.apache.hadoop.security.AccessControlException: Permission 
denied: user=service, access=WRITE, inode="/tmp":hdfs:supergroup:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:280)
   ...
at 
org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3678)
at 
org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:597)
at 
org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
{code}
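Given that, one possible fix sketch (my suggestion, not the committed change) is to resolve 
the session paths before closing the SessionState, and to catch Exception so the real 
failure is not swallowed:
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            // Resolve the paths while the session is still alive; after
            // close() the thread-local SessionState is gone and
            // getHDFSSessionPath() falls back to the conf (and may NPE).
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            sessionState.close();
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (Exception e) {
            // catch Exception (not just IOException) so the root cause
            // is logged instead of being masked
            LOG.warn("Error closing SessionState", e);
        }
    }
} {code}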

> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info/*+ 
> OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Conf non-local session path expected to be non-null
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
> Source)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
>  Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NullPointerException: Conf non-local session path 
> expected to be non-null
> at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.callCommand(FlinkSqlStreamingPlatform.java:129)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.run(FlinkSqlStreamingPlatform.java:91)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.main(FlinkSqlStreamingPlatform.java:66)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> 

[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832
 ] 

shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:29 AM:


[~Leonard Xu] [~lirui]
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}

public static Path getHDFSSessionPath(Configuration conf) {
    SessionState ss = SessionState.get();
    if (ss == null) {
        String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
        Preconditions.checkNotNull(sessionPathString,
                "Conf non-local session path expected to be non-null");
        return new Path(sessionPathString);
    }
    Preconditions.checkNotNull(ss.hdfsSessionPath,
            "Non-local session path expected to be non-null");
    return ss.hdfsSessionPath;
}{code}

Because the session is already closed, SessionState.get() returns null.


was (Author: tinny):
[~Leonard Xu] [~lirui]
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}

public static Path getHDFSSessionPath(Configuration conf) {
    SessionState ss = SessionState.get();
    if (ss == null) {
        String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
        Preconditions.checkNotNull(sessionPathString,
                "Conf non-local session path expected to be non-null");
        return new Path(sessionPathString);
    }
    Preconditions.checkNotNull(ss.hdfsSessionPath,
            "Non-local session path expected to be non-null");
    return ss.hdfsSessionPath;
}{code}

Because the session is closed, SessionState.get() will cause an NPE.

> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info/*+ 
> OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Conf non-local session path expected to be non-null
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
> Source)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
>  Source)
> at 

[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832
 ] 

shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:10 AM:


[~Leonard Xu] [~lirui]
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}

public static Path getHDFSSessionPath(Configuration conf) {
    SessionState ss = SessionState.get();
    if (ss == null) {
        String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
        Preconditions.checkNotNull(sessionPathString,
                "Conf non-local session path expected to be non-null");
        return new Path(sessionPathString);
    }
    Preconditions.checkNotNull(ss.hdfsSessionPath,
            "Non-local session path expected to be non-null");
    return ss.hdfsSessionPath;
}{code}

Because the session is closed, SessionState.get() will cause an NPE.


was (Author: tinny):
[~Leonard Xu] [~lirui]
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}{code}

Because the session is closed, SessionState.getHDFSSessionPath(hiveConf) will cause an NPE.

> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info/*+ 
> OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Conf non-local session path expected to be non-null
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
> Source)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
>  Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NullPointerException: Conf 

[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832
 ] 

shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:09 AM:


[~Leonard Xu] [~lirui]
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}{code}

Because the session is closed, SessionState.getHDFSSessionPath(hiveConf) will cause an NPE.


was (Author: tinny):
[~Leonard Xu] [~lirui]
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}{code}

Because the session is closed, SessionState.getHDFSSessionPath(hiveConf) will cause an NPE.

> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info/*+ 
> OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Conf non-local session path expected to be non-null
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
> Source)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
>  Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NullPointerException: Conf non-local session path 
> expected to be non-null
> at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
> at 
> 

[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832
 ] 

shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:07 AM:


[~Leonard Xu] [~lirui]
{code:java}
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}{code}

Because the session is closed, SessionState.getHDFSSessionPath(hiveConf) will cause an NPE.


was (Author: tinny):
[~Leonard Xu] [~lirui]

> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info/*+ 
> OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Conf non-local session path expected to be non-null
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
> Source)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
>  Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NullPointerException: Conf non-local session path 
> expected to be non-null
> at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.callCommand(FlinkSqlStreamingPlatform.java:129)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.run(FlinkSqlStreamingPlatform.java:91)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.main(FlinkSqlStreamingPlatform.java:66)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> 

[jira] [Commented] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832
 ] 

shizhengchao commented on FLINK-23096:
--

[~Leonard Xu] [~lirui]

> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info/*+ 
> OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Conf non-local session path expected to be non-null
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
> Source)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
>  Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NullPointerException: Conf non-local session path 
> expected to be non-null
> at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.callCommand(FlinkSqlStreamingPlatform.java:129)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.run(FlinkSqlStreamingPlatform.java:91)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.main(FlinkSqlStreamingPlatform.java:66)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ... 13 common frames omitted
> {code}
> My guess is that sessionstate is not set to threadlocal:
> {code:java}
> // @see org.apache.hadoop.hive.ql.session.SessionState.setCurrentSessionState
> public static void setCurrentSessionState(SessionState startSs) {
>   tss.get().attach(startSs);
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367327#comment-17367327
 ] 

shizhengchao commented on FLINK-23096:
--

This is where the exception occurs in the code:

 
{code:java}
// @see org.apache.flink.table.planner.delegation.hive.HiveParser
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}

// @see org.apache.hadoop.hive.ql.session.SessionState
public static Path getHDFSSessionPath(Configuration conf) {
    SessionState ss = SessionState.get();
    if (ss == null) {
        String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
        Preconditions.checkNotNull(sessionPathString,
                "Conf non-local session path expected to be non-null");
        return new Path(sessionPathString);
    }
    Preconditions.checkNotNull(ss.hdfsSessionPath,
            "Non-local session path expected to be non-null");
    return ss.hdfsSessionPath;
}
{code}
 

 

> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info/*+ 
> OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Conf non-local session path expected to be non-null
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
> Source)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
>  Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NullPointerException: Conf non-local session path 
> expected to be non-null
> at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
> at 
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at 
> com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157)
> at 
> 

[jira] [Updated] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-23096:
-
Description: 
My sql code is as follows:
{code:java}
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
);

use catalog hive;

set 'table.sql-dialect' = 'hive';

create view if not exists view_test as
select
  cast(goods_id as string) as goods_id,
  cast(depot_id as string) as depot_id,
  cast(product_id as string) as product_id,
  cast(tenant_code as string) as tenant_code
from edw.dim_yezi_whse_goods_base_info/*+ 
OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
{code}
and the exception is as follows:
{code:java}
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Conf non-local session path expected to be non-null
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
Source)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
 Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.NullPointerException: Conf non-local session path expected 
to be non-null
at 
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
at 
org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.callCommand(FlinkSqlStreamingPlatform.java:129)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.run(FlinkSqlStreamingPlatform.java:91)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.main(FlinkSqlStreamingPlatform.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 13 common frames omitted
{code}
My guess is that sessionstate is not set to threadlocal:
{code:java}
// @see org.apache.hadoop.hive.ql.session.SessionState.setCurrentSessionState
public static void setCurrentSessionState(SessionState startSs) {
  tss.get().attach(startSs);
}
{code}
 

  was:
My sql code is as follows:
{code:java}
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
);

use catalog hive;

set 'table.sql-dialect' = 'hive';

create view if not exists view_test as
select
  cast(goods_id as string) as goods_id,
  cast(depot_id as string) as depot_id,
  cast(product_id as string) as product_id,
  cast(tenant_code as string) as tenant_code
from edw.dim_yezi_whse_goods_base_info/*+ 
OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
{code}
and the exception is as follows:
{code:java}
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Conf non-local session path expected to be non-null
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 

[jira] [Updated] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-23096:
-
Description: 
My sql code is as follows:
{code:java}
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
);

use catalog hive;

set 'table.sql-dialect' = 'hive';

create view if not exists view_test as
select
  cast(goods_id as string) as goods_id,
  cast(depot_id as string) as depot_id,
  cast(product_id as string) as product_id,
  cast(tenant_code as string) as tenant_code
from edw.dim_yezi_whse_goods_base_info/*+ 
OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
{code}
and the exception is as follows:
{code:java}
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Conf non-local session path expected to be non-null
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown 
Source)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown
 Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.NullPointerException: Conf non-local session path expected 
to be non-null
at 
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
at 
org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.callCommand(FlinkSqlStreamingPlatform.java:129)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.run(FlinkSqlStreamingPlatform.java:91)
at 
com.shizhengchao.io.FlinkSqlStreamingPlatform.main(FlinkSqlStreamingPlatform.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 13 common frames omitted
{code}

  was:
My sql code is as follows:
{code:java}
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
);

use catalog hive;

set 'table.sql-dialect' = 'hive';

create view if not exists view_test as
select
  cast(goods_id as string) as goods_id,
  cast(depot_id as string) as depot_id,
  cast(product_id as string) as product_id,
  cast(tenant_code as string) as tenant_code
from edw.dim_yezi_whse_goods_base_info/*+ 
OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
{code}


> HiveParser could not attach the sessionstate of hive
> 
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: shizhengchao
>Priority: Major
>
> My sql code is as follows:
> {code:java}
> CREATE CATALOG myhive WITH (
> 'type' = 'hive',
> 'default-database' = 'default',
> 'hive-conf-dir' = 

[jira] [Created] (FLINK-23096) HiveParser could not attach the sessionstate of hive

2021-06-22 Thread shizhengchao (Jira)
shizhengchao created FLINK-23096:


 Summary: HiveParser could not attach the sessionstate of hive
 Key: FLINK-23096
 URL: https://issues.apache.org/jira/browse/FLINK-23096
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.13.1
Reporter: shizhengchao


My sql code is as follows:
{code:java}
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
);

use catalog hive;

set 'table.sql-dialect' = 'hive';

create view if not exists view_test as
select
  cast(goods_id as string) as goods_id,
  cast(depot_id as string) as depot_id,
  cast(product_id as string) as product_id,
  cast(tenant_code as string) as tenant_code
from edw.dim_yezi_whse_goods_base_info/*+ 
OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19446) canal-json has a situation that -U and +U are equal, when updating the null field to be non-null

2021-01-19 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268342#comment-17268342
 ] 

shizhengchao commented on FLINK-19446:
--

[~Balro], I think your patch would work well.

> canal-json has a situation that -U and +U are equal, when updating the null 
> field to be non-null
> 
>
> Key: FLINK-19446
> URL: https://issues.apache.org/jira/browse/FLINK-19446
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.1
>Reporter: shizhengchao
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> line 118 in CanalJsonDeserializationSchema#deserialize method:
> {code:java}
> GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
> GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
> for (int f = 0; f < fieldCount; f++) {
>   if (before.isNullAt(f)) {
>   // not null fields in "old" (before) means the fields are 
> changed
>   // null/empty fields in "old" (before) means the fields are not 
> changed
>   // so we just copy the not changed fields into before
>   before.setField(f, after.getField(f));
>   }
> }
> before.setRowKind(RowKind.UPDATE_BEFORE);
> after.setRowKind(RowKind.UPDATE_AFTER);
> {code}
> if a field is null before update,it will cause -U and +U to be equal



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-01-19 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267996#comment-17267996
 ] 

shizhengchao commented on FLINK-20995:
--

Did you submit the task through the web interface?

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Blocker
> Fix For: 1.13.0
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 
> 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 

[jira] [Commented] (FLINK-20660) Time window operator with computed column triggers an exception in batch mode

2020-12-18 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251616#comment-17251616
 ] 

shizhengchao commented on FLINK-20660:
--

[~sujun1020], I think the problem is that the longToTimestamp method cannot convert a long 
to TIMESTAMP(3), so you can try it this way:

> Time window operator with computed column triggers an exception in batch mode
> -
>
> Key: FLINK-20660
> URL: https://issues.apache.org/jira/browse/FLINK-20660
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: sujun
>Priority: Minor
>
> {{Time window operator with computed column triggers an exception in batch 
> mode, it may be a bug in BatchExecWindowAggregateRule.}}
> {{My test code:}}
> {code:java}
> public class WindowAggWithBigintTest {
>   public static void main(String[] args) throws Exception {
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
> 
> tEnv.registerFunction("longToTimestamp",new LongToTimestamp());   
>  
> String ddl = "CREATE TABLE source(occur_time bigint,rowtime AS 
> longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', 'format' = 
> 'orc', 'path' = '/path/to/orc')";
> tEnv.executeSql(ddl);
> Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, 
> INTERVAL '1' HOUR) as ts,count(1) as ct from source group by TUMBLE(rowtime, 
> INTERVAL '1' HOUR)");
> DiscardingOutputFormat outputFormat = new 
> DiscardingOutputFormat();
> TableResultSink tableResultSink = new 
> TableResultSink(table.getSchema(), outputFormat);
> tEnv.registerTableSink("sink",tableResultSink);
> table.insertInto("sink");
> tEnv.execute("test");
>   }  
>   private static class TableResultSink implements StreamTableSink 
> {
> private final TableSchema schema;
> private final DataType rowType;
> private final OutputFormat outputFormat; 
>
> TableResultSink(TableSchema schema, OutputFormat 
> outputFormat) {
>   this.schema = schema;
>   this.rowType = schema.toRowDataType();
>   this.outputFormat = outputFormat;
> }
> @Override
> public DataType getConsumedDataType() {
>   return rowType;
> }
> @Override
> public TableSchema getTableSchema() {
>   return schema;
> }
> @Override
> public TableSink configure(String[] fieldNames, 
> TypeInformation[] fieldTypes) {
>   throw new UnsupportedOperationException(
>   "This sink is configured by passing a static 
> schema when initiating");
> }
> @Override
> public DataStreamSink consumeDataStream(DataStream 
> dataStream) {
>   return 
> dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult");
> }
>   }
> }
> {code}
>  
> Exception:
>  
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule BatchExecWindowAggregateRule, args 
> [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$,
>  $f0, 360),properties=w$start, w$end, w$rowtime), 
> rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog,
>  default_database, source, source: [FileSystemTableSource(occur_time, 
> rowtime)]],fields=occur_time, rowtime)]
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 

[jira] [Comment Edited] (FLINK-20660) Time window operator with computed column triggers an exception in batch mode

2020-12-18 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251616#comment-17251616
 ] 

shizhengchao edited comment on FLINK-20660 at 12/18/20, 9:08 AM:
-

[~sujun1020], I think the problem is that the longToTimestamp method cannot 
convert a long to TIMESTAMP(3), so you can try it this way:

{code:java}
`rowtime AS CAST(occur_time AS TIMESTAMP(3))`
{code}
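
For reference, a minimal self-contained sketch of the suggested fix; the class 
name is illustrative and the DDL follows the issue's example:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CastComputedColumnSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // The computed column uses CAST instead of a UDF, so the planner can
        // derive TIMESTAMP(3) for the rowtime column used by the window.
        tEnv.executeSql(
                "CREATE TABLE source (" +
                "  occur_time BIGINT," +
                "  rowtime AS CAST(occur_time AS TIMESTAMP(3))" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'format' = 'orc'," +
                "  'path' = '/path/to/orc'" +
                ")");

        // Same tumbling-window aggregation as in the issue's test code.
        tEnv.sqlQuery(
                "SELECT TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS ts, COUNT(1) AS ct " +
                "FROM source GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)")
            .execute()
            .print();
    }
}
{code}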



was (Author: tinny):
[~sujun1020], I think the problem is that the longToTimestamp method cannot 
convert a long to TIMESTAMP(3), so you can try it this way:

> Time window operator with computed column triggers an exception in batch mode
> -
>
> Key: FLINK-20660
> URL: https://issues.apache.org/jira/browse/FLINK-20660
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: sujun
>Priority: Minor
>
> {{Time window operator with computed column triggers an exception in batch 
> mode; it may be a bug in BatchExecWindowAggregateRule.}}
> {{My test code:}}
> {code:java}
> public class WindowAggWithBigintTest {
>   public static void main(String[] args) throws Exception {
>     EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>     TableEnvironment tEnv = TableEnvironment.create(settings);
> 
>     tEnv.registerFunction("longToTimestamp", new LongToTimestamp());
> 
>     String ddl = "CREATE TABLE source(occur_time bigint, rowtime AS "
>         + "longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', "
>         + "'format' = 'orc', 'path' = '/path/to/orc')";
>     tEnv.executeSql(ddl);
>     Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, INTERVAL '1' HOUR) as ts, "
>         + "count(1) as ct from source group by TUMBLE(rowtime, INTERVAL '1' HOUR)");
>     DiscardingOutputFormat<Row> outputFormat = new DiscardingOutputFormat<>();
>     TableResultSink tableResultSink = new TableResultSink(table.getSchema(), outputFormat);
>     tEnv.registerTableSink("sink", tableResultSink);
>     table.insertInto("sink");
>     tEnv.execute("test");
>   }
> 
>   private static class TableResultSink implements StreamTableSink<Row> {
>     private final TableSchema schema;
>     private final DataType rowType;
>     private final OutputFormat<Row> outputFormat;
> 
>     TableResultSink(TableSchema schema, OutputFormat<Row> outputFormat) {
>       this.schema = schema;
>       this.rowType = schema.toRowDataType();
>       this.outputFormat = outputFormat;
>     }
> 
>     @Override
>     public DataType getConsumedDataType() {
>       return rowType;
>     }
> 
>     @Override
>     public TableSchema getTableSchema() {
>       return schema;
>     }
> 
>     @Override
>     public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
>       throw new UnsupportedOperationException(
>           "This sink is configured by passing a static schema when initiating");
>     }
> 
>     @Override
>     public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
>       return dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult");
>     }
>   }
> }
> {code}
>  
> Exception:
>  
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule BatchExecWindowAggregateRule, args 
> [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$,
>  $f0, 360),properties=w$start, w$end, w$rowtime), 
> rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog,
>  default_database, source, source: [FileSystemTableSource(occur_time, 
> rowtime)]],fields=occur_time, rowtime)]
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> 

[jira] [Created] (FLINK-20607) A wrong example in the UDFs page.

2020-12-14 Thread shizhengchao (Jira)
shizhengchao created FLINK-20607:


 Summary: A wrong example in the UDFs page.
 Key: FLINK-20607
 URL: https://issues.apache.org/jira/browse/FLINK-20607
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.11.1, 1.12.0
Reporter: shizhengchao


The example demonstrating multiple input types in FunctionHint is wrong:
{code:java}
@FunctionHint(
  input = [@DataTypeHint("INT"), @DataTypeHint("INT")],
  output = @DataTypeHint("INT")
)
{code}

should be:
{code:java}
@FunctionHint(
  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  output = @DataTypeHint("INT")
)
{code}
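
For completeness, a hedged sketch of a function that uses the corrected brace 
syntax; the function itself is illustrative, not taken from the docs:

{code:java}
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;

// Java annotation array values take braces, not brackets, which is why the
// documented example fails to compile as written.
@FunctionHint(
  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  output = @DataTypeHint("INT")
)
public class AddFunction extends ScalarFunction {
  public Integer eval(Integer a, Integer b) {
    return a + b;
  }
}
{code}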



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20570) The `NOTE` tip style is different from the others in the process_function page.

2020-12-11 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-20570:
-
Description: 
In `/docs/stream/operators/process_function.md`, line 252, the `NOTE` CSS 
style is different from the others.
{code:java}
the current style is: **NOTE:**

and the other style is: Note
{code}

Besides, 
`{% top %}` appears in the middle and needs to be removed.

  was:
in `/docs/stream/operators/process_function.md`, line 252.  The `NOTE` css 
style is different from the others.
{code:java}
current is: **NOTE:**

and another style is : Note
{code}


> The `NOTE` tip style is different from the others in process_function page.
> ---
>
> Key: FLINK-20570
> URL: https://issues.apache.org/jira/browse/FLINK-20570
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Documentation
>Affects Versions: 1.12.0
>Reporter: shizhengchao
>Priority: Minor
>  Labels: pull-request-available
>
> In `/docs/stream/operators/process_function.md`, line 252, the `NOTE` CSS 
> style is different from the others.
> {code:java}
> the current style is: **NOTE:**
> and the other style is: Note
> {code}
> Besides, 
> `{% top %}` appears in the middle and needs to be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20570) `/docs/stream/operators/process_function.md`, line 252. The `NOTE tip` CSS style is different from the others.

2020-12-10 Thread shizhengchao (Jira)
shizhengchao created FLINK-20570:


 Summary:  `/docs/stream/operators/process_function.md`, line 252. 
The `NOTE tip` CSS style is different from the others.
 Key: FLINK-20570
 URL: https://issues.apache.org/jira/browse/FLINK-20570
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.12.0
Reporter: shizhengchao


In `/docs/stream/operators/process_function.md`, line 252, the `NOTE` CSS 
style is different from the others.
{code:java}
the current style is: **NOTE:**

and the other style is: Note
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20200) SQL Hints are not supported in "Create View" syntax

2020-11-17 Thread shizhengchao (Jira)
shizhengchao created FLINK-20200:


 Summary: SQL Hints are not supported in "Create View" syntax
 Key: FLINK-20200
 URL: https://issues.apache.org/jira/browse/FLINK-20200
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.2
Reporter: shizhengchao


I have already set the config option `table.dynamic-table-options.enabled` to 
true, but "SQL Hints" are not supported in the "Create View" syntax. I got this 
error:
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: class 
org.apache.calcite.sql.SqlSyntax$6: SPECIAL
at org.apache.calcite.util.Util.needToImplement(Util.java:967)
at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:333)
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470)
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101)
at 
org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176)
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470)
at org.apache.calcite.sql.SqlSelect.unparse(SqlSelect.java:246)
at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:151)
at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:173)
at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:182)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.getQuotedSqlString(SqlToOperationConverter.java:784)
at 
org.apache.flink.table.planner.utils.Expander$Expanded.substitute(Expander.java:169)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:694)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
{code}

The SQL code is as follows:

{code:java}
drop table if exists SourceA;
create table SourceA (
  id    string,
  name  string
) with (
  'connector' = 'kafka-0.11',
  'topic' = 'MyTopic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'Test',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'csv'
);

drop table if exists print;
create table print (
  id    string,
  name  string
) with (
  'connector' = 'print'
);

drop view if exists test_view;
create view test_view as
select
  *
from SourceA /*+ OPTIONS('properties.group.id'='NewGroup') */;

insert into print
select * from test_view;
{code}
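
A possible workaround, sketched under the assumption that the failure is 
specific to expanding hints inside the view query: apply the dynamic option 
where the table is queried directly, so no hint has to survive view expansion. 
The class wrapper is illustrative and the DDL from above is assumed to have 
been executed:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HintWorkaroundSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.dynamic-table-options.enabled", true);

        // DDL for SourceA and print omitted; see the SQL above.
        // The hint is attached at the point of use instead of inside a view,
        // so the CREATE VIEW unparse path that throws is never entered.
        tEnv.executeSql(
                "INSERT INTO print " +
                "SELECT * FROM SourceA /*+ OPTIONS('properties.group.id'='NewGroup') */");
    }
}
{code}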




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19723) The retry mechanism of jdbc connector has the risk of data duplication

2020-10-29 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222726#comment-17222726
 ] 

shizhengchao commented on FLINK-19723:
--

Hi [~jark], after testing, this is not a bug; could you close the issue? :D

Generally, after executeBatch is executed in JDBC, the statement's list of 
batched statements is cleared regardless of success or failure.
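
A small sketch of that behavior, assuming an in-memory H2 database on the 
classpath (the table, URL, and duplicate-key failure are illustrative):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

public class ExecuteBatchResetSketch {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:test");
             Statement ddl = conn.createStatement()) {
            ddl.execute("CREATE TABLE t (id INT PRIMARY KEY)");
            try (PreparedStatement ps = conn.prepareStatement("INSERT INTO t VALUES (?)")) {
                ps.setInt(1, 1);
                ps.addBatch();
                ps.setInt(1, 1); // duplicate key: makes executeBatch fail
                ps.addBatch();
                try {
                    ps.executeBatch();
                } catch (SQLException expected) {
                    // The failed call already cleared the batch list, so a bare
                    // retry re-executes nothing instead of duplicating rows.
                    int[] retried = ps.executeBatch();
                    System.out.println(retried.length); // prints 0
                }
            }
        }
    }
}
{code}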

> The retry mechanism of jdbc connector has the risk of data duplication
> --
>
> Key: FLINK-19723
> URL: https://issues.apache.org/jira/browse/FLINK-19723
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.11.1
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Major
>  Labels: pull-request-available
>
> For example, if statement.executeBatch() fails for some reason but the 
> "statement" is not closed in the retry, there is a risk of data duplication:
> {code:java}
> for (int i = 1; i <= executionOptions.getMaxRetries(); i++) {
>     try {
>         attemptFlush();
>         batchCount = 0;
>         break;
>     } catch (SQLException e) {
>         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
>         if (i >= executionOptions.getMaxRetries()) {
>             throw new IOException(e);
>         }
>         try {
>             if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
>                 connection = connectionProvider.reestablishConnection();
>                 jdbcStatementExecutor.closeStatements();
>                 jdbcStatementExecutor.prepareStatements(connection);
>             }
>         } catch (Exception exception) {
>             LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
>             throw new IOException("Reestablish JDBC connection failed", exception);
>         }
>         try {
>             Thread.sleep(1000 * i);
>         } catch (InterruptedException ex) {
>             Thread.currentThread().interrupt();
>             throw new IOException("unable to flush; interrupted while doing another attempt", e);
>         }
>     }
> }
> {code}
> The correct code should be:
> {code:java}
> try {
>     if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
>         connection = connectionProvider.reestablishConnection();
>     }
>     jdbcStatementExecutor.closeStatements();
>     jdbcStatementExecutor.prepareStatements(connection);
> } catch (Exception exception) {
>     LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
>     throw new IOException("Reestablish JDBC connection failed", exception);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19818) ArrayIndexOutOfBoundsException occurs when the source table has nested JSON

2020-10-26 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19818:
-
Description: 
I get an *ArrayIndexOutOfBoundsException* when my table source has nested 
JSON. The following is my test:
{code:sql}
CREATE TABLE Orders (
  nest ROW<
idBIGINT,
consumerName  STRING,
price DECIMAL(10, 5),
productName   STRING
  >,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE print (
  orderId   BIGINT,
  consumerName  STRING,
  price DECIMAL(10, 5),
  productName   STRING
) WITH (
  'connector' = 'print'
);

CREATE VIEW testView AS
SELECT
  id,
  consumerName,
  price,
  productName
FROM (
  SELECT * FROM Orders
);

INSERT INTO print
SELECT
  *
FROM testView;
{code}
The following is the exception from Flink:
{code}
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.get(ArrayList.java:435)
at 
org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73)
at 
org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207)
at 
com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133)
at 
com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77)
{code}

  was:
I get an *ArrayIndexOutOfBoundsException* , when my table source have nest 
json. as the follows is my test:
{code:sql}
CREATE TABLE Orders (
  nest ROW<
idBIGINT,
consumerName  STRING,
price DECIMAL(10, 5),
productName   STRING
  >,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE print (
  orderId   BIGINT,
  consumerName  STRING,
  price DECIMAL(10, 5),
  productName   STRING
) WITH (
  'connector' = 'print'
);

CREATE VIEW testView AS
SELECT
  id,
  consumerName,
  price,
  productName
FROM (
  SELECT * FROM Orders
);

INSERT INTO print
SELECT
  *
FROM testView;
{code}
The following is the exception of flink:
{code}
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.get(ArrayList.java:435)
at 
org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73)
at 
org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375)
at 

[jira] [Updated] (FLINK-19818) ArrayIndexOutOfBoundsException occurs when the source table has nested JSON

2020-10-26 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19818:
-
Description: 
I get an *ArrayIndexOutOfBoundsException* when my table source has nested 
JSON. The following is my test:
{code:sql}
CREATE TABLE Orders (
  nest ROW<
idBIGINT,
consumerName  STRING,
price DECIMAL(10, 5),
productName   STRING
  >,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE print (
  orderId   BIGINT,
  consumerName  STRING,
  price DECIMAL(10, 5),
  productName   STRING
) WITH (
  'connector' = 'print'
);

CREATE VIEW testView AS
SELECT
  id,
  consumerName,
  price,
  productName
FROM (
  SELECT * FROM Orders
);

INSERT INTO print
SELECT
  *
FROM testView;
{code}
The following is the exception from Flink:
{code}
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.get(ArrayList.java:435)
at 
org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73)
at 
org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207)
at 
com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133)
at 
com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77)
{code}

  was:
I get an *ArrayIndexOutOfBoundsException* in *Interval Joins*, when my table 
source have nest json. as the follows is my test: 

{code:sql}
CREATE TABLE Orders (
  nest ROW<
idBIGINT,
consumerName  STRING,
price DECIMAL(10, 5),
productName   STRING
  >,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

DROP TABLE IF EXISTS Shipments;
CREATE TABLE Shipments (
  idBIGINT,
  orderId   BIGINT,
  originSTRING,
  destnationSTRING,
  isArrived BOOLEAN,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Shipments',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

DROP TABLE IF EXISTS print;
CREATE TABLE print (
  orderId   BIGINT,
  consumerName  STRING,
  price DECIMAL(10, 5),
  productName   STRING,
  originSTRING,
  destnationSTRING,
  isArrived BOOLEAN
) WITH (
  'connector' = 'print'
);

DROP VIEW IF EXISTS IntervalJoinView;
CREATE VIEW IntervalJoinView AS
SELECT
  o.id,
  o.consumerName,
  o.price,
  o.productName,
  

[jira] [Updated] (FLINK-19818) ArrayIndexOutOfBoundsException occurs when the source table has nested JSON

2020-10-26 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19818:
-
Summary: ArrayIndexOutOfBoundsException occurs when the source table has 
nested JSON  (was: ArrayIndexOutOfBoundsException occurs in 'Interval Joins' 
when the source table has nested JSON)

> ArrayIndexOutOfBoundsException occurs when the source table has nested JSON
> --
>
> Key: FLINK-19818
> URL: https://issues.apache.org/jira/browse/FLINK-19818
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Major
>
> I get an *ArrayIndexOutOfBoundsException* in *Interval Joins* when my table 
> source has nested JSON. The following is my test:
> {code:sql}
> CREATE TABLE Orders (
>   nest ROW<
> idBIGINT,
> consumerName  STRING,
> price DECIMAL(10, 5),
> productName   STRING
>   >,
>   proctime AS PROCTIME()
> ) WITH (
>   'connector' = 'kafka-0.11',
>   'topic' = 'Orders',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> DROP TABLE IF EXISTS Shipments;
> CREATE TABLE Shipments (
>   idBIGINT,
>   orderId   BIGINT,
>   originSTRING,
>   destnationSTRING,
>   isArrived BOOLEAN,
>   proctime AS PROCTIME()
> ) WITH (
>   'connector' = 'kafka-0.11',
>   'topic' = 'Shipments',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> DROP TABLE IF EXISTS print;
> CREATE TABLE print (
>   orderId   BIGINT,
>   consumerName  STRING,
>   price DECIMAL(10, 5),
>   productName   STRING,
>   originSTRING,
>   destnationSTRING,
>   isArrived BOOLEAN
> ) WITH (
>   'connector' = 'print'
> );
> DROP VIEW IF EXISTS IntervalJoinView;
> CREATE VIEW IntervalJoinView AS
> SELECT
>   o.id,
>   o.consumerName,
>   o.price,
>   o.productName,
>   s.origin,
>   s.destnation,
>   s.isArrived
> FROM
>   (SELECT * FROM Orders) o,
>   (SELECT * FROM Shipments) s
> WHERE s.orderId = o.id AND o.proctime BETWEEN s.proctime - INTERVAL '4' HOUR 
> AND s.proctime;
> INSERT INTO print
> SELECT
>   id,
>   consumerName,
>   price,
>   productName,
>   origin,
>   destnation,
>   isArrived
> FROM IntervalJoinView;
> {code}
> The following is the exception from Flink:
> {code:log}
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
>   at java.util.ArrayList.elementData(ArrayList.java:422)
>   at java.util.ArrayList.get(ArrayList.java:435)
>   at 
> org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73)
>   at 
> org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77)
> {code}

[jira] [Commented] (FLINK-19818) ArrayIndexOutOfBoundsException occurs in 'Interval Joins' when the source table has nested JSON

2020-10-26 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221118#comment-17221118
 ] 

shizhengchao commented on FLINK-19818:
--

After my test, only `select * from Orders` reports the error, while using 
`select id, consumerName, price, productName, proctime from Orders` works 
fine.
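
A sketch of that workaround as a view definition; the class wrapper is 
illustrative, and column resolution follows the comment above rather than a 
verified run:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplicitColumnsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // DDL for Orders and print omitted; see the issue description below.
        // Naming the columns avoids the SELECT * expansion that triggers the
        // ArrayIndexOutOfBoundsException, per the comment above.
        tEnv.executeSql(
                "CREATE VIEW testView AS " +
                "SELECT id, consumerName, price, productName, proctime FROM Orders");
        tEnv.executeSql("INSERT INTO print SELECT * FROM testView");
    }
}
{code}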

> ArrayIndexOutOfBoundsException occurs in 'Interval Joins' when the source 
> table has nested JSON
> -
>
> Key: FLINK-19818
> URL: https://issues.apache.org/jira/browse/FLINK-19818
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Major
>
> I get an *ArrayIndexOutOfBoundsException* in *Interval Joins* when my table 
> source has nested JSON. The following is my test:
> {code:sql}
> CREATE TABLE Orders (
>   nest ROW<
> idBIGINT,
> consumerName  STRING,
> price DECIMAL(10, 5),
> productName   STRING
>   >,
>   proctime AS PROCTIME()
> ) WITH (
>   'connector' = 'kafka-0.11',
>   'topic' = 'Orders',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> DROP TABLE IF EXISTS Shipments;
> CREATE TABLE Shipments (
>   idBIGINT,
>   orderId   BIGINT,
>   originSTRING,
>   destnationSTRING,
>   isArrived BOOLEAN,
>   proctime AS PROCTIME()
> ) WITH (
>   'connector' = 'kafka-0.11',
>   'topic' = 'Shipments',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> DROP TABLE IF EXISTS print;
> CREATE TABLE print (
>   orderId   BIGINT,
>   consumerName  STRING,
>   price DECIMAL(10, 5),
>   productName   STRING,
>   originSTRING,
>   destnationSTRING,
>   isArrived BOOLEAN
> ) WITH (
>   'connector' = 'print'
> );
> DROP VIEW IF EXISTS IntervalJoinView;
> CREATE VIEW IntervalJoinView AS
> SELECT
>   o.id,
>   o.consumerName,
>   o.price,
>   o.productName,
>   s.origin,
>   s.destnation,
>   s.isArrived
> FROM
>   (SELECT * FROM Orders) o,
>   (SELECT * FROM Shipments) s
> WHERE s.orderId = o.id AND o.proctime BETWEEN s.proctime - INTERVAL '4' HOUR 
> AND s.proctime;
> INSERT INTO print
> SELECT
>   id,
>   consumerName,
>   price,
>   productName,
>   origin,
>   destnation,
>   isArrived
> FROM IntervalJoinView;
> {code}
> The following is the exception from Flink:
> {code:log}
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
>   at java.util.ArrayList.elementData(ArrayList.java:422)
>   at java.util.ArrayList.get(ArrayList.java:435)
>   at 
> org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73)
>   at 
> org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133)
>   at 
> 

[jira] [Created] (FLINK-19818) ArrayIndexOutOfBoundsException occurs in 'Interval Joins' when the source table has nested JSON

2020-10-26 Thread shizhengchao (Jira)
shizhengchao created FLINK-19818:


 Summary: ArrayIndexOutOfBoundsException occurs in 'Interval Joins' 
when the source table has nested JSON
 Key: FLINK-19818
 URL: https://issues.apache.org/jira/browse/FLINK-19818
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.2
Reporter: shizhengchao


I get an *ArrayIndexOutOfBoundsException* in *Interval Joins* when my table 
source has nested JSON. The following is my test:

{code:sql}
CREATE TABLE Orders (
  nest ROW<
idBIGINT,
consumerName  STRING,
price DECIMAL(10, 5),
productName   STRING
  >,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

DROP TABLE IF EXISTS Shipments;
CREATE TABLE Shipments (
  idBIGINT,
  orderId   BIGINT,
  originSTRING,
  destnationSTRING,
  isArrived BOOLEAN,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Shipments',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

DROP TABLE IF EXISTS print;
CREATE TABLE print (
  orderId   BIGINT,
  consumerName  STRING,
  price DECIMAL(10, 5),
  productName   STRING,
  originSTRING,
  destnationSTRING,
  isArrived BOOLEAN
) WITH (
  'connector' = 'print'
);

DROP VIEW IF EXISTS IntervalJoinView;
CREATE VIEW IntervalJoinView AS
SELECT
  o.id,
  o.consumerName,
  o.price,
  o.productName,
  s.origin,
  s.destnation,
  s.isArrived
FROM
  (SELECT * FROM Orders) o,
  (SELECT * FROM Shipments) s
WHERE s.orderId = o.id AND o.proctime BETWEEN s.proctime - INTERVAL '4' HOUR 
AND s.proctime;

INSERT INTO print
SELECT
  id,
  consumerName,
  price,
  productName,
  origin,
  destnation,
  isArrived
FROM IntervalJoinView;
{code}

The following is the exception from Flink:

{code:log}
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.get(ArrayList.java:435)
at 
org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73)
at 
org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207)
at 
com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133)
at 
com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19629) Avro format causes a NullPointerException on a null value in a MAP type's value

2020-10-21 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218199#comment-17218199
 ] 

shizhengchao commented on FLINK-19629:
--

Hi [~jark], can you help me review the code? Thank you very much.
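
A minimal sketch of the null-safe wrapper pattern that appears in the stack 
trace below (createNullableConverter) and that would need to apply to MAP 
value conversion as well; names here are illustrative:

{code:java}
import java.util.function.Function;

public class NullableConverterSketch {
    // Wrap a converter so that null is passed through instead of being
    // dereferenced, which is what raises the NPE for {"key1": null}.
    static <A, B> Function<A, B> nullable(Function<A, B> inner) {
        return value -> value == null ? null : inner.apply(value);
    }

    public static void main(String[] args) {
        Function<Object, String> mapValueConverter = nullable(Object::toString);
        System.out.println(mapValueConverter.apply(null));   // null, no NPE
        System.out.println(mapValueConverter.apply("book")); // book
    }
}
{code}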

> Avro format causes a NullPointerException on a null value in a MAP type's value
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> create table tableA (
>   name  STRING,
>   hobly MAP<STRING, STRING>,
>   phone STRING
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'ShizcTest',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'ShizcTest',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> If hobly has a null value like this:
> {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}
> it causes a NullPointerException:
> {code:java}
> java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.NullPointerException: null
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148)
>   ... 8 common frames omitted
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View

2020-10-20 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217535#comment-17217535
 ] 

shizhengchao commented on FLINK-19735:
--

Hi [~jark], my Flink version is 1.11.1. I found that this problem has been 
resolved in version 1.11.2; see FLINK-18750.
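
For older 1.11.x deployments, a hedged workaround sketch: alias the function 
columns explicitly so the view schema does not depend on how the hint names 
are expanded. The setup DDL is omitted, and the alias form follows standard 
SQL rather than a verified run against 1.11.1:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LateralTableAliasSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // DDL for test/print and registration of SplitFunction omitted;
        // see the issue body below.
        tEnv.executeSql(
                "CREATE VIEW test_view AS " +
                "SELECT myField, newWord, newLength " +
                "FROM test, LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength)");
    }
}
{code}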

> TableFunction can not work in Flink View
> 
>
> Key: FLINK-19735
> URL: https://issues.apache.org/jira/browse/FLINK-19735
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: shizhengchao
>Priority: Major
>
> TableFunction doesn't work in Flink SQL. Here is my code:
> {code:sql}
> CREATE TABLE test (
>   myField   STRING,
>   name  STRING
> ) WITH (
>   'connector' = 'kafka-0.11',
>   'topic' = '',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'mygroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'csv'
> );
> CREATE TABLE print (
>   myField   STRING,
>   newWord   STRING,
>   newLength INT
> ) WITH (
>   'connector' = 'print'
> );
> CREATE VIEW test_view AS
> SELECT myField, newWord, newLength FROM test, LATERAL 
> TABLE(SplitFunction(myField));
> INSERT INTO print
> SELECT * FROM test_view;
> {code}
> And the function code is as follows:
> {code:java}
> @FunctionHint(output = @DataTypeHint("ROW<newWord STRING, newLength INT>"))
> public class SplitFunction extends TableFunction<Row> {
> public void eval(String str) {
> for (String s : str.split(" ")) {
> collect(Row.of(s, s.length()));
> }
> }
> }
> {code}
> Running the SQL causes an error:
> {code}
> Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 2, column 17 to line 2, column 23: Column 'newWord' not found in any 
> table
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, 
> column 17 to line 2, column 23: Column 'newWord' not found in any table
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
>   

[jira] [Updated] (FLINK-19735) TableFunction can not work in Flink View

2020-10-20 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19735:
-
Description: 
TableFunction doesn't work in Flink SQL. Here is my code:
{code:sql}
CREATE TABLE test (
  myField   STRING,
  name  STRING
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = '',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'mygroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE print (
  myField   STRING,
  newWord   STRING,
  newLength INT
) WITH (
  'connector' = 'print'
);

CREATE VIEW test_view AS
SELECT myField, newWord, newLength FROM test, LATERAL 
TABLE(SplitFunction(myField));

INSERT INTO print
SELECT * FROM test_view;
{code}
And the function code is as follows:
{code:java}
@FunctionHint(output = @DataTypeHint("ROW<newWord STRING, newLength INT>"))
public class SplitFunction extends TableFunction<Row> {

public void eval(String str) {
for (String s : str.split(" ")) {
collect(Row.of(s, s.length()));
}
}
}
{code}
Running the SQL causes an error:
{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
SQL validation failed. From line 2, column 17 to line 2, column 23: Column 
'newWord' not found in any table
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
at 
com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
at 
com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, 
column 17 to line 2, column 23: Column 'newWord' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 

[jira] [Updated] (FLINK-19735) TableFunction can not work in Flink View

2020-10-20 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19735:
-
Description: 
TableFunction doesn't work in Flink SQL. Here is my code:
{code:sql}
CREATE TABLE test (
  myField   STRING,
  name  STRING
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = '',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'mygroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE print (
  myField   STRING,
  newWord   STRING,
  newLength INT
) WITH (
  'connector' = 'print'
);

CREATE VIEW test_view AS
SELECT myField, newWord, newLength FROM test, LATERAL 
TABLE(SplitFunction(myField));

INSERT INTO print
SELECT * FROM test_view;
{code}
And the function code is as follows:
{code:java}
@FunctionHint(output = @DataTypeHint("ROW<newWord STRING, newLength INT>"))
public class SplitFunction extends TableFunction<Row> {

public void eval(String str) {
for (String s : str.split(" ")) {
collect(Row.of(s, s.length()));
}
}
}
{code}
Running the SQL causes an error:
{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
SQL validation failed. From line 2, column 17 to line 2, column 23: Column 
'newWord' not found in any table
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
at 
com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
at 
com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, 
column 17 to line 2, column 23: Column 'newWord' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 

[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View

2020-10-20 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217527#comment-17217527
 ] 

shizhengchao commented on FLINK-19735:
--

Hi [~jark], currently a TableFunction only takes effect in an "INSERT INTO" 
statement.
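
A short fragment of the pattern this comment refers to; tEnv, the tables, and 
SplitFunction are assumed to be registered as in the issue body below:

{code:java}
// The same LATERAL TABLE call, but issued directly in the INSERT INTO
// statement rather than through a view, which is the form the comment
// says works.
tEnv.executeSql(
    "INSERT INTO print " +
    "SELECT myField, newWord, newLength " +
    "FROM test, LATERAL TABLE(SplitFunction(myField))");
{code}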

> TableFunction can not work in Flink View
> 
>
> Key: FLINK-19735
> URL: https://issues.apache.org/jira/browse/FLINK-19735
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.1
>Reporter: shizhengchao
>Priority: Major
>
> TableFunction doesn't work in Flink SQL. Here is my code:
> {code:sql}
> CREATE TABLE test (
>   myField   STRING,
>   name  STRING
> ) WITH (
>   'connector' = 'kafka-0.11',
>   'topic' = '',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'mygroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'csv'
> );
> CREATE TABLE print (
>   myField   STRING,
>   newWord   STRING,
>   newLength INT
> ) WITH (
>   'connector' = 'print'
> );
> CREATE VIEW test_view AS
> SELECT myField, newWord, newLength FROM test, LATERAL 
> TABLE(SplitFunction(myField));
> INSERT INTO print
> SELECT * FROM test_view;
> {code}
> And the function code is as follows:
> {code:java}
> @FunctionHint(output = @DataTypeHint("ROW<newWord STRING, newLength INT>"))
> public class SplitFunction extends TableFunction<Row> {
> public void eval(String str) {
> for (String s : str.split(" ")) {
> collect(Row.of(s, s.length()));
> }
> }
> }
> {code}
> Running the SQL causes an error:
> {code}
> Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 2, column 17 to line 2, column 23: Column 'newWord' not found in any 
> table
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
>   at 
> com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, 
> column 17 to line 2, column 23: Column 'newWord' not found in any table
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
>   at 
> 

[jira] [Created] (FLINK-19735) TableFunction can not work in Flink View

2020-10-20 Thread shizhengchao (Jira)
shizhengchao created FLINK-19735:


 Summary: TableFunction can not work in Flink View
 Key: FLINK-19735
 URL: https://issues.apache.org/jira/browse/FLINK-19735
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.1
Reporter: shizhengchao


TableFunction does not work in a Flink SQL view. Here is my code:
{code:sql}
CREATE TABLE test (
  myField   STRING,
  name  STRING
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = '',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'mygroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
CREATE TABLE print (
  myField   STRING,
  newWord   STRING,
  newLength INT
) WITH (
  'connector' = 'print'
);

CREATE VIEW test_view AS
SELECT myField, newWord, newLength FROM test, LATERAL 
TABLE(SplitFunction(myField));

INSERT INTO print
SELECT * FROM test_view;
{code}
And the function code is as follows:
{code:java}
@FunctionHint(output = @DataTypeHint("ROW<newWord STRING, newLength INT>"))
public class SplitFunction extends TableFunction<Row> {

    public void eval(String str) {
        for (String s : str.split(" ")) {
            collect(Row.of(s, s.length()));
        }
    }
}
{code}
Running the SQL causes an error:
{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
SQL validation failed. From line 2, column 17 to line 2, column 23: Column 
'newWord' not found in any table
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
at 
com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
at 
com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
at 
com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, 
column 17 to line 2, column 23: Column 'newWord' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 

[jira] [Created] (FLINK-19723) The retry mechanism of jdbc connector has the risk of data duplication

2020-10-19 Thread shizhengchao (Jira)
shizhengchao created FLINK-19723:


 Summary: The retry mechanism of jdbc connector has the risk of 
data duplication
 Key: FLINK-19723
 URL: https://issues.apache.org/jira/browse/FLINK-19723
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.11.1
Reporter: shizhengchao


For example, if statement.executeBatch() fails for some reason but the 
"statement" is not closed before the retry, there is a risk of data duplication:
{code:java}
for (int i = 1; i <= executionOptions.getMaxRetries(); i++) {
    try {
        attemptFlush();
        batchCount = 0;
        break;
    } catch (SQLException e) {
        LOG.error("JDBC executeBatch error, retry times = {}", i, e);
        if (i >= executionOptions.getMaxRetries()) {
            throw new IOException(e);
        }
        try {
            if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                connection = connectionProvider.reestablishConnection();
                jdbcStatementExecutor.closeStatements();
                jdbcStatementExecutor.prepareStatements(connection);
            }
        } catch (Exception exception) {
            LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
            throw new IOException("Reestablish JDBC connection failed", exception);
        }
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IOException("unable to flush; interrupted while doing another attempt", e);
        }
    }
}
{code}

The correct code should be:
{code:java}
try {
    if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
        connection = connectionProvider.reestablishConnection();
    }
    jdbcStatementExecutor.closeStatements();
    jdbcStatementExecutor.prepareStatements(connection);
} catch (Exception exception) {
    LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
    throw new IOException("Reestablish JDBC connection failed", exception);
}
{code}
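
More generally, a retry must not reuse state accumulated by the failed attempt. 
A minimal, self-contained sketch of that pattern (a hypothetical helper, not 
Flink code; the resource stands in for the JDBC statements):

{code:java}
import java.io.IOException;

// Hypothetical helper: recreate the resource before every attempt so no
// buffered state (for example a PreparedStatement's batch) leaks from a
// failed attempt into the retry.
final class Retry {

    @FunctionalInterface
    interface ResourceFactory<R extends AutoCloseable> {
        R create() throws Exception;
    }

    @FunctionalInterface
    interface Attempt<R> {
        void run(R resource) throws Exception;
    }

    static <R extends AutoCloseable> void run(
            ResourceFactory<R> factory, Attempt<R> attempt, int maxRetries)
            throws Exception {
        for (int i = 1; i <= maxRetries; i++) {
            try (R resource = factory.create()) { // fresh state per attempt
                attempt.run(resource);
                return;
            } catch (Exception e) {
                if (i >= maxRetries) {
                    throw new IOException("retries exhausted", e);
                }
                Thread.sleep(1000L * i); // same linear backoff as above
            }
        }
    }
}
{code}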




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18545) Sql api cannot specify flink job name

2020-10-15 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214576#comment-17214576
 ] 

shizhengchao commented on FLINK-18545:
--

How about adding a table option, for example "job.name", and using it in 
*TableEnvironmentImpl#executeInternal*:

{code:java}
@Override
public TableResult executeInternal(List<ModifyOperation> operations) {
    List<Transformation<?>> transformations = translate(operations);
    List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
    String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);

    String name = tableConfig.getConfiguration().getString("job.name", jobName);
    Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, name);
    ...
}
{code}
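
If such an option existed, it could be set from the Table API before submitting 
the statement; a sketch (the "job.name" key is only proposed here and does not 
exist yet):

{code:java}
// Hypothetical usage of the proposed "job.name" option.
// tEnv is a StreamTableEnvironment created elsewhere.
tEnv.getConfig().getConfiguration().setString("job.name", "user-log-etl");
tEnv.executeSql(
    "insert into user_log_sink select user_id, item_id, category_id, behavior, ts from user_log");
{code}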


> Sql api cannot specify flink job name
> -
>
> Key: FLINK-18545
> URL: https://issues.apache.org/jira/browse/FLINK-18545
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Table SQL / API
>Affects Versions: 1.11.0
> Environment: execute sql : 
> StreamTableEnvironment.executeSql("insert into user_log_sink select user_id, 
> item_id, category_id, behavior, ts from user_log")
> current job name :  org.apache.flink.table.api.internal.TableEnvironmentImpl
> {code:java}
> public TableResult executeInternal(List<ModifyOperation> operations) {
>     List<Transformation<?>> transformations = translate(operations);
>     List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
>     String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
>     Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
>     try {
>         JobClient jobClient = execEnv.executeAsync(pipeline);
>         TableSchema.Builder builder = TableSchema.builder();
>         Object[] affectedRowCounts = new Long[operations.size()];
>         for (int i = 0; i < operations.size(); ++i) {
>             // use sink identifier name as field name
>             builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
>             affectedRowCounts[i] = -1L;
>         }
>         return TableResultImpl.builder()
>                 .jobClient(jobClient)
>                 .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
>                 .tableSchema(builder.build())
>                 .data(Collections.singletonList(Row.of(affectedRowCounts)))
>                 .build();
>     } catch (Exception e) {
>         throw new TableException("Failed to execute sql", e);
>     }
> }
> {code}
>Reporter: venn wu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> In Flink 1.11.0, {color:#172b4d}StreamTableEnvironment.executeSql(sql){color} 
> will plan and execute the job immediately, and the job name will be fixed as 
> "insert-into_sink-table-name". But when multiple SQL jobs insert into the same 
> sink table, this is not very friendly. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type

2020-10-14 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19629:
-
Comment: was deleted

(was: Avro union(something, null) is spelled incorrectly; it should be Avro 
unions(something, null), not union. 
And I think an example of nullable types should be provided. So the complete 
documentation should read like this: 

In addition to the types listed above, Flink supports reading/writing nullable 
types, e.g. "behavior STRING NULL". Flink maps nullable types to Avro 
unions(something, null), where something is the Avro type converted from Flink 
type.)

> Avro format cause NullPointException,as null value in MAP type's  value type
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> create table tableA (
>   name  STRING,
>   hobly MAP<STRING, STRING>,
>   phone STRING
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'ShizcTest',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'ShizcTest',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> if hobly has a null value like this:
> {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}
> it causes a NullPointerException:
> {code:java}
> java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.NullPointerException: null
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148)
>   ... 8 common frames omitted
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19638) AvroUnionLogicalSerializerTest class compile error

2020-10-14 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213777#comment-17213777
 ] 

shizhengchao commented on FLINK-19638:
--

[~dwysakowicz] thank you, I changed avro-tools to the 1.10.0 jar, and the tests 
now pass

> AvroUnionLogicalSerializerTest class compile error
> --
>
> Key: FLINK-19638
> URL: https://issues.apache.org/jira/browse/FLINK-19638
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Minor
>
> the constructor parameter of class UnionLogicalType is java.lang.Long, but 
> the test data uses java.time.Instant
> {code:java}
> @Override
> protected UnionLogicalType[] getTestData() {
>     final Random rnd = new Random();
>     final UnionLogicalType[] data = new UnionLogicalType[20];
>     for (int i = 0; i < data.length; i++) {
>         data[i] = new UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong()));
>     }
>     return data;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19638) AvroUnionLogicalSerializerTest class compile error

2020-10-14 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213774#comment-17213774
 ] 

shizhengchao commented on FLINK-19638:
--

Flink version is 1.12-SNAPSHOT.

I generated the code of the package 'org.apache.flink.formats.avro.generated' 
with avro-tools-1.7.7.jar.

> AvroUnionLogicalSerializerTest class compile error
> --
>
> Key: FLINK-19638
> URL: https://issues.apache.org/jira/browse/FLINK-19638
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Minor
>
> the constructor parameter of class UnionLogicalType is java.lang.Long, but 
> the test data uses java.time.Instant
> {code:java}
> @Override
> protected UnionLogicalType[] getTestData() {
>     final Random rnd = new Random();
>     final UnionLogicalType[] data = new UnionLogicalType[20];
>     for (int i = 0; i < data.length; i++) {
>         data[i] = new UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong()));
>     }
>     return data;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19638) AvroUnionLogicalSerializerTest class compile error

2020-10-14 Thread shizhengchao (Jira)
shizhengchao created FLINK-19638:


 Summary: AvroUnionLogicalSerializerTest class compile error
 Key: FLINK-19638
 URL: https://issues.apache.org/jira/browse/FLINK-19638
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.11.2
Reporter: shizhengchao


the constructor parameter of class UnionLogicalType is java.lang.Long, but the 
test data uses java.time.Instant
{code:java}
@Override
protected UnionLogicalType[] getTestData() {
    final Random rnd = new Random();
    final UnionLogicalType[] data = new UnionLogicalType[20];

    for (int i = 0; i < data.length; i++) {
        data[i] = new UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong()));
    }

    return data;
}
{code}
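
For illustration, test data that compiles against the 1.7.7-generated class 
would have to pass raw epoch millis instead; a sketch, assuming the Long-based 
constructor described above (regenerating with a newer avro-tools, as noted in 
the comments, is the actual fix):

{code:java}
@Override
protected UnionLogicalType[] getTestData() {
    final Random rnd = new Random();
    final UnionLogicalType[] data = new UnionLogicalType[20];
    for (int i = 0; i < data.length; i++) {
        // Assumed: the 1.7.7-generated constructor takes java.lang.Long
        // (epoch millis) rather than java.time.Instant.
        data[i] = new UnionLogicalType(rnd.nextLong());
    }
    return data;
}
{code}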



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type

2020-10-14 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213667#comment-17213667
 ] 

shizhengchao commented on FLINK-19629:
--

This is a duplicate of FLINK-19622

> Avro format cause NullPointException,as null value in MAP type's  value type
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> create table tableA (
>   name  STRING,
>   hobly MAP<STRING, STRING>,
>   phone STRING
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'ShizcTest',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'ShizcTest',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> if hobly has a null value like this:
> {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}
> it causes a NullPointerException:
> {code:java}
> java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.NullPointerException: null
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148)
>   ... 8 common frames omitted
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type

2020-10-14 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19629:
-
Description: 
create table tableA (
  name  STRING,
  hobly MAP<STRING, STRING>,
  phone STRING
) with (
  'connector' = 'kafka-0.11',
  'topic' = 'ShizcTest',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'ShizcTest',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro'
);
if hobly has a null value like this:

{"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}

it causes a NullPointerException:

{code:java}
java.io.IOException: Failed to deserialize Avro record.
at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.NullPointerException: null
at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252)
at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148)
... 8 common frames omitted
{code}


  was:
the docs Connectors/Table & SQL Connectors/Formats/Avro:
 In addition to the types listed above, Flink supports reading/writing nullable 
types. Flink maps nullable types to Avro union(something, null), where 
something is the Avro type converted from Flink type.

Avro has no "union" type; it should be "unions":
 Avro unions(something, null)

By the way, an example of reading/writing nullable types should be provided, 
such as this:
{code:java}
CREATE TABLE user_behavior (
  behavior STRING NULL
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'avro'
)
{code}


> Avro format cause NullPointException,as null value in MAP type's  value type
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> create table tableA (
>   name  STRING,
>   hobly MAP<STRING, STRING>,
>   phone STRING
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'ShizcTest',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'ShizcTest',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> if hobly has a null value like this:
> {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"}
> it causes a NullPointerException:
> {code:java}
> java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>   at 
> 

[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type

2020-10-14 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19629:
-
Component/s: (was: Documentation)
 Formats (JSON, Avro, Parquet, ORC, SequenceFile)

> Avro format cause NullPointException,as null value in MAP type's  value type
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> the docs Connectors/Table & SQL Connectors/Formats/Avro:
>  In addition to the types listed above, Flink supports reading/writing 
> nullable types. Flink maps nullable types to Avro union(something, null), 
> where something is the Avro type converted from Flink type.
> Avro has no "union" type; it should be "unions":
>  Avro unions(something, null)
> By the way, an example of reading/writing nullable types should be provided, 
> such as this:
> {code:java}
> CREATE TABLE user_behavior (
>   behavior STRING NULL
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'avro'
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type

2020-10-14 Thread shizhengchao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhengchao updated FLINK-19629:
-
Summary: Avro format cause NullPointException,as null value in MAP type's  
value type  (was: English words are spelled incorrectly and an example is not 
provided)

> Avro format cause NullPointException,as null value in MAP type's  value type
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> the docs Connectors/Table & SQL Connectors/Formats/Avro:
>  In addition to the types listed above, Flink supports reading/writing 
> nullable types. Flink maps nullable types to Avro union(something, null), 
> where something is the Avro type converted from Flink type.
> Avro has no "union" type; it should be "unions":
>  Avro unions(something, null)
> By the way, an example of reading/writing nullable types should be provided, 
> such as this:
> {code:java}
> CREATE TABLE user_behavior (
>   behavior STRING NULL
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'avro'
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19629) English words are spelled incorrectly and an example is not provided

2020-10-14 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213633#comment-17213633
 ] 

shizhengchao edited comment on FLINK-19629 at 10/14/20, 6:00 AM:
-

[~jark] Thank you for reminding me; I found this buggy code: 
{code:java}
private static AvroToRowDataConverter createMapConverter(LogicalType type) {
    final AvroToRowDataConverter keyConverter =
            createConverter(DataTypes.STRING().getLogicalType());
    final AvroToRowDataConverter valueConverter =
            createConverter(extractValueTypeToAvroMap(type));

    return avroObject -> {
        final Map<?, ?> map = (Map<?, ?>) avroObject;
        Map<Object, Object> result = new HashMap<>();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object key = keyConverter.convert(entry.getKey());
            Object value = valueConverter.convert(entry.getValue());
            result.put(key, value);
        }
        return new GenericMapData(result);
    };
}
{code}

If you don't have time to fix it, I can fix it and submit the code after 
verification. :D
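
One possible fix is to route map values through the same null-tolerant wrapper 
that row fields already use, so a null Avro map value becomes a null Flink 
value. A hedged sketch (createNullableConverter is assumed to be the 
null-checking helper visible in the stack trace above):

{code:java}
private static AvroToRowDataConverter createMapConverter(LogicalType type) {
    final AvroToRowDataConverter keyConverter =
            createConverter(DataTypes.STRING().getLogicalType());
    // Wrap the value conversion so {"key1": null} no longer throws an NPE.
    final AvroToRowDataConverter valueConverter =
            createNullableConverter(extractValueTypeToAvroMap(type));

    return avroObject -> {
        final Map<?, ?> map = (Map<?, ?>) avroObject;
        Map<Object, Object> result = new HashMap<>();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object key = keyConverter.convert(entry.getKey());
            Object value = valueConverter.convert(entry.getValue());
            result.put(key, value);
        }
        return new GenericMapData(result);
    };
}
{code}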




was (Author: tinny):
[~jark] Thank you for reminding me; I found this buggy code. If you don't have 
time to fix it, I can fix it and submit the code after verification. :D

> English words are spelled incorrectly and an example is not provided
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> the docs Connectors/Table & SQL Connectors/Formats/Avro:
>  In addition to the types listed above, Flink supports reading/writing 
> nullable types. Flink maps nullable types to Avro union(something, null), 
> where something is the Avro type converted from Flink type.
> Avro has no "union" type; it should be "unions":
>  Avro unions(something, null)
> By the way, an example of reading/writing nullable types should be provided, 
> such as this:
> {code:java}
> CREATE TABLE user_behavior (
>   behavior STRING NULL
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'avro'
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19629) English words are spelled incorrectly and an example is not provided

2020-10-14 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213633#comment-17213633
 ] 

shizhengchao commented on FLINK-19629:
--

[~jark] Thank you for reminding me; I found this buggy code. If you don't have 
time to fix it, I can fix it and submit the code after verification. :D

> English words are spelled incorrectly and an example is not provided
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> the docs Connectors/Table & SQL Connectors/Formats/Avro:
>  In addition to the types listed above, Flink supports reading/writing 
> nullable types. Flink maps nullable types to Avro union(something, null), 
> where something is the Avro type converted from Flink type.
> Avro has no "union" type; it should be "unions":
>  Avro unions(something, null)
> By the way, an example of reading/writing nullable types should be provided, 
> such as this:
> {code:java}
> CREATE TABLE user_behavior (
>   behavior STRING NULL
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'avro'
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19629) English words are spelled incorrectly and an example is not provided

2020-10-13 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213569#comment-17213569
 ] 

shizhengchao commented on FLINK-19629:
--

Avro union(something, null) is spelled incorrectly; it should be Avro 
unions(something, null), not union. 
And I think an example of nullable types should be provided. So the complete 
documentation should read like this: 

In addition to the types listed above, Flink supports reading/writing nullable 
types, e.g. "behavior STRING NULL". Flink maps nullable types to Avro 
unions(something, null), where something is the Avro type converted from Flink 
type.

> English words are spelled incorrectly and an example is not provided
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> the docs Connectors/Table & SQL Connectors/Formats/Avro:
>  In addition to the types listed above, Flink supports reading/writing 
> nullable types. Flink maps nullable types to Avro union(something, null), 
> where something is the Avro type converted from Flink type.
> Avro has no "union" type; it should be "unions":
>  Avro unions(something, null)
> By the way, an example of reading/writing nullable types should be provided, 
> such as this:
> {code:java}
> CREATE TABLE user_behavior (
>   behavior STRING NULL
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'avro'
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19629) English words are spelled incorrectly and an example is not provided

2020-10-13 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213561#comment-17213561
 ] 

shizhengchao commented on FLINK-19629:
--

[~jark], I can complete this work; could you assign it to me?

 

> English words are spelled incorrectly and an example is not provided
> 
>
> Key: FLINK-19629
> URL: https://issues.apache.org/jira/browse/FLINK-19629
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.11.2
>Reporter: shizhengchao
>Priority: Critical
> Fix For: 1.12.0
>
>
> the docs Connectors/Table & SQL Connectors/Formats/Avro:
>  In addition to the types listed above, Flink supports reading/writing 
> nullable types. Flink maps nullable types to Avro union(something, null), 
> where something is the Avro type converted from Flink type.
> Avro has no "union" type; it should be "unions":
>  Avro unions(something, null)
> By the way, an example of reading/writing nullable types should be provided, 
> such as this:
> {code:java}
> CREATE TABLE user_behavior (
>   behavior STRING NULL
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'avro'
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19629) English words are spelled incorrectly and an example is not provided

2020-10-13 Thread shizhengchao (Jira)
shizhengchao created FLINK-19629:


 Summary: English words are spelled incorrectly and an example is 
not provided
 Key: FLINK-19629
 URL: https://issues.apache.org/jira/browse/FLINK-19629
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.11.2
Reporter: shizhengchao
 Fix For: 1.12.0


the docs Connectors/Table & SQL Connectors/Formats/Avro:
 In addition to the types listed above, Flink supports reading/writing nullable 
types. Flink maps nullable types to Avro union(something, null), where 
something is the Avro type converted from Flink type.

Avro has no "union" type; it should be "unions":
 Avro unions(something, null)

By the way, an example of reading/writing nullable types should be provided, 
such as this:
{code:java}
CREATE TABLE user_behavior (
  behavior STRING NULL
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'avro'
)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)