[jira] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3
[ https://issues.apache.org/jira/browse/FLINK-34651 ]

shizhengchao deleted comment on FLINK-34651:
--------------------------------------------
was (Author: tinny): a

> The HiveTableSink of Flink does not support writing to S3
> ---------------------------------------------------------
>
>                 Key: FLINK-34651
>                 URL: https://issues.apache.org/jira/browse/FLINK-34651
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.13.6, 1.14.6, 1.15.4, 1.16.3, 1.17.2, 1.18.1
>            Reporter: shizhengchao
>            Priority: Blocker
>
> My Hive table is located on S3. When I try to write to Hive using Flink
> Streaming SQL, I find that it does not support writing to S3. Furthermore,
> this issue has not been fixed in the latest version. The error I got is as
> follows:
> {code:java}
> // code placeholder
> java.io.IOException: No FileSystem for scheme: s3
>     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>     at org.apache.flink.connectors.hive.HadoopFileSystemFactory.create(HadoopFileSystemFactory.java:44)
>     at org.apache.flink.table.filesystem.stream.StreamingSink.lambda$compactionWriter$8dbc1825$1(StreamingSink.java:95)
>     at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.initializeState(CompactCoordinator.java:102)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
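The stack trace above fails inside Hadoop's FileSystem.getFileSystemClass, which resolves a filesystem implementation for the URI scheme from the Hadoop configuration. The following plain-Python sketch (illustrative only, not Hadoop code; it omits Hadoop's ServiceLoader step) mimics that lookup and shows the commonly used workaround of mapping the bare "s3" scheme to the S3A implementation via the standard Hadoop key pattern fs.<scheme>.impl:

```python
# Plain-Python sketch (not Hadoop code) of the scheme -> implementation lookup
# behind "No FileSystem for scheme: s3". Hadoop's FileSystem.getFileSystemClass
# consults the configuration key fs.<scheme>.impl (plus a ServiceLoader step,
# omitted here); with no entry for "s3", it raises exactly this IOException.
conf = {
    "fs.file.impl": "org.apache.hadoop.fs.LocalFileSystem",
    "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
}

def get_file_system_class(scheme: str, conf: dict) -> str:
    impl = conf.get(f"fs.{scheme}.impl")
    if impl is None:
        raise IOError(f"No FileSystem for scheme: {scheme}")
    return impl

# Without a mapping, an s3:// path fails as in the report above.
try:
    get_file_system_class("s3", conf)
except IOError as err:
    print(err)  # No FileSystem for scheme: s3

# A commonly used workaround is to map the bare "s3" scheme to the S3A
# implementation in the Hadoop configuration seen by the Hive connector
# (e.g. in core-site.xml), so the same lookup then succeeds:
conf["fs.s3.impl"] = "org.apache.hadoop.fs.s3a.S3AFileSystem"
print(get_file_system_class("s3", conf))
```

Because HiveTableSink goes through HadoopFileSystemFactory rather than Flink's pluggable filesystems, the mapping has to be visible to the Hadoop configuration the connector loads, not only to a Flink flink-s3-fs plugin.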
[jira] [Commented] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3
[ https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828226#comment-17828226 ]

shizhengchao commented on FLINK-34651:
--------------------------------------
a
[jira] [Commented] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3
[ https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828225#comment-17828225 ]

shizhengchao commented on FLINK-34651:
--------------------------------------
[~easonqin] It won't take effect because the HiveTableSink directly uses the HadoopFileSystemFactory.
[jira] [Updated] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3
[ https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shizhengchao updated FLINK-34651:
---------------------------------
    Priority: Blocker  (was: Major)
[jira] [Updated] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3
[ https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shizhengchao updated FLINK-34651:
---------------------------------
    Affects Version/s: 1.18.1
                       1.17.2
                       1.15.4
                       1.14.6
                       1.13.6
[jira] [Updated] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3
[ https://issues.apache.org/jira/browse/FLINK-34651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shizhengchao updated FLINK-34651:
---------------------------------
    Affects Version/s: 1.16.3
                           (was: 1.13.6)
[jira] [Created] (FLINK-34651) The HiveTableSink of Flink does not support writing to S3
shizhengchao created FLINK-34651:
---------------------------------

             Summary: The HiveTableSink of Flink does not support writing to S3
                 Key: FLINK-34651
                 URL: https://issues.apache.org/jira/browse/FLINK-34651
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.13.6
            Reporter: shizhengchao
[jira] (FLINK-26541) SQL Client should support submitting SQL jobs in application mode
[ https://issues.apache.org/jira/browse/FLINK-26541 ]

shizhengchao deleted comment on FLINK-26541:
--------------------------------------------
was (Author: tinny): Why not use session mode? If application mode supports SQL, then I think there is no difference from session mode.

> SQL Client should support submitting SQL jobs in application mode
> -----------------------------------------------------------------
>
>                 Key: FLINK-26541
>                 URL: https://issues.apache.org/jira/browse/FLINK-26541
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / Kubernetes, Deployment / YARN, Table SQL / Client
>            Reporter: Jark Wu
>            Priority: Major
>
> Currently, the SQL Client only supports submitting jobs in session mode and
> per-job mode. As the community is going to drop the per-job mode (FLINK-26000),
> the SQL Client should support application mode as well. Otherwise, the SQL Client
> can only submit SQL in session mode, but streaming jobs should be submitted
> in per-job or application mode to have better resource isolation.
> Discussions: https://lists.apache.org/thread/2yq351nb721x23rz1q8qlyf2tqrk147r

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-26541) SQL Client should support submitting SQL jobs in application mode
[ https://issues.apache.org/jira/browse/FLINK-26541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747428#comment-17747428 ]

shizhengchao commented on FLINK-26541:
--------------------------------------
Why not use session mode? If application mode supports SQL, then I think there is no difference from session mode.
[jira] [Commented] (FLINK-29772) Kafka table source scan blocked
[ https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624836#comment-17624836 ]

shizhengchao commented on FLINK-29772:
--------------------------------------
My case is a Kafka interval join; the deadlock occurs when using RocksDB.

> Kafka table source scan blocked
> -------------------------------
>
>                 Key: FLINK-29772
>                 URL: https://issues.apache.org/jira/browse/FLINK-29772
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.2
>            Reporter: shizhengchao
>            Priority: Major
>
> {code:java}
> //
> "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle,
> watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type,
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei,
> request_id, owner_id, service_id, content_id, sign_id, receiver_type,
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi,
> array_info_imei, phone, channel_id, process_time, code, msg, receiver,
> content_type, android_version, apk_version]) -> Calc(select=[data_type,
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) ->
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#',
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'),
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time,
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
> 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'),
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time,
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
> 10) AS client_time, IF(((data_type =
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
> (code = 
_UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS > send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], > where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) > CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER > SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 > BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: > TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, > watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, > cluster_name, server_time, server_time_s, client_time, client_time_s, imei, > request_id, owner_id, service_id, content_id, sign_id, receiver_type, > msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, > array_info_imei, phone, channel_id, process_time, code, msg, receiver, > content_type, android_version, apk_version]) -> Calc(select=[data_type, > server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS > proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> > Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', > _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND server_time IS NOT NULL), server_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND client_time IS NOT NULL), client_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10) AS client_time, IF(((data_type = > _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND > (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS > send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], > where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) > CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER > SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87 > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > -blocked on java.lang.Object@4aa3fe44 > at
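In the dump above, the task thread (Id=77) is BLOCKED in SynchronizedStreamTaskActionExecutor.runThrowing on a monitor owned by the "Legacy Source Thread" (Id=87): the source holds a shared lock while the task's mailbox action waits for it. The following Python sketch (illustrative only, not Flink code; the names are invented) reproduces that contention pattern:

```python
import threading
import time

# Illustrative sketch (not Flink code) of the contention in the thread dump:
# the legacy source thread holds a shared monitor (the "checkpoint lock")
# while emitting records, and the task's mailbox thread blocks on the same
# monitor when it tries to run a synchronized action.
checkpoint_lock = threading.Lock()
source_has_lock = threading.Event()
events = []

def legacy_source_thread():
    with checkpoint_lock:                 # the source thread owns the monitor
        source_has_lock.set()
        events.append("source acquired lock")
        time.sleep(0.2)                   # long emission keeps the lock held
    events.append("source released lock")

def mailbox_thread():
    source_has_lock.wait()                # ensure the source holds the lock first
    if not checkpoint_lock.acquire(timeout=0.01):
        events.append("mailbox BLOCKED on checkpoint lock")  # the state in the dump
        checkpoint_lock.acquire()         # now wait until the source lets go
    events.append("mailbox acquired lock")
    checkpoint_lock.release()

src = threading.Thread(target=legacy_source_thread)
mbx = threading.Thread(target=mailbox_thread)
src.start(); mbx.start()
src.join(); mbx.join()
print(events)
```

If the source never releases the monitor (for example, because it is itself stuck in a slow state-backend operation), the blocked thread stays BLOCKED indefinitely, which would match the reported hang.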
[jira] (FLINK-29772) Kafka table source scan blocked
[ https://issues.apache.org/jira/browse/FLINK-29772 ]

shizhengchao deleted comment on FLINK-29772:
--------------------------------------------
was (Author: tinny): kafka interval join, deadlock occurs when using rocksdb
[jira] [Commented] (FLINK-29772) Kafka table source scan blocked
[ https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624834#comment-17624834 ] shizhengchao commented on FLINK-29772: -- kafka interval join, deadlock occurs when using rocksdb > Kafka table source scan blocked > --- > > Key: FLINK-29772 > URL: https://issues.apache.org/jira/browse/FLINK-29772 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.13.2 >Reporter: shizhengchao >Priority: Major > > {code:java} > // > "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, > watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, > cluster_name, server_time, server_time_s, client_time, client_time_s, imei, > request_id, owner_id, service_id, content_id, sign_id, receiver_type, > msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, > array_info_imei, phone, channel_id, process_time, code, msg, receiver, > content_type, android_version, apk_version]) -> Calc(select=[data_type, > server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS > proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> > Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', > _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND server_time IS NOT NULL), server_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND client_time IS NOT NULL), client_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10) AS client_time, IF(((data_type = > _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND > (code = 
_UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS > send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], > where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) > CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER > SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 > BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source: > TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, > watermark=[-(toTimeStamps($2), 1:INTERVAL SECOND)]]], fields=[data_type, > cluster_name, server_time, server_time_s, client_time, client_time_s, imei, > request_id, owner_id, service_id, content_id, sign_id, receiver_type, > msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, > array_info_imei, phone, channel_id, process_time, code, msg, receiver, > content_type, android_version, apk_version]) -> Calc(select=[data_type, > server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS > proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> > Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', > _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND server_time IS NOT NULL), server_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), > (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER > SET "UTF-16LE") AND client_time IS NOT NULL), client_time, > CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, > 10) AS client_time, IF(((data_type = > _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE") AND > (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS > send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], > where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) > CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER > SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87 > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > -blocked on java.lang.Object@4aa3fe44 > at
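The essential fact buried in the thread dump above is that the task thread (Id=77) is BLOCKED on a monitor (java.lang.Object@4aa3fe44) owned by the legacy source thread (Id=87), which is itself blocked on the same object inside SynchronizedStreamTaskActionExecutor. The following is a minimal, self-contained sketch of that BLOCKED-on-monitor pattern; it is a generic illustration, not Flink's actual code, and the names are hypothetical:

```java
import java.util.concurrent.CountDownLatch;

// Generic sketch of the pattern in the thread dump above: one thread holds a
// shared lock object while a second thread blocks trying to enter a
// synchronized block on it. Not Flink code; names are illustrative only.
public class BlockedMonitorDemo {
    // Stands in for the contended monitor (java.lang.Object@4aa3fe44 in the dump).
    static final Object checkpointLock = new Object();

    public static Thread.State runDemo() throws InterruptedException {
        CountDownLatch lockHeld = new CountDownLatch(1);

        // "Legacy source thread": grabs the lock and keeps it for a while.
        Thread source = new Thread(() -> {
            synchronized (checkpointLock) {
                lockHeld.countDown();
                try { Thread.sleep(5000); } catch (InterruptedException ignored) {}
            }
        }, "legacy-source");
        source.start();
        lockHeld.await();

        // "Mailbox thread": tries to take the same lock and blocks.
        Thread mailbox = new Thread(() -> {
            synchronized (checkpointLock) { /* reached only after the source releases */ }
        }, "mailbox");
        mailbox.start();

        // Poll until the second thread shows up as BLOCKED, as in the dump.
        Thread.State state;
        do {
            Thread.sleep(10);
            state = mailbox.getState();
        } while (state != Thread.State.BLOCKED && source.isAlive());

        source.interrupt(); // release the lock so the demo terminates cleanly
        mailbox.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints BLOCKED
    }
}
```

A `jstack` of this toy program shows the same "BLOCKED on java.lang.Object@… owned by …" line as the dump above, which is how the ownership chain in the report was identified.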
[jira] [Updated] (FLINK-29772) Kafka table source scan blocked
[ https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-29772: - Description: (same thread dump as quoted above)
[jira] [Updated] (FLINK-29772) Kafka table source scan blocked
[ https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-29772: - Description: (same thread dump as quoted above; this update only highlighted the deadlock: thread Id=77 is {color:red}BLOCKED on java.lang.Object@4aa3fe44 owned by{color} the legacy source thread Id=87, which is itself {color:red}blocked on java.lang.Object@4aa3fe44{color} in SynchronizedStreamTaskActionExecutor.runThrowing)
[jira] [Created] (FLINK-29772) Kafka table source scan blocked
shizhengchao created FLINK-29772: Summary: Kafka table source scan blocked Key: FLINK-29772 URL: https://issues.apache.org/jira/browse/FLINK-29772 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.13.2 Reporter: shizhengchao Description: (the thread dump quoted above)
[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514050#comment-17514050 ] shizhengchao edited comment on FLINK-26033 at 3/30/22, 3:06 AM: [~renqs] [https://github.com/PatrickRen] I combined multiple commits into one, and created another 2 PRs to cherry-pick the commit onto release-1.13 and release-1.14. was (Author: tinny): master: 2049f849a130e738759001c5a6b9d85834da08d0 1.13: 3e77dc114261cc91cc80159c95fa27e74b25b8ed 1.14: 6ae8ee89898f3fbaca435a599595d065e71cbab8

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
> --
>
> Key: FLINK-26033
> URL: https://issues.apache.org/jira/browse/FLINK-26033
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
> Reporter: shizhengchao
> Assignee: shizhengchao
> Priority: Major
> Labels: pull-request-available
>
> In the KafkaConnector, configuring 'sink.partitioner' as 'round-robin' does not take effect: Flink treats 'default' and 'round-robin' as the same strategy.
> {code:java}
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both options therefore fall back to Kafka's default partitioner, which actually has two behaviors:
> 1. Sticky (effectively random) partitioning when there is no key.
> 2. When there is a key, hashing the key and taking the modulo of the partition count.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // Random when there is no key
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, the KafkaConnector does not really have a round-robin strategy. But we can borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>             byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> } {code}
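To see why the proposed RoundRobinPartitioner behaves differently from key-hashing, here is a small self-contained sketch of the same round-robin selection logic, including the skip-unavailable-partitions branch. The class name and the simplified `availablePartitions` list of partition ids are hypothetical; no Kafka classes are used:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the round-robin selection logic from the
// RoundRobinPartitioner quoted above. Names are hypothetical and partitions
// are represented as plain ints instead of Kafka's PartitionInfo.
public class RoundRobinSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    // Equivalent of Kafka's Utils.toPositive: clears the sign bit so the modulo is non-negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    /** Pick the next partition for a topic, preferring currently available partitions. */
    public int partition(String topic, int numPartitions, List<Integer> availablePartitions) {
        int nextValue = topicCounterMap
                .computeIfAbsent(topic, k -> new AtomicInteger(0))
                .getAndIncrement();
        if (!availablePartitions.isEmpty()) {
            return availablePartitions.get(toPositive(nextValue) % availablePartitions.size());
        }
        // No partitions available: fall back to a plain modulo over all partitions.
        return toPositive(nextValue) % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch p = new RoundRobinSketch();
        // 4 partitions total; partition 1 is offline in this toy setup.
        List<Integer> available = List.of(0, 2, 3);
        for (int i = 0; i < 5; i++) {
            System.out.print(p.partition("t", 4, available) + " ");
        }
        // Cycles 0 2 3 0 2: records spread evenly, the record key plays no role.
    }
}
```

Contrast this with the DefaultPartitioner above: there, a constant key would pin every record to one partition, while the round-robin counter spreads records regardless of key.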
[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514050#comment-17514050 ] shizhengchao edited comment on FLINK-26033 at 3/30/22, 2:24 AM: master: 2049f849a130e738759001c5a6b9d85834da08d0 1.13: 3e77dc114261cc91cc80159c95fa27e74b25b8ed 1.14: 6ae8ee89898f3fbaca435a599595d065e71cbab8 was (Author: tinny): master: 2049f849a130e738759001c5a6b9d85834da08d0 1.13: 8b18e74cec71c9c480bb747fb2f77a55f675ca2a 1.14: d4f374237b21aab93f5d185cb2d1e15a29c9c537
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514050#comment-17514050 ] shizhengchao commented on FLINK-26033: -- master: 2049f849a130e738759001c5a6b9d85834da08d0 1.13: 8b18e74cec71c9c480bb747fb2f77a55f675ca2a 1.14: d4f374237b21aab93f5d185cb2d1e15a29c9c537 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext
[ https://issues.apache.org/jira/browse/FLINK-26527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17502790#comment-17502790 ] shizhengchao commented on FLINK-26527: -- [~martijnvisser] Thanks, I've got it; it's not a problem. The user classloader should not include Flink-related dependencies, so I will separate the user classloader from the application (parent) classloader to solve this problem.

> ClassCastException in TemporaryClassLoaderContext
> -
>
> Key: FLINK-26527
> URL: https://issues.apache.org/jira/browse/FLINK-26527
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.13.5, 1.14.3
> Reporter: shizhengchao
> Priority: Major
>
> When I try to run SQL using Flink's classloader, I get the following exception:
> {code:java}
> Exception in thread "main" java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
>     at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
>     ……{code}
> My code is like this:
> {code:java}
> Configuration configuration = new Configuration();
> configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
> List<URL> dependencies = FlinkClassLoader.getFlinkDependencies(FLINK_HOME/lib);
> URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
>         dependencies,
>         Collections.emptyList(),
>         SessionContext.class.getClassLoader(),
>         configuration);
> try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
>     tableEnv.explainSql(sql);
>     // CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
> } {code}
> But if you change `classloader.resolve-order` to `parent-first`, everything works fine.
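The root cause discussed in this issue generalizes: a class loaded by two different classloaders yields two distinct `Class` objects, so an instance created through a child-first loader cannot be cast to the parent's version of the same type. Below is a minimal, self-contained sketch of that failure mode; the class names and the simplistic child-first loader are hypothetical, and this is not Flink's actual classloader implementation:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Generic reproduction of the failure class in this issue: the same class,
// visible to both a child-first loader and its parent, is redefined by the
// child, and the two copies are not cast-compatible. Hypothetical names;
// not Flink code.
public class ChildFirstDemo {

    /** Stands in for a class on the application classpath, like janino's CompilerFactory. */
    public static class Payload {}

    /** Loads from its own URLs first, delegating to the parent only on failure. */
    static class ChildFirstClassLoader extends URLClassLoader {
        ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    try {
                        c = findClass(name);            // child first
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, resolve); // then parent (e.g. java.*)
                    }
                }
                if (resolve) resolveClass(c);
                return c;
            }
        }
    }

    /** Returns true when the child-first copy of Payload is a distinct, incompatible class. */
    public static boolean reproduces() throws Exception {
        // Point the child loader at the same location this class was loaded from.
        URL classpath = ChildFirstDemo.class.getProtectionDomain().getCodeSource().getLocation();
        try (ChildFirstClassLoader child = new ChildFirstClassLoader(
                new URL[] {classpath}, ChildFirstDemo.class.getClassLoader())) {
            Class<?> childPayload = child.loadClass(Payload.class.getName());
            Object instance = childPayload.getDeclaredConstructor().newInstance();
            // Same fully-qualified name, different defining loader: not assignable.
            // This is exactly why the CompilerFactory cast above fails under child-first.
            return childPayload != Payload.class && !(instance instanceof Payload);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(reproduces());
    }
}
```

This also illustrates the fix the commenter describes: keep Flink (and janino) classes out of the user classloader's URLs so they always resolve through the parent, and no duplicate definition exists to collide with.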
[jira] [Updated] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext
[ https://issues.apache.org/jira/browse/FLINK-26527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-26527: - Description: (same as the issue description quoted above; this update only fixed the broken code-block formatting around the FLINK_HOME/lib argument)
[jira] [Created] (FLINK-26527) ClassCastException in TemporaryClassLoaderContext
shizhengchao created FLINK-26527:
Summary: ClassCastException in TemporaryClassLoaderContext
Key: FLINK-26527
URL: https://issues.apache.org/jira/browse/FLINK-26527
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.14.3, 1.13.5
Reporter: shizhengchao

When I try to run SQL using Flink's classloader, I get the following exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
    ...{code}
My code looks like this:
{code:java}
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "child-first");
List<URL> dependencies = FlinkClassLoader.getFlinkDependencies(System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR));
URLClassLoader classLoader = ClientUtils.buildUserCodeClassLoader(
        dependencies,
        Collections.emptyList(),
        SessionContext.class.getClassLoader(),
        configuration);
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
    tableEnv.explainSql(sql);
    // CompilerFactoryFactory.getCompilerFactory("org.codehaus.janino.CompilerFactory");
} {code}
But if you change `classloader.resolve-order` to `parent-first`, everything works fine.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
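For readers unfamiliar with the pattern in the snippet above, here is a minimal sketch of what a temporary context-classloader scope does. This is an illustration with made-up names (`TempLoaderScope`), not Flink's actual `TemporaryClassLoaderContext` implementation:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative stand-in for Flink's TemporaryClassLoaderContext: swap the
// current thread's context classloader, and restore the previous one on close.
final class TempLoaderScope implements AutoCloseable {
    private final ClassLoader previous;

    TempLoaderScope(ClassLoader temporary) {
        this.previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(temporary);
    }

    @Override
    public void close() {
        // Restore whatever loader was active before the scope was entered.
        Thread.currentThread().setContextClassLoader(previous);
    }

    /** Returns {loader was swapped inside the scope, loader was restored afterwards}. */
    static boolean[] demo() {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader child = new URLClassLoader(new URL[0], before);
        boolean swapped;
        try (TempLoaderScope ignored = new TempLoaderScope(child)) {
            // Code here that resolves classes through the context classloader
            // (e.g. ServiceLoader lookups, Janino's CompilerFactoryFactory) sees `child`.
            swapped = Thread.currentThread().getContextClassLoader() == child;
        }
        boolean restored = Thread.currentThread().getContextClassLoader() == before;
        return new boolean[] {swapped, restored};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // expect: true true
    }
}
```

This also hints at why the reported ClassCastException appears only with child-first resolution: if the child loader defines its own copy of `org.codehaus.janino.CompilerFactory` while `ICompilerFactory` was loaded by the parent, the two classes come from different loaders and the cast fails. With parent-first ordering both types resolve through the same (parent) loader, so the cast succeeds.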
[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500479#comment-17500479 ] shizhengchao edited comment on FLINK-26033 at 3/3/22, 3:36 AM:
[~MartijnVisser] [~renqs] I found through testing that the RoundRobinPartitioner built into Kafka does not work either; that is, it cannot distribute writes to all partitions equally, due to abortForNewBatch. For example, with 10 partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only sends data to the even partitions, because abortForNewBatch is true. So we should implement a round-robin partitioner in Flink, and the design needs to discover partitions automatically.

was (Author: tinny): [~MartijnVisser] [~renqs] I found through testing that the RoundRobinPartitioner built into Kafka does not work either; that is, it cannot distribute writes to all partitions equally, due to abortForNewBatch. For example, with 10 partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only sends data to the even partitions, because abortForNewBatch is true. So we should implement a round-robin partitioner in Flink, and the design needs to discover partitions automatically.

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
> -
>
> Key: FLINK-26033
> URL: https://issues.apache.org/jira/browse/FLINK-26033
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.11.6, 1.12.7, 1.13.3, 1.14.3
> Reporter: shizhengchao
> Assignee: shizhengchao
> Priority: Major
> Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.
> {code:java}
> // code placeholder
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both options fall back to Kafka's default partitioner, but there are actually two scenarios for partitioning in DefaultPartitioner:
> 1. When there is no key, pick a partition without regard to the key.
> 2. When there is a key, hash the key and take it modulo the partition count.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // No key: sticky partitioning
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector does not have a round-robin strategy. But we can borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> // code placeholder
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>             byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue =
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17494327#comment-17494327 ] shizhengchao commented on FLINK-26033:
[~renqs] I plan to add a round-robin class that implements the FlinkKafkaPartitioner interface.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
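The per-topic counter logic discussed in this thread can be exercised in isolation. Below is a self-contained sketch (the class name `RoundRobinChooser` and its methods are mine, not Kafka's or Flink's API) showing that a getAndIncrement counter taken modulo the partition count visits every partition equally, which is the behavior the reporter wants from 'round-robin':

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal per-topic round-robin partition chooser, modeled on the
// counter-based logic quoted in the issue description.
public class RoundRobinChooser {
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    public int partition(String topic, int numPartitions) {
        AtomicInteger counter = counters.computeIfAbsent(topic, k -> new AtomicInteger(0));
        // Mask the sign bit (like Kafka's Utils.toPositive) so that counter
        // overflow can never produce a negative partition index.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    // Send `records` messages to one topic and count how many land on each partition.
    public static int[] distribute(int records, int numPartitions) {
        RoundRobinChooser chooser = new RoundRobinChooser();
        int[] counts = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            counts[chooser.partition("demo-topic", numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 100 records over 10 partitions: each partition receives exactly 10.
        System.out.println(Arrays.toString(distribute(100, 10)));
    }
}
```

Unlike the batching-aware producer path (where abortForNewBatch re-invokes the partitioner and can skip partitions), this bare counter advances exactly once per record, so the distribution stays uniform.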
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17494323#comment-17494323 ] shizhengchao commented on FLINK-26033:
[~renqs] I'm interested in fixing this bug; please assign it to me.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489345#comment-17489345 ] shizhengchao commented on FLINK-26033:
[~MartijnVisser] This is not a problem there, but in higher versions of kafka-client, such as 2.4.1, there will be problems.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
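As background for the keyed path of DefaultPartitioner discussed in this thread: hashing the key and taking a positive modulo pins each key to one partition, regardless of client version. A small sketch of that property (using `Arrays.hashCode` as a stand-in for Kafka's internal murmur2; the class and method names here are illustrative, not Kafka API):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyedPartitionDemo {
    // Clears the sign bit, like Kafka's Utils.toPositive, so the modulo is never negative.
    static int toPositive(int n) { return n & 0x7fffffff; }

    // Stand-in for murmur2-based selection: any deterministic hash shows the property.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int first = partitionForKey("user-42", 10);
        int second = partitionForKey("user-42", 10);
        // Same key, same partition count -> same partition every time.
        System.out.println(first == second);
    }
}
```

The version difference the comments describe affects only the keyless branch: 2.0.1 round-robined keyless records via a per-topic counter, while 2.4.1 keeps a sticky partition per topic and only moves it when a batch completes.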
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489342#comment-17489342 ] shizhengchao commented on FLINK-26033: -- [~MartijnVisser] I made a mistake. I re-checked, and the documentation only applies to older Kafka client versions, such as kafka-client-2.0.1; the problem I am describing occurs with kafka-client-2.4.1.
{code:java}
// kafka-2.0.1: DefaultPartitioner round-robins keyless records over the available partitions
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

// kafka-2.4.1: DefaultPartitioner delegates keyless records to the sticky partition cache
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

// kafka-2.4.1: StickyPartitionCache#nextPartition
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it > does not take effect > -- > > Key: FLINK-26033 > URL: https://issues.apache.org/jira/browse/FLINK-26033 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.13.3, 1.14.3 >Reporter: shizhengchao >Priority: Major > > In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it > does not take effect. Flink treats 'default' and 'round-robin' as the same > strategy. > {code:java} > // code placeholder > public static Optional<FlinkKafkaPartitioner<RowData>> > getFlinkKafkaPartitioner( > ReadableConfig tableOptions, ClassLoader classLoader) { > return tableOptions > .getOptional(SINK_PARTITIONER) > .flatMap( > (String partitioner) -> { > switch (partitioner) { > case SINK_PARTITIONER_VALUE_FIXED: > return Optional.of(new > FlinkFixedPartitioner<>()); > case SINK_PARTITIONER_VALUE_DEFAULT: > case SINK_PARTITIONER_VALUE_ROUND_ROBIN: > return Optional.empty(); > // Default fallback to full class name of the > partitioner. 
> default: > return Optional.of( > initializePartitioner(partitioner, > classLoader)); > } >
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489298#comment-17489298 ] shizhengchao commented on FLINK-26033: -- [~jark] [~libenchao] Could you take a look at this issue? > In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it > does not take effect > -- > > Key: FLINK-26033 > URL: https://issues.apache.org/jira/browse/FLINK-26033 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.13.3, 1.14.3 >Reporter: shizhengchao >Priority: Major > > In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it > does not take effect. Flink treats 'default' and 'round-robin' as the same > strategy. > {code:java} > // code placeholder > public static Optional<FlinkKafkaPartitioner<RowData>> > getFlinkKafkaPartitioner( > ReadableConfig tableOptions, ClassLoader classLoader) { > return tableOptions > .getOptional(SINK_PARTITIONER) > .flatMap( > (String partitioner) -> { > switch (partitioner) { > case SINK_PARTITIONER_VALUE_FIXED: > return Optional.of(new > FlinkFixedPartitioner<>()); > case SINK_PARTITIONER_VALUE_DEFAULT: > case SINK_PARTITIONER_VALUE_ROUND_ROBIN: > return Optional.empty(); > // Default fallback to full class name of the > partitioner. > default: > return Optional.of( > initializePartitioner(partitioner, > classLoader)); > } > }); > } {code} > They both use Kafka's default partitioner, but there are actually two > scenarios for partitioning in DefaultPartitioner: > 1. Random when there is no key > 2. 
When there is a key, take the modulo according to the key > {code:java} > // org.apache.kafka.clients.producer.internals.DefaultPartitioner > public int partition(String topic, Object key, byte[] keyBytes, Object value, > byte[] valueBytes, Cluster cluster) { > if (keyBytes == null) { > // Random when there is no key > return stickyPartitionCache.partition(topic, cluster); > } > List partitions = cluster.partitionsForTopic(topic); > int numPartitions = partitions.size(); > // hash the keyBytes to choose a partition > return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; > } {code} > Therefore, KafkaConnector does not have a round-robin strategy.But we can > borrow from kafka's RoundRobinPartitioner > {code:java} > //代码占位符 > public class RoundRobinPartitioner implements Partitioner { > private final ConcurrentMap topicCounterMap = new > ConcurrentHashMap<>(); > public void configure(Map configs) {} > /** > * Compute the partition for the given record. > * > * @param topic The topic name > * @param key The key to partition on (or null if no key) > * @param keyBytes serialized key to partition on (or null if no key) > * @param value The value to partition on or null > * @param valueBytes serialized value to partition on or null > * @param cluster The current cluster metadata > */ > @Override > public int partition(String topic, Object key, byte[] keyBytes, Object > value, byte[] valueBytes, Cluster cluster) { > List partitions = cluster.partitionsForTopic(topic); > int numPartitions = partitions.size(); > int nextValue = nextValue(topic); > List availablePartitions = > cluster.availablePartitionsForTopic(topic); > if (!availablePartitions.isEmpty()) { > int part = Utils.toPositive(nextValue) % > availablePartitions.size(); > return availablePartitions.get(part).partition(); > } else { > // no partitions are available, give a non-available partition > return Utils.toPositive(nextValue) % numPartitions; > } > } > private int nextValue(String topic) { > 
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> { > return new AtomicInteger(0); > }); > return counter.getAndIncrement(); > } > public void close() {} > } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
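To make the sticky behavior discussed above concrete, here is a minimal standalone sketch. This is not Kafka's actual class: `StickyPartitionSketch`, its `int numPartitions` signature, and the simplified rotation are illustrative stand-ins for `StickyPartitionCache`, kept only to show why keyless records are *not* round-robined in kafka-client 2.4.1 — they reuse one cached partition per topic until the batch rotates.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Simplified model of the sticky partitioning described above:
// all keyless records for a topic go to one cached partition until rotated.
public class StickyPartitionSketch {
    private final ConcurrentHashMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    /** Return the current sticky partition for the topic, choosing one at random if unset. */
    public int partition(String topic, int numPartitions) {
        Integer part = indexCache.get(topic);
        return part != null ? part : nextPartition(topic, numPartitions, -1);
    }

    /** Rotate to a different partition once the batch that used prevPartition completes. */
    public int nextPartition(String topic, int numPartitions, int prevPartition) {
        Integer oldPart = indexCache.get(topic);
        // Only rotate if unset, or if the completed batch used the current sticky partition.
        if (oldPart == null || oldPart == prevPartition) {
            Integer newPart = oldPart;
            if (numPartitions == 1) {
                newPart = 0; // nothing to rotate to
            } else {
                // pick a random partition different from the previous sticky one
                while (newPart == null || newPart.equals(oldPart)) {
                    newPart = ThreadLocalRandom.current().nextInt(numPartitions);
                }
            }
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
        }
        return indexCache.get(topic);
    }
}
```

The point of the sketch: `partition` is deterministic between rotations, so with 'sink.partitioner' = 'round-robin' mapped to this strategy, records do not rotate per record as the option name suggests.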
[jira] [Created] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
shizhengchao created FLINK-26033: Summary: In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect Key: FLINK-26033 URL: https://issues.apache.org/jira/browse/FLINK-26033 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.3, 1.13.3 Reporter: shizhengchao In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.
{code:java}
// code placeholder
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
        ReadableConfig tableOptions, ClassLoader classLoader) {
    return tableOptions
            .getOptional(SINK_PARTITIONER)
            .flatMap(
                    (String partitioner) -> {
                        switch (partitioner) {
                            case SINK_PARTITIONER_VALUE_FIXED:
                                return Optional.of(new FlinkFixedPartitioner<>());
                            case SINK_PARTITIONER_VALUE_DEFAULT:
                            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                return Optional.empty();
                            // Default fallback to full class name of the partitioner.
                            default:
                                return Optional.of(
                                        initializePartitioner(partitioner, classLoader));
                        }
                    });
}
{code}
Both options fall back to Kafka's default partitioner, but there are actually two scenarios for partitioning in DefaultPartitioner:
1. Random (sticky) when there is no key
2. When there is a key, the partition is chosen by hashing the key
{code:java}
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        // Random (sticky) when there is no key
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
{code}
Therefore, the KafkaConnector does not actually have a round-robin strategy, but we can borrow from Kafka's RoundRobinPartitioner:
{code:java}
// code placeholder
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    public void close() {}
}
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
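The core of the RoundRobinPartitioner quoted above is a per-topic counter. Below is a standalone sketch of just that rotation logic, with Kafka's `Cluster` metadata replaced by a plain partition count so it runs without any Kafka dependency; the class name `RoundRobinSketch` and the `int numPartitions` parameter are simplifications introduced for illustration, not Flink or Kafka API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone rotation core of a round-robin partitioner: one counter per topic,
// incremented on every record, reduced modulo the partition count.
public class RoundRobinSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    // Same trick as Kafka's Utils.toPositive: mask off the sign bit, so even
    // Integer.MIN_VALUE (where Math.abs would overflow) maps to a non-negative value.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    private int nextValue(String topic) {
        return topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0)).getAndIncrement();
    }

    /** Assign partitions 0, 1, ..., numPartitions-1, 0, ... in strict rotation per topic. */
    public int partition(String topic, int numPartitions) {
        return toPositive(nextValue(topic)) % numPartitions;
    }
}
```

Unlike the sticky strategy, consecutive keyless records here land on consecutive partitions, which is what a user configuring 'round-robin' would expect.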
[jira] [Created] (FLINK-25703) In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
shizhengchao created FLINK-25703: Summary: In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames Key: FLINK-25703 URL: https://issues.apache.org/jira/browse/FLINK-25703 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.14.3 Reporter: shizhengchao If the HADOOP_USER_NAME set on the client side is user_a, but the task runs in the TaskManager as user_b (this is achievable), then the batch task will run into permission problems, as follows:
{code:java}
// code placeholder
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=user_b, access=WRITE, inode="/where/to/whatever/.staging__1642575264185":user_a:supergroup:drwxr-xr-x{code}
This is because the .staging__1642575264185 directory was created by user_a. So we should move the code that creates the .staging directory into the FileSystemOutputFormat#open method:
{code:java}
// code placeholder
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        // check the temp path and create it if missing
        checkOrCreateTmpPath();
        PartitionTempFileManager fileManager =
                new PartitionTempFileManager(
                        fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
        PartitionWriter.Context<T> context =
                new PartitionWriter.Context<>(parameters, formatFactory);
        writer =
                PartitionWriterFactory.get(
                                partitionColumns.length - staticPartitions.size() > 0,
                                dynamicGrouped,
                                staticPartitions)
                        .create(context, fileManager, computer);
    } catch (Exception e) {
        throw new TableException("Exception in open", e);
    }
}

private void checkOrCreateTmpPath() throws Exception {
    FileSystem fs = tmpPath.getFileSystem();
    Preconditions.checkState(
            fs.exists(tmpPath) || fs.mkdirs(tmpPath),
            "Failed to create staging dir " + tmpPath);
}
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
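The proposed fix above boils down to a check-or-create pattern executed by the task itself, so the directory owner matches whichever user the task actually runs as. A minimal sketch using plain java.nio.file in place of Flink's FileSystem abstraction (`StagingDirSketch` and `checkOrCreate` are hypothetical names, not Flink API):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the check-or-create pattern proposed above: the staging directory
// is created by the process that opens the task, not by the submitting client,
// so its owner is the user the task actually runs as.
public class StagingDirSketch {
    /** Idempotently ensure the staging directory exists; fail loudly otherwise. */
    public static Path checkOrCreate(Path stagingPath) {
        try {
            // no-op if the directory already exists, so concurrent subtasks are safe
            Files.createDirectories(stagingPath);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create staging dir " + stagingPath, e);
        }
        if (!Files.isDirectory(stagingPath)) {
            throw new IllegalStateException(stagingPath + " exists but is not a directory");
        }
        return stagingPath;
    }
}
```

Because the call is idempotent, every parallel subtask can invoke it from open() without coordination.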
[jira] [Created] (FLINK-25124) A deadlock occurs when the jdbc sink uses two consecutive dimension tables to associate
shizhengchao created FLINK-25124: Summary: A deadlock occurs when the jdbc sink uses two consecutive dimension tables to associate Key: FLINK-25124 URL: https://issues.apache.org/jira/browse/FLINK-25124 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: 1.13.1 Reporter: shizhengchao The SQL statement is as follows:
{code:sql}
-- code placeholder
INSERT INTO imei_phone_domestic_realtime
SELECT
  t.data.imei AS imei,
  CAST(t.data.register_date_key AS bigint) AS register_date_key,
  c.agent_type AS channel_name,
  c.agent_short_name,
  c.agent_name,
  c.agent_chinese_name,
  c.isforeign AS agent_market_type,
  p.seriename AS series_name,
  p.salename AS sale_name,
  p.devname AS dev_name,
  p.devnamesource AS dev_name_source,
  p.color,
  p.isforeign AS product_market_type,
  p.carrier,
  p.lcname AS life_cycle,
  IFNULL(p.shipping_price, 0) AS shipping_price,
  IFNULL(p.retail_price, 0) AS retail_price
FROM kafka_imei_phone_domestic_realtime AS t
LEFT JOIN dim_product FOR SYSTEM_TIME AS OF t.proctime AS p ON p.pn = t.item_code
LEFT JOIN dim_customer FOR SYSTEM_TIME AS OF t.proctime AS c ON c.customer_code = t.customer_code
WHERE t.eventType = 'update';
{code}
There is a chance of a deadlock; the thread dump is as follows: {code:java} // code placeholder "jdbc-upsert-output-format-thread-1" Id=84 BLOCKED on org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af owned by "Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_imei_phone_domestic_realtime]], fields=[data, eventType]) -> Calc(select=[data, data.item_code AS $f3], where=[(eventType = _UTF-16LE'update':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> LookupJoin(table=[default_catalog.default_database.dim_product], joinType=[LeftOuterJoin], async=[false], lookup=[pn=$f3], select=[data, $f3, pn, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price]) -> Calc(select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, 
shipping_price, retail_price, data.customer_code AS $f31]) -> LookupJoin(table=[default_catalog.default_database.dim_customer], joinType=[LeftOuterJoin], async=[false], lookup=[customer_code=$f31], select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price, $f31, customer_code, agent_short_name, agent_name, isforeign, agent_type, agent_chinese_name]) -> Calc(select=[data.imei AS imei, CAST(data.register_date_key) AS register_date_key, agent_type AS channel_name, agent_short_name, agent_name, agent_chinese_name, isforeign0 AS agent_market_type, seriename AS series_name, salename AS sale_name, devname AS dev_name, devnamesource AS dev_name_source, color, isforeign AS product_market_type, carrier, lcname AS life_cycle, IFNULL(shipping_price, 0:DECIMAL(10, 0)) AS shipping_price, IFNULL(retail_price, 0:DECIMAL(10, 0)) AS retail_price]) -> NotNullEnforcer(fields=[imei]) -> Sink: Sink(table=[default_catalog.default_database.imei_phone_domestic_realtime], fields=[imei, register_date_key, channel_name, agent_short_name, agent_name, agent_chinese_name, agent_market_type, series_name, sale_name, dev_name, dev_name_source, color, product_market_type, carrier, life_cycle, shipping_price, retail_price]) (6/12)#0" Id=82 at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:124) - blocked on org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat$$Lambda$344/21845506.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... Number of locked synchronizers = 1 - java.util.concurrent.ThreadPoolExecutor$Worker@325612a2 {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367969#comment-17367969 ] shizhengchao commented on FLINK-23096: -- The *clearSessionState* method catches IOException, which masks the root cause. When I caught Exception instead, I got the root cause: {code:java} Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=service, access=WRITE, inode="/tmp":hdfs:supergroup:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:280) ... at org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3678) at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:597) at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508) {code} > HiveParser could not attach the sessionstate of hive > > > Key: FLINK-23096 > URL: https://issues.apache.org/jira/browse/FLINK-23096 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.13.1 >Reporter: shizhengchao >Priority: Major > > My sql code is as follows: > {code:java} > // code placeholder > CREATE CATALOG myhive WITH ( > 'type' = 'hive', > 'default-database' = 'default', > 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008' > ); > use catalog hive; > set 'table.sql-dialect' = 'hive'; > create view if not exists view_test as > select > cast(goods_id as string) as goods_id, > cast(depot_id as string) as depot_id, > cast(product_id as string) as product_id, > cast(tenant_code as string) as tenant_code > from edw.dim_yezi_whse_goods_base_info/*+ > OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */; > {code} > and the exception is as follows: > {code:java} > // code placeholder > org.apache.flink.client.program.ProgramInvocationException: The 
main method > caused an error: Conf non-local session path expected to be non-null > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) > at > org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown > Source) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown > Source) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) > Caused by: java.lang.NullPointerException: Conf non-local session path > expected to be non-null > at > com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208) > at > org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376) > at > org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) > at > com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157) > at > 
com.shizhengchao.io.FlinkSqlStreamingPlatform.callCommand(FlinkSqlStreamingPlatform.java:129) > at > com.shizhengchao.io.FlinkSqlStreamingPlatform.run(FlinkSqlStreamingPlatform.java:91) > at > com.shizhengchao.io.FlinkSqlStreamingPlatform.main(FlinkSqlStreamingPlatform.java:66) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at >
[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832 ] shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:29 AM: [~Leonard Xu] [~lirui]
{code:java}
// code placeholder
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}

public static Path getHDFSSessionPath(Configuration conf) {
    SessionState ss = SessionState.get();
    if (ss == null) {
        String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
        Preconditions.checkNotNull(sessionPathString, "Conf non-local session path expected to be non-null");
        return new Path(sessionPathString);
    }
    Preconditions.checkNotNull(ss.hdfsSessionPath, "Non-local session path expected to be non-null");
    return ss.hdfsSessionPath;
}
{code}
Because the session is already closed, SessionState.get() is null. 
was (Author: tinny): [~Leonard Xu] [~lirui] {code:java} // code placeholder private void clearSessionState(HiveConf hiveConf) { SessionState sessionState = SessionState.get(); if (sessionState != null) { try { sessionState.close(); List<Path> toDelete = new ArrayList<>(); toDelete.add(SessionState.getHDFSSessionPath(hiveConf)); toDelete.add(SessionState.getLocalSessionPath(hiveConf)); for (Path path : toDelete) { FileSystem fs = path.getFileSystem(hiveConf); fs.delete(path, true); } } catch (IOException e) { LOG.warn("Error closing SessionState", e); } } public static Path getHDFSSessionPath(Configuration conf) { SessionState ss = SessionState.get(); if (ss == null) { String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY); Preconditions.checkNotNull(sessionPathString, "Conf non-local session path expected to be non-null"); return new Path(sessionPathString); } Preconditions.checkNotNull(ss.hdfsSessionPath, "Non-local session path expected to be non-null"); return ss.hdfsSessionPath; } }{code} Because the session is closed, SessionState.get() will cause an NPE. 
> HiveParser could not attach the sessionstate of hive > > > Key: FLINK-23096 > URL: https://issues.apache.org/jira/browse/FLINK-23096 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.13.1 >Reporter: shizhengchao >Priority: Major > > My sql code is as follows: > {code:java} > //代码占位符 > CREATE CATALOG myhive WITH ( > 'type' = 'hive', > 'default-database' = 'default', > 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008' > ); > use catalog hive; > set 'table.sql-dialect' = 'hive'; > create view if not exists view_test as > select > cast(goods_id as string) as goods_id, > cast(depot_id as string) as depot_id, > cast(product_id as string) as product_id, > cast(tenant_code as string) as tenant_code > from edw.dim_yezi_whse_goods_base_info/*+ > OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */; > {code} > and the exception is as follows: > {code:java} > //代码占位符 > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Conf non-local session path expected to be non-null > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) > at > org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown > Source) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown > Source) > at
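The NPE discussed above follows directly from the call order in clearSessionState: close() tears down the thread-local session, and only afterwards are the session paths looked up. A minimal sketch of the ordering fix — capture the paths before closing — using a hypothetical `FakeSessionState` stand-in (not Hive's actual class) so the behavior can be demonstrated without Hive:

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates the ordering bug described above: session paths must be
// captured BEFORE close(), because the path getter fails once the session is gone.
public class ClearSessionSketch {

    /** Hypothetical stand-in for Hive's SessionState. */
    static class FakeSessionState {
        private boolean closed;

        String getHdfsSessionPath() {
            if (closed) {
                // mirrors "Conf non-local session path expected to be non-null"
                throw new NullPointerException("session already closed");
            }
            return "/tmp/hive/session";
        }

        void close() { closed = true; }

        boolean isClosed() { return closed; }
    }

    /** Fixed order: collect the paths to delete first, then close, then delete. */
    static List<String> clearSessionState(FakeSessionState ss) {
        List<String> toDelete = new ArrayList<>();
        toDelete.add(ss.getHdfsSessionPath()); // safe: the session is still open here
        ss.close();
        // here the real code would delete each collected path via its FileSystem
        return toDelete;
    }
}
```

Swapping the two steps back (close first, then look up paths) reproduces the exception from the stack trace above.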
[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832 ] shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:10 AM: [~Leonard Xu] [~lirui] {code:java} // code placeholder private void clearSessionState(HiveConf hiveConf) { SessionState sessionState = SessionState.get(); if (sessionState != null) { try { sessionState.close(); List<Path> toDelete = new ArrayList<>(); toDelete.add(SessionState.getHDFSSessionPath(hiveConf)); toDelete.add(SessionState.getLocalSessionPath(hiveConf)); for (Path path : toDelete) { FileSystem fs = path.getFileSystem(hiveConf); fs.delete(path, true); } } catch (IOException e) { LOG.warn("Error closing SessionState", e); } } public static Path getHDFSSessionPath(Configuration conf) { SessionState ss = SessionState.get(); if (ss == null) { String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY); Preconditions.checkNotNull(sessionPathString, "Conf non-local session path expected to be non-null"); return new Path(sessionPathString); } Preconditions.checkNotNull(ss.hdfsSessionPath, "Non-local session path expected to be non-null"); return ss.hdfsSessionPath; } }{code} Because the session is closed, SessionState.get() will cause an NPE. was (Author: tinny): [~Leonard Xu] [~lirui] {code:java} // code placeholder private void clearSessionState(HiveConf hiveConf) { SessionState sessionState = SessionState.get(); if (sessionState != null) { try { sessionState.close(); List<Path> toDelete = new ArrayList<>(); toDelete.add(SessionState.getHDFSSessionPath(hiveConf)); toDelete.add(SessionState.getLocalSessionPath(hiveConf)); for (Path path : toDelete) { FileSystem fs = path.getFileSystem(hiveConf); fs.delete(path, true); } } catch (IOException e) { LOG.warn("Error closing SessionState", e); } } }{code} Because the session is closed, SessionState.getHDFSSessionPath(hiveConf) will cause an NPE. 
> HiveParser could not attach the sessionstate of hive > > > Key: FLINK-23096 > URL: https://issues.apache.org/jira/browse/FLINK-23096 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.13.1 >Reporter: shizhengchao >Priority: Major > > My sql code is as follows: > {code:java} > //代码占位符 > CREATE CATALOG myhive WITH ( > 'type' = 'hive', > 'default-database' = 'default', > 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008' > ); > use catalog hive; > set 'table.sql-dialect' = 'hive'; > create view if not exists view_test as > select > cast(goods_id as string) as goods_id, > cast(depot_id as string) as depot_id, > cast(product_id as string) as product_id, > cast(tenant_code as string) as tenant_code > from edw.dim_yezi_whse_goods_base_info/*+ > OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */; > {code} > and the exception is as follows: > {code:java} > //代码占位符 > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Conf non-local session path expected to be non-null > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) > at > org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown > Source) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown > Source) > at java.security.AccessController.doPrivileged(Native Method) > at 
javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) > Caused by: java.lang.NullPointerException: Conf
[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832 ] shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:09 AM: [~Leonard Xu] [~lirui] {code:java} // code placeholder private void clearSessionState(HiveConf hiveConf) { SessionState sessionState = SessionState.get(); if (sessionState != null) { try { sessionState.close(); List<Path> toDelete = new ArrayList<>(); toDelete.add(SessionState.getHDFSSessionPath(hiveConf)); toDelete.add(SessionState.getLocalSessionPath(hiveConf)); for (Path path : toDelete) { FileSystem fs = path.getFileSystem(hiveConf); fs.delete(path, true); } } catch (IOException e) { LOG.warn("Error closing SessionState", e); } } }{code} Because the session is closed, SessionState.getHDFSSessionPath(hiveConf) will cause an NPE. was (Author: tinny): [~Leonard Xu] [~lirui] {code:java} // code placeholder private void clearSessionState(HiveConf hiveConf) { SessionState sessionState = SessionState.get(); if (sessionState != null) { try { sessionState.close(); List<Path> toDelete = new ArrayList<>(); toDelete.add(SessionState.getHDFSSessionPath(hiveConf)); toDelete.add(SessionState.getLocalSessionPath(hiveConf)); for (Path path : toDelete) { FileSystem fs = path.getFileSystem(hiveConf); fs.delete(path, true); } } catch (IOException e) { LOG.warn("Error closing SessionState", e); } } }{code} Because the session is closed, SessionState.getHDFSSessionPath(hiveConf) will cause an NPE. 
> HiveParser could not attach the sessionstate of hive
>
> Key: FLINK-23096
> URL: https://issues.apache.org/jira/browse/FLINK-23096
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.13.1
> Reporter: shizhengchao
> Priority: Major
>
> My SQL code is as follows:
> {code:java}
> // code placeholder
> CREATE CATALOG myhive WITH (
>   'type' = 'hive',
>   'default-database' = 'default',
>   'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
> );
> use catalog hive;
> set 'table.sql-dialect' = 'hive';
> create view if not exists view_test as
> select
>   cast(goods_id as string) as goods_id,
>   cast(depot_id as string) as depot_id,
>   cast(product_id as string) as product_id,
>   cast(tenant_code as string) as tenant_code
> from edw.dim_yezi_whse_goods_base_info /*+ OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
> {code}
> and the exception is as follows:
> {code:java}
> // code placeholder
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Conf non-local session path expected to be non-null
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown Source)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown Source)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NullPointerException: Conf non-local session path expected to be non-null
>     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208)
>     at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669)
>     at org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376)
>     at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219)
>     at
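The ordering bug described in the comment above (reading the session paths only after sessionState.close() has already nulled them out) can be reproduced without any Hive dependencies. The Session class and path values below are hypothetical stand-ins for Hive's SessionState, used only to show the failure and the fix of capturing the paths before closing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class SessionPathOrdering {
    /** Hypothetical stand-in for Hive's SessionState: paths are only available while open. */
    static class Session {
        private String hdfsSessionPath = "/tmp/hive/session-1";
        private String localSessionPath = "/tmp/local/session-1";

        void close() { hdfsSessionPath = null; localSessionPath = null; }

        String getHdfsSessionPath() {
            // Mirrors SessionState.getHDFSSessionPath: a null check fires after close()
            return Objects.requireNonNull(hdfsSessionPath,
                    "Conf non-local session path expected to be non-null");
        }

        String getLocalSessionPath() {
            return Objects.requireNonNull(localSessionPath,
                    "Local session path expected to be non-null");
        }
    }

    /** Fixed ordering: capture the paths BEFORE closing the session, then clean up. */
    static List<String> clearSession(Session s) {
        List<String> toDelete = new ArrayList<>();
        toDelete.add(s.getHdfsSessionPath());
        toDelete.add(s.getLocalSessionPath());
        s.close();
        return toDelete; // in real code these paths would now be deleted from the filesystem
    }

    public static void main(String[] args) {
        // Buggy order: close first, then read the path -> NullPointerException
        Session buggy = new Session();
        buggy.close();
        boolean npe = false;
        try {
            buggy.getHdfsSessionPath();
        } catch (NullPointerException e) {
            npe = true;
        }
        System.out.println("NPE after close: " + npe);

        // Fixed order: read the paths first, then close
        Session fixed = new Session();
        System.out.println("Paths captured: " + clearSession(fixed).size());
    }
}
```

The sketch shows only the ordering problem; the real fix in Flink's HiveParser would additionally need to handle the conf-backed fallback paths that SessionState consults when no session is attached.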
[jira] [Comment Edited] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832 ] shizhengchao edited comment on FLINK-23096 at 6/23/21, 3:07 AM: [~Leonard Xu] [~lirui]
{code:java}
// code placeholder
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}
{code}
Because the session is already closed, SessionState.getHDFSSessionPath(hiveConf) causes an NPE.

was (Author: tinny): [~Leonard Xu] [~lirui]
[jira] [Commented] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367832#comment-17367832 ] shizhengchao commented on FLINK-23096: -- [~Leonard Xu] [~lirui]
> My guess is that the SessionState is not set into the thread-local:
> {code:java}
> // code placeholder
> // @see org.apache.hadoop.hive.ql.session.SessionState.setCurrentSessionState
> public static void setCurrentSessionState(SessionState startSs) {
>     tss.get().attach(startSs);
> }
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
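The thread-local attachment guessed at above can be illustrated without Hive: a session attached via a plain ThreadLocal on one thread is invisible to any other thread, so a parser running on a different thread would see no current session. The SessionState class below is a minimal stand-in, not Hive's actual implementation:

```java
public class ThreadLocalSession {
    /** Toy stand-in for Hive's SessionState and its static thread-local registry. */
    static class SessionState {
        private static final ThreadLocal<SessionState> tss = new ThreadLocal<>();
        final String name;

        SessionState(String name) { this.name = name; }

        // Mirrors SessionState.setCurrentSessionState: attaches to the CURRENT thread only
        static void setCurrentSessionState(SessionState s) { tss.set(s); }

        static SessionState get() { return tss.get(); }
    }

    public static void main(String[] args) throws InterruptedException {
        SessionState.setCurrentSessionState(new SessionState("main-session"));
        System.out.println("main thread sees: " + (SessionState.get() != null));

        // A different thread does not inherit a plain ThreadLocal attachment
        final boolean[] seen = new boolean[1];
        Thread worker = new Thread(() -> seen[0] = SessionState.get() != null);
        worker.start();
        worker.join();
        System.out.println("worker thread sees: " + seen[0]);
    }
}
```

If the parser and the code that attached the session run on different threads, SessionState.get() returns null and the conf-based fallback path (and its null check) is taken instead, which matches the reported stack trace.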
[jira] [Commented] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367327#comment-17367327 ] shizhengchao commented on FLINK-23096: -- This is the location in the code where I found the exception:
{code:java}
// code placeholder
// @see org.apache.flink.table.planner.delegation.hive.HiveParser
private void clearSessionState(HiveConf hiveConf) {
    SessionState sessionState = SessionState.get();
    if (sessionState != null) {
        try {
            sessionState.close();
            List<Path> toDelete = new ArrayList<>();
            toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
            toDelete.add(SessionState.getLocalSessionPath(hiveConf));
            for (Path path : toDelete) {
                FileSystem fs = path.getFileSystem(hiveConf);
                fs.delete(path, true);
            }
        } catch (IOException e) {
            LOG.warn("Error closing SessionState", e);
        }
    }
}

// @see org.apache.hadoop.hive.ql.session.SessionState
public static Path getHDFSSessionPath(Configuration conf) {
    SessionState ss = SessionState.get();
    if (ss == null) {
        String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
        Preconditions.checkNotNull(sessionPathString,
                "Conf non-local session path expected to be non-null");
        return new Path(sessionPathString);
    }
    Preconditions.checkNotNull(ss.hdfsSessionPath,
            "Non-local session path expected to be non-null");
    return ss.hdfsSessionPath;
}
{code}
[jira] [Updated] (FLINK-23096) HiveParser could not attach the sessionstate of hive
[ https://issues.apache.org/jira/browse/FLINK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-23096: - Description: My sql code is as follows: {code:java} //代码占位符 CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008' ); use catalog hive; set 'table.sql-dialect' = 'hive'; create view if not exists view_test as select cast(goods_id as string) as goods_id, cast(depot_id as string) as depot_id, cast(product_id as string) as product_id, cast(tenant_code as string) as tenant_code from edw.dim_yezi_whse_goods_base_info/*+ OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */; {code} and the exception is as follows: {code:java} //代码占位符 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Conf non-local session path expected to be non-null at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at org.apache.flink.client.cli.CliFrontend$$Lambda$68/330382173.call(Unknown Source) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$69/680712932.run(Unknown Source) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) Caused by: java.lang.NullPointerException: Conf non-local session path expected to be non-null at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:208) at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:669) at org.apache.flink.table.planner.delegation.hive.HiveParser.clearSessionState(HiveParser.java:376) at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:219) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) at com.shizhengchao.io.FlinkSqlStreamingPlatform.callFlinkSql(FlinkSqlStreamingPlatform.java:157) at com.shizhengchao.io.FlinkSqlStreamingPlatform.callCommand(FlinkSqlStreamingPlatform.java:129) at com.shizhengchao.io.FlinkSqlStreamingPlatform.run(FlinkSqlStreamingPlatform.java:91) at com.shizhengchao.io.FlinkSqlStreamingPlatform.main(FlinkSqlStreamingPlatform.java:66) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 
13 common frames omitted {code} My guess is that sessionstate is not set to threadlocal: {code:java} //代码占位符 // @see org.apache.hadoop.hive.ql.session.SessionState.setCurrentSessionState public static void setCurrentSessionState(SessionState startSs) { tss.get().attach(startSs); } {code} was: My sql code is as follows: {code:java} //代码占位符 CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/home/service/upload-job-file/1624269463008' ); use catalog hive; set 'table.sql-dialect' = 'hive'; create view if not exists view_test as select cast(goods_id as string) as goods_id, cast(depot_id as string) as depot_id, cast(product_id as string) as product_id, cast(tenant_code as string) as tenant_code from edw.dim_yezi_whse_goods_base_info/*+ OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */; {code} and the exception is as follows: {code:java} //代码占位符 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Conf non-local session path expected to be non-null at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at
[jira] [Created] (FLINK-23096) HiveParser could not attach the sessionstate of hive
shizhengchao created FLINK-23096: Summary: HiveParser could not attach the sessionstate of hive Key: FLINK-23096 URL: https://issues.apache.org/jira/browse/FLINK-23096 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.13.1 Reporter: shizhengchao My SQL code is as follows:
{code:java}
// code placeholder
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/home/service/upload-job-file/1624269463008'
);
use catalog hive;
set 'table.sql-dialect' = 'hive';
create view if not exists view_test as
select
  cast(goods_id as string) as goods_id,
  cast(depot_id as string) as depot_id,
  cast(product_id as string) as product_id,
  cast(tenant_code as string) as tenant_code
from edw.dim_yezi_whse_goods_base_info /*+ OPTIONS('streaming-source.consume-start-offset'='dayno=20210621') */;
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19446) canal-json has a situation that -U and +U are equal, when updating the null field to be non-null
[ https://issues.apache.org/jira/browse/FLINK-19446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17268342#comment-17268342 ] shizhengchao commented on FLINK-19446: -- [~Balro], I think your patch would work well.
> canal-json has a situation that -U and +U are equal, when updating the null field to be non-null
>
> Key: FLINK-19446
> URL: https://issues.apache.org/jira/browse/FLINK-19446
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.11.1
> Reporter: shizhengchao
> Assignee: Nicholas Jiang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
> line 118 in CanalJsonDeserializationSchema#deserialize method:
> {code:java}
> GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
> GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
> for (int f = 0; f < fieldCount; f++) {
>     if (before.isNullAt(f)) {
>         // not null fields in "old" (before) means the fields are changed
>         // null/empty fields in "old" (before) means the fields are not changed
>         // so we just copy the not changed fields into before
>         before.setField(f, after.getField(f));
>     }
> }
> before.setRowKind(RowKind.UPDATE_BEFORE);
> after.setRowKind(RowKind.UPDATE_AFTER);
> {code}
> If a field is null before the update, it will cause -U and +U to be equal.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
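The field-copy logic quoted in the issue can be modeled with plain arrays: canal-json uses null in the "old" row to mean "unchanged", so a field that genuinely was NULL and is updated to a non-null value gets the after-value copied over it, making -U and +U identical. A minimal sketch with no Flink dependencies (the arrays stand in for GenericRowData):

```java
import java.util.Arrays;

public class CanalOldFieldCopy {
    public static void main(String[] args) {
        // "old" (before) row from canal-json: null means "unchanged" -- but a field
        // that really was NULL and is updated to non-null is indistinguishable
        Object[] before = {null, "host-a"};  // first field was genuinely NULL before the update
        Object[] after  = {"msg-1", "host-a"};

        // Mirrors the quoted loop: copy after-fields over null before-fields
        for (int f = 0; f < before.length; f++) {
            if (before[f] == null) {
                before[f] = after[f];
            }
        }

        // -U and +U now carry the same values, losing the NULL -> non-null change
        System.out.println("before equals after: " + Arrays.equals(before, after));
    }
}
```

This is exactly the ambiguity the issue describes: the format overloads null as both "no change" and "was NULL", and the copy step cannot tell the two cases apart.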
[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267996#comment-17267996 ] shizhengchao commented on FLINK-20995: -- Did you submit the task through the web interface ? > Calling execute() on org.apache.flink.table.api.Table Throws Exception > -- > > Key: FLINK-20995 > URL: https://issues.apache.org/jira/browse/FLINK-20995 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: Flink version 1.12.0 > Kubernetes Standalone Cluster (Session Mode) > uname -a > Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov > 8 23:39:32 UTC 2018 x86_64 GNU/Linux >Reporter: Robert Cullen >Priority: Blocker > Fix For: 1.13.0 > > > Exception on this line: > {code:java} > try (CloseableIterator iterator = log_counts.execute().collect()) { > ... > {code} > Here's the code snippet: (See Stack Trace below) > {code:java} > ... > > final EnvironmentSettings settings = > EnvironmentSettings.newInstance().inStreamingMode().build(); > final TableEnvironment tEnv = TableEnvironment.create(settings); > String ddl = "CREATE TABLE log_counts (\n" + > " msg_id STRING,\n" + > " hostname STRING,\n" + > " last_updated TIMESTAMP(3),\n" + > " WATERMARK FOR last_updated AS last_updated - INTERVAL '5' > SECOND\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = > 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" + > " 'connector.table' = 'chi_logger_intake',\n" + > " 'connector.driver' = 'org.postgresql.Driver',\n" + > " 'connector.username' = 'user',\n" + > " 'connector.password' = 'password'\n" + > ")"; > tEnv.executeSql(ddl); > Table log_counts = tEnv.from("log_counts") > .filter($("hostname").isNotNull() > .and($("hostname").isNotEqual(""))) > .window(Tumble > .over(lit(5).minutes()) > .on($("last_updated")).as("w")) > .groupBy($("msg_id"), $("hostname"), $("w")) > .select($("msg_id"), > 
$("hostname"), > $("msg_id").count().as("cnt")); > try (CloseableIterator iterator = > log_counts.execute().collect()) { > final List materializedUpdates = new ArrayList<>(); > iterator.forEachRemaining( > row -> { > final RowKind kind = row.getKind(); > switch (kind) { > case INSERT: > case UPDATE_AFTER: > row.setKind(RowKind.INSERT); // for full > equality > materializedUpdates.add(row); > break; > case UPDATE_BEFORE: > case DELETE: > row.setKind(RowKind.INSERT); // for full > equality > materializedUpdates.remove(row); > break; > } > }); > // show the final output table if the result is bounded, > // the output should exclude San Antonio because it has a smaller > population than > // Houston or Dallas in Texas (TX) > materializedUpdates.forEach(System.out::println); > }{code} > > Stack Trace: > {code:java} > 2021-01-15 16:52:00,628 WARN > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring > the job submission via query parameters is deprecated. Please migrate to > submitting a JSON request instead. > 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - > Starting program (detached: true) > 2021-01-15 16:52:00,678 INFO > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted. > 2021-01-15 16:52:00,678 INFO > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1. > 2021-01-15 16:52:00,830 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received > JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect). > 2021-01-15 16:52:00,830 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job > 84c9f12fe943bc7f32ee637666ed3bc1 (collect). >
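The RowKind switch in the snippet above materializes a changelog stream into a list: rows are added on INSERT/UPDATE_AFTER and retracted on UPDATE_BEFORE/DELETE. A self-contained sketch of that pattern (RowKind and the string rows here are local stand-ins, not Flink's classes):

```java
import java.util.ArrayList;
import java.util.List;

public class ChangelogMaterializer {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Mirrors the switch in the quoted snippet: maintain a materialized view of a changelog
    static void apply(List<String> materialized, RowKind kind, String row) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                materialized.add(row);
                break;
            case UPDATE_BEFORE:
            case DELETE:
                materialized.remove(row);
                break;
        }
    }

    public static void main(String[] args) {
        List<String> materialized = new ArrayList<>();
        apply(materialized, RowKind.INSERT, "host-a|3");
        apply(materialized, RowKind.UPDATE_BEFORE, "host-a|3"); // retract the old version
        apply(materialized, RowKind.UPDATE_AFTER, "host-a|4");  // add the new version
        System.out.println(materialized);
    }
}
```

This only demonstrates the materialization idea; the exception in the report itself comes from the job-submission path, not from this loop.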
[jira] [Commented] (FLINK-20660) Time window operator with computed column triggers an exception in batch mode
[ https://issues.apache.org/jira/browse/FLINK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251616#comment-17251616 ] shizhengchao commented on FLINK-20660: -- [~sujun1020], I think the problem is that the longToTimestamp method cannot convert a long to TIMESTAMP(3); you can try it this way:
> Time window operator with computed column triggers an exception in batch mode
>
> Key: FLINK-20660
> URL: https://issues.apache.org/jira/browse/FLINK-20660
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0
> Reporter: sujun
> Priority: Minor
>
> {{Time window operator with computed column triggers an exception in batch mode; it may be a bug in BatchExecWindowAggregateRule.}}
> {{My test code:}}
> {code:java}
> public class WindowAggWithBigintTest {
>     public static void main(String[] args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>         tEnv.registerFunction("longToTimestamp", new LongToTimestamp());
>         String ddl = "CREATE TABLE source(occur_time bigint, rowtime AS longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', 'format' = 'orc', 'path' = '/path/to/orc')";
>         tEnv.executeSql(ddl);
>         Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, INTERVAL '1' HOUR) as ts, count(1) as ct from source group by TUMBLE(rowtime, INTERVAL '1' HOUR)");
>         DiscardingOutputFormat outputFormat = new DiscardingOutputFormat();
>         TableResultSink tableResultSink = new TableResultSink(table.getSchema(), outputFormat);
>         tEnv.registerTableSink("sink", tableResultSink);
>         table.insertInto("sink");
>         tEnv.execute("test");
>     }
>
>     private static class TableResultSink implements StreamTableSink {
>         private final TableSchema schema;
>         private final DataType rowType;
>         private final OutputFormat outputFormat;
>         TableResultSink(TableSchema schema, OutputFormat
> outputFormat) { > this.schema = schema; > this.rowType = schema.toRowDataType(); > this.outputFormat = outputFormat; > } > @Override > public DataType getConsumedDataType() { > return rowType; > } > @Override > public TableSchema getTableSchema() { > return schema; > } > @Override > public TableSink configure(String[] fieldNames, > TypeInformation[] fieldTypes) { > throw new UnsupportedOperationException( > "This sink is configured by passing a static > schema when initiating"); > } > @Override > public DataStreamSink consumeDataStream(DataStream > dataStream) { > return > dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult"); > } > } > } > {code} > > Exception: > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule BatchExecWindowAggregateRule, args > [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$, > $f0, 360),properties=w$start, w$end, w$rowtime), > rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, > default_database, source, source: [FileSystemTableSource(occur_time, > rowtime)]],fields=occur_time, rowtime)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at
[jira] [Comment Edited] (FLINK-20660) Time window operator with computed column triggers an exception in batch mode
[ https://issues.apache.org/jira/browse/FLINK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251616#comment-17251616 ] shizhengchao edited comment on FLINK-20660 at 12/18/20, 9:08 AM: - [~sujun1020], i think the problem is the longToTimestamp method can not convert a long to timestamp(3), so you can try use this way: {code:java} `rowtime AS CAST(occur_time AS TIMESTAMP(3))` {code} was (Author: tinny): [~sujun1020], i think the problem is the longToTimestamp method can not convert a long to timestamp(3), so you can try use this way: > Time window operator with computed column triggers an exception in batch mode > - > > Key: FLINK-20660 > URL: https://issues.apache.org/jira/browse/FLINK-20660 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: sujun >Priority: Minor > > {{Time window operator with computed column triggers an exception in batch > mode, it may be a bug in BatchExecWindowAggregateRule.}} > {{My test code:}} > {code:java} > public class WindowAggWithBigintTest { > public static void main(String[] args) throws Exception { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tEnv = TableEnvironment.create(settings); > > tEnv.registerFunction("longToTimestamp",new LongToTimestamp()); > > String ddl = "CREATE TABLE source(occur_time bigint,rowtime AS > longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', 'format' = > 'orc', 'path' = '/path/to/orc')"; > tEnv.executeSql(ddl); > Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime, > INTERVAL '1' HOUR) as ts,count(1) as ct from source group by TUMBLE(rowtime, > INTERVAL '1' HOUR)"); > DiscardingOutputFormat outputFormat = new > DiscardingOutputFormat(); > TableResultSink tableResultSink = new > TableResultSink(table.getSchema(), outputFormat); > tEnv.registerTableSink("sink",tableResultSink); > table.insertInto("sink"); > 
tEnv.execute("test"); > } > private static class TableResultSink implements StreamTableSink > { > private final TableSchema schema; > private final DataType rowType; > private final OutputFormat outputFormat; > > TableResultSink(TableSchema schema, OutputFormat > outputFormat) { > this.schema = schema; > this.rowType = schema.toRowDataType(); > this.outputFormat = outputFormat; > } > @Override > public DataType getConsumedDataType() { > return rowType; > } > @Override > public TableSchema getTableSchema() { > return schema; > } > @Override > public TableSink configure(String[] fieldNames, > TypeInformation[] fieldTypes) { > throw new UnsupportedOperationException( > "This sink is configured by passing a static > schema when initiating"); > } > @Override > public DataStreamSink consumeDataStream(DataStream > dataStream) { > return > dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult"); > } > } > } > {code} > > Exception: > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule BatchExecWindowAggregateRule, args > [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$, > $f0, 360),properties=w$start, w$end, w$rowtime), > rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, > default_database, source, source: [FileSystemTableSource(occur_time, > rowtime)]],fields=occur_time, rowtime)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at >
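The workaround suggested above replaces the UDF with a built-in cast from the BIGINT column to TIMESTAMP(3). As a point of reference, the conversion such a cast would need to perform can be sketched in plain Java with no Flink dependency. This is a hedged illustration only: the class and method names (LongToTimestampDemo, longToTimestamp) are hypothetical, and it assumes occur_time holds epoch milliseconds, which the issue does not actually state.

```java
import java.sql.Timestamp;
import java.time.Instant;

public class LongToTimestampDemo {

    // Illustrative helper (not Flink API): interpret a BIGINT as epoch
    // milliseconds and turn it into a millisecond-precision timestamp,
    // i.e. the moral equivalent of TIMESTAMP(3).
    static Timestamp longToTimestamp(long epochMillis) {
        return Timestamp.from(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        Timestamp ts = longToTimestamp(0L);
        // Round-trips back to the same epoch-millisecond value.
        System.out.println(ts.getTime());
    }
}
```

The point of the suggestion is that the planner understands the built-in cast's return type, whereas the custom longToTimestamp UDF apparently did not produce a type the batch window rule could consume.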
[jira] [Created] (FLINK-20607) Wrong example in the UDFs documentation page
shizhengchao created FLINK-20607:
------------------------------------

             Summary: Wrong example in the UDFs documentation page
                 Key: FLINK-20607
                 URL: https://issues.apache.org/jira/browse/FLINK-20607
             Project: Flink
          Issue Type: Bug
          Components: Documentation
    Affects Versions: 1.11.1, 1.12.0
            Reporter: shizhengchao

The example demonstrating multiple input types in a FunctionHint is wrong. Java annotation array elements are enclosed in braces, not square brackets:

{code:java}
@FunctionHint(
    input = [@DataTypeHint("INT"), @DataTypeHint("INT")],
    output = @DataTypeHint("INT")
)
{code}

should be

{code:java}
@FunctionHint(
    input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
    output = @DataTypeHint("INT")
)
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
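The brace-vs-bracket distinction above is a Java language rule, not anything Flink-specific: annotation array elements must use `{...}`, and `[...]` simply does not compile. A minimal self-contained sketch (the Hint annotation and Target class below are stand-ins invented for illustration, not Flink's @FunctionHint):

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;

public class AnnotationArrayDemo {

    // Stand-in for an annotation with an array-typed element,
    // analogous in shape to @FunctionHint's input element.
    @Retention(RetentionPolicy.RUNTIME)
    @interface Hint {
        String[] input();
        String output();
    }

    // input = {...} compiles; input = [...] would be a syntax error.
    @Hint(input = {"INT", "INT"}, output = "INT")
    static class Target {}

    public static void main(String[] args) {
        Hint h = Target.class.getAnnotation(Hint.class);
        System.out.println(Arrays.toString(h.input()) + " -> " + h.output());
    }
}
```

Running this prints `[INT, INT] -> INT`, confirming the braces form is the one the compiler accepts.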
[jira] [Updated] (FLINK-20570) The `NOTE` tip style is different from the others on the process_function page.
[ https://issues.apache.org/jira/browse/FLINK-20570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-20570: - Description: in `/docs/stream/operators/process_function.md`, line 252. The `NOTE` css style is different from the others. {code:java} current is: **NOTE:** and another style is : Note {code} beside: `{% top %}` appears in the middle,need to remove it was: in `/docs/stream/operators/process_function.md`, line 252. The `NOTE` css style is different from the others. {code:java} current is: **NOTE:** and another style is : Note {code} > The `NOTE` tip style is different from the others in process_function page. > --- > > Key: FLINK-20570 > URL: https://issues.apache.org/jira/browse/FLINK-20570 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Documentation >Affects Versions: 1.12.0 >Reporter: shizhengchao >Priority: Minor > Labels: pull-request-available > > in `/docs/stream/operators/process_function.md`, line 252. The `NOTE` css > style is different from the others. > {code:java} > current is: **NOTE:** > and another style is : Note > {code} > beside: > `{% top %}` appears in the middle,need to remove it -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20570) In `/docs/stream/operators/process_function.md`, line 252, the `NOTE` tip CSS style is different from the others.
shizhengchao created FLINK-20570:
------------------------------------

             Summary: In `/docs/stream/operators/process_function.md`, line 252, the `NOTE` tip CSS style is different from the others.
                 Key: FLINK-20570
                 URL: https://issues.apache.org/jira/browse/FLINK-20570
             Project: Flink
          Issue Type: Improvement
          Components: Documentation
    Affects Versions: 1.12.0
            Reporter: shizhengchao

In `/docs/stream/operators/process_function.md`, line 252, the `NOTE` CSS style is different from the others:

{code:java}
current is: **NOTE:**
and another style is : Note
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20200) SQL Hints are not supported in "Create View" syntax
shizhengchao created FLINK-20200: Summary: SQL Hints are not supported in "Create View" syntax Key: FLINK-20200 URL: https://issues.apache.org/jira/browse/FLINK-20200 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.11.2 Reporter: shizhengchao I have aready set the config option `table.dynamic-table-options.enabled` to be true, but "SQL Hints" are not supported in View syntax. I got an error: {code:java} Exception in thread "main" java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL at org.apache.calcite.util.Util.needToImplement(Util.java:967) at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116) at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:333) at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470) at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101) at org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176) at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:470) at org.apache.calcite.sql.SqlSelect.unparse(SqlSelect.java:246) at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:151) at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:173) at org.apache.calcite.sql.SqlNode.toSqlString(SqlNode.java:182) at org.apache.flink.table.planner.operations.SqlToOperationConverter.getQuotedSqlString(SqlToOperationConverter.java:784) at org.apache.flink.table.planner.utils.Expander$Expanded.substitute(Expander.java:169) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:694) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) {code} The sql code is as follows: {code:java} drop table if exists SourceA; create table SourceA ( idstring, name string ) with ( 'connector' = 'kafka-0.11', 'topic' = 'MyTopic', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'Test', 'scan.startup.mode' = 'group-offsets', 'format' = 'csv' ); drop table if exists print; create table print ( idstring, name string ) with ( 'connector' = 'print' ); drop view if exists test_view; create view test_view as select * from SourceA /*+ OPTIONS('properties.group.id'='NewGroup') */; insert into print select * from test_view; {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19723) The retry mechanism of the JDBC connector has a risk of data duplication
[ https://issues.apache.org/jira/browse/FLINK-19723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222726#comment-17222726 ] shizhengchao commented on FLINK-19723: -- Hi [~jark], after testing, this is not a bug, could you close the issue?:D Generally, after executing executeBatch in jdbc, regardless of success or failure, the list of preparedStatements will be cleared > The retry mechanism of jdbc connector has the risk of data duplication > -- > > Key: FLINK-19723 > URL: https://issues.apache.org/jira/browse/FLINK-19723 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.11.1 >Reporter: shizhengchao >Assignee: shizhengchao >Priority: Major > Labels: pull-request-available > > for example, if statement.executeBatch() failur for some reason, but the > "statement" was not closed in the retry, there is a risk of data duplication: > {code:java} > for (int i = 1; i <= executionOptions.getMaxRetries(); i++) { > try { > attemptFlush(); > batchCount = 0; > break; > } catch (SQLException e) { > LOG.error("JDBC executeBatch error, retry times = {}", i, e); > if (i >= executionOptions.getMaxRetries()) { > throw new IOException(e); > } > try { > if > (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) { > connection = > connectionProvider.reestablishConnection(); > jdbcStatementExecutor.closeStatements(); > > jdbcStatementExecutor.prepareStatements(connection); > } > } catch (Exception excpetion) { > LOG.error("JDBC connection is not valid, and > reestablish connection failed.", excpetion); > throw new IOException("Reestablish JDBC connection > failed", excpetion); > } > try { > Thread.sleep(1000 * i); > } catch (InterruptedException ex) { > Thread.currentThread().interrupt(); > throw new IOException("unable to flush; interrupted > while doing another attempt", e); > } > } > } > {code} > the correct code should be: > {code:java} > try { > if > (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) { > 
connection = > connectionProvider.reestablishConnection(); > } > jdbcStatementExecutor.closeStatements(); > jdbcStatementExecutor.prepareStatements(connection); > } catch (Exception excpetion) { > LOG.error("JDBC connection is not valid, and > reestablish connection failed.", excpetion); > throw new IOException("Reestablish JDBC connection > failed", excpetion); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
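The fix proposed above moves the close-and-re-prepare of the statements outside the connection-validity check, so that every failed attempt gets a freshly prepared statement before the next retry. That pattern can be sketched generically in plain Java; this is an illustrative stand-in (the Attempt interface, withRetries method, and the simulated flaky operation are all invented for the sketch), not Flink's actual JdbcOutputFormat code.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryDemo {

    /** One attempt at the batched flush; may fail. */
    interface Attempt {
        void run(int attemptNo) throws Exception;
    }

    // Generic retry loop: on failure, always run `reset` (the analogue of
    // closing and re-preparing the JDBC statements) before retrying, so no
    // stale batched data can be re-submitted by the next attempt.
    static void withRetries(int maxRetries, Attempt attempt, Runnable reset) throws IOException {
        for (int i = 1; i <= maxRetries; i++) {
            try {
                attempt.run(i);
                return; // success, stop retrying
            } catch (Exception e) {
                if (i >= maxRetries) {
                    throw new IOException("giving up after " + i + " attempts", e);
                }
                reset.run(); // tear down and recreate resources before retry
            }
        }
    }

    public static void main(String[] args) throws IOException {
        AtomicInteger calls = new AtomicInteger();
        AtomicInteger resets = new AtomicInteger();
        // Simulated flaky flush: fails twice, then succeeds.
        withRetries(3,
                n -> { if (calls.incrementAndGet() < 3) throw new Exception("flaky flush"); },
                resets::incrementAndGet);
        System.out.println(calls.get() + " attempts, " + resets.get() + " resets");
    }
}
```

Note the commenter's later observation: JDBC drivers typically clear the batch after executeBatch() regardless of outcome, which is why the duplication risk turned out not to materialize in practice.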
[jira] [Updated] (FLINK-19818) ArrayIndexOutOfBoundsException occurs when the source table has nested JSON
[ https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19818: - Description: I get an *ArrayIndexOutOfBoundsException* , when my table source have nest json. as the follows is my test: {code:sql} CREATE TABLE Orders ( nest ROW< idBIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING >, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'Orders', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE print ( orderId BIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING ) WITH ( 'connector' = 'print' ); CREATE VIEW testView AS SELECT id, consumerName, price, productName FROM ( SELECT * FROM Orders ); INSERT INTO print SELECT * FROM testView; {code} The following is the exception of flink: {code} Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:422) at java.util.ArrayList.get(ArrayList.java:435) at org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73) at org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239) at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207) at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133) at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77) {code} was: I get an *ArrayIndexOutOfBoundsException* , when my table source have nest json. as the follows is my test: {code:sql} CREATE TABLE Orders ( nest ROW< idBIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING >, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'Orders', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE print ( orderId BIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING ) WITH ( 'connector' = 'print' ); CREATE VIEW testView AS SELECT id, consumerName, price, productName FROM ( SELECT * FROM Orders ); INSERT INTO print SELECT * FROM testView; {code} The following is the exception of flink: {code} Unable to find source-code formatter for language: log. 
Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yamlException in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:422) at java.util.ArrayList.get(ArrayList.java:435) at org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73) at org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375) at
[jira] [Updated] (FLINK-19818) ArrayIndexOutOfBoundsException occurs when the source table has nested JSON
[ https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19818: - Description: I get an *ArrayIndexOutOfBoundsException* , when my table source have nest json. as the follows is my test: {code:sql} CREATE TABLE Orders ( nest ROW< idBIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING >, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'Orders', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE print ( orderId BIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING ) WITH ( 'connector' = 'print' ); CREATE VIEW testView AS SELECT id, consumerName, price, productName FROM ( SELECT * FROM Orders ); INSERT INTO print SELECT * FROM testView; {code} The following is the exception of flink: {code} Unable to find source-code formatter for language: log. 
Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yamlException in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:422) at java.util.ArrayList.get(ArrayList.java:435) at org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73) at org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at 
com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239) at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207) at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133) at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77) {code} was: I get an *ArrayIndexOutOfBoundsException* in *Interval Joins*, when my table source have nest json. as the follows is my test: {code:sql} CREATE TABLE Orders ( nest ROW< idBIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING >, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'Orders', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); DROP TABLE IF EXISTS Shipments; CREATE TABLE Shipments ( idBIGINT, orderId BIGINT, originSTRING, destnationSTRING, isArrived BOOLEAN, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'Shipments', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); DROP TABLE IF EXISTS print; CREATE TABLE print ( orderId BIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING, originSTRING, destnationSTRING, isArrived BOOLEAN ) WITH ( 'connector' = 'print' ); DROP VIEW IF EXISTS IntervalJoinView; CREATE VIEW IntervalJoinView AS SELECT o.id, o.consumerName, o.price, o.productName,
[jira] [Updated] (FLINK-19818) ArrayIndexOutOfBoundsException occurs when the source table has nested JSON
[ https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19818: - Summary: ArrayIndexOutOfBoundsException occus when the source table have nest json (was: ArrayIndexOutOfBoundsException occus in 'Interval Joins' when the source table have nest json) > ArrayIndexOutOfBoundsException occus when the source table have nest json > -- > > Key: FLINK-19818 > URL: https://issues.apache.org/jira/browse/FLINK-19818 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Major > > I get an *ArrayIndexOutOfBoundsException* in *Interval Joins*, when my table > source have nest json. as the follows is my test: > {code:sql} > CREATE TABLE Orders ( > nest ROW< > idBIGINT, > consumerName STRING, > price DECIMAL(10, 5), > productName STRING > >, > proctime AS PROCTIME() > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = 'Orders', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > DROP TABLE IF EXISTS Shipments; > CREATE TABLE Shipments ( > idBIGINT, > orderId BIGINT, > originSTRING, > destnationSTRING, > isArrived BOOLEAN, > proctime AS PROCTIME() > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = 'Shipments', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > DROP TABLE IF EXISTS print; > CREATE TABLE print ( > orderId BIGINT, > consumerName STRING, > price DECIMAL(10, 5), > productName STRING, > originSTRING, > destnationSTRING, > isArrived BOOLEAN > ) WITH ( > 'connector' = 'print' > ); > DROP VIEW IF EXISTS IntervalJoinView; > CREATE VIEW IntervalJoinView AS > SELECT > o.id, > o.consumerName, > o.price, > o.productName, > s.origin, > s.destnation, > s.isArrived > FROM > (SELECT * FROM Orders) o, > 
(SELECT * FROM Shipments) s > WHERE s.orderId = o.id AND o.proctime BETWEEN s.proctime - INTERVAL '4' HOUR > AND s.proctime; > INSERT INTO print > SELECT > id, > consumerName, > price, > productName, > origin, > destnation, > isArrived > FROM IntervalJoinView; > {code} > The following is the exception of flink: > {code:log} > Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 > at java.util.ArrayList.elementData(ArrayList.java:422) > at java.util.ArrayList.get(ArrayList.java:435) > at > org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73) > at > org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) > at > com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239) > at > com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207) > at > com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133) > at > com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77) > {code}
[jira] [Commented] (FLINK-19818) ArrayIndexOutOfBoundsException occurs in 'Interval Joins' when the source table has nested JSON
[ https://issues.apache.org/jira/browse/FLINK-19818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221118#comment-17221118 ] shizhengchao commented on FLINK-19818: -- After my test, only `select * from Orders` will report an error, while use `select id, consumerName, price, productName, proctime from Orders` will be OK > ArrayIndexOutOfBoundsException occus in 'Interval Joins' when the source > table have nest json > - > > Key: FLINK-19818 > URL: https://issues.apache.org/jira/browse/FLINK-19818 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Major > > I get an *ArrayIndexOutOfBoundsException* in *Interval Joins*, when my table > source have nest json. as the follows is my test: > {code:sql} > CREATE TABLE Orders ( > nest ROW< > idBIGINT, > consumerName STRING, > price DECIMAL(10, 5), > productName STRING > >, > proctime AS PROCTIME() > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = 'Orders', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > DROP TABLE IF EXISTS Shipments; > CREATE TABLE Shipments ( > idBIGINT, > orderId BIGINT, > originSTRING, > destnationSTRING, > isArrived BOOLEAN, > proctime AS PROCTIME() > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = 'Shipments', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'json' > ); > DROP TABLE IF EXISTS print; > CREATE TABLE print ( > orderId BIGINT, > consumerName STRING, > price DECIMAL(10, 5), > productName STRING, > originSTRING, > destnationSTRING, > isArrived BOOLEAN > ) WITH ( > 'connector' = 'print' > ); > DROP VIEW IF EXISTS IntervalJoinView; > CREATE VIEW IntervalJoinView AS > SELECT > o.id, > o.consumerName, > o.price, > o.productName, > s.origin, > s.destnation, > s.isArrived > FROM > 
(SELECT * FROM Orders) o, > (SELECT * FROM Shipments) s > WHERE s.orderId = o.id AND o.proctime BETWEEN s.proctime - INTERVAL '4' HOUR > AND s.proctime; > INSERT INTO print > SELECT > id, > consumerName, > price, > productName, > origin, > destnation, > isArrived > FROM IntervalJoinView; > {code} > The following is the exception of flink: > {code:log} > Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 > at java.util.ArrayList.elementData(ArrayList.java:422) > at java.util.ArrayList.get(ArrayList.java:435) > at > org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73) > at > org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228) > at > 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) > at > com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239) > at > com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207) > at > com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133) > at >
[jira] [Created] (FLINK-19818) ArrayIndexOutOfBoundsException occurs in 'Interval Joins' when the source table has nested JSON
shizhengchao created FLINK-19818: Summary: ArrayIndexOutOfBoundsException occus in 'Interval Joins' when the source table have nest json Key: FLINK-19818 URL: https://issues.apache.org/jira/browse/FLINK-19818 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.11.2 Reporter: shizhengchao I get an *ArrayIndexOutOfBoundsException* in *Interval Joins*, when my table source have nest json. as the follows is my test: {code:sql} CREATE TABLE Orders ( nest ROW< idBIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING >, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'Orders', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); DROP TABLE IF EXISTS Shipments; CREATE TABLE Shipments ( idBIGINT, orderId BIGINT, originSTRING, destnationSTRING, isArrived BOOLEAN, proctime AS PROCTIME() ) WITH ( 'connector' = 'kafka-0.11', 'topic' = 'Shipments', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); DROP TABLE IF EXISTS print; CREATE TABLE print ( orderId BIGINT, consumerName STRING, price DECIMAL(10, 5), productName STRING, originSTRING, destnationSTRING, isArrived BOOLEAN ) WITH ( 'connector' = 'print' ); DROP VIEW IF EXISTS IntervalJoinView; CREATE VIEW IntervalJoinView AS SELECT o.id, o.consumerName, o.price, o.productName, s.origin, s.destnation, s.isArrived FROM (SELECT * FROM Orders) o, (SELECT * FROM Shipments) s WHERE s.orderId = o.id AND o.proctime BETWEEN s.proctime - INTERVAL '4' HOUR AND s.proctime; INSERT INTO print SELECT id, consumerName, price, productName, origin, destnation, isArrived FROM IntervalJoinView; {code} The following is the exception of flink: {code:log} Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:422) at 
java.util.ArrayList.get(ArrayList.java:435) at org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73) at org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239) at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207) at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133) at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19629) Avro format causes NullPointerException when a MAP type's value is null
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218199#comment-17218199 ] shizhengchao commented on FLINK-19629: -- Hi [~jark], can you help me review the code? Thank you very much. > Avro format causes NullPointerException when a MAP type's value is null > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: shizhengchao >Assignee: shizhengchao >Priority: Critical > Labels: pull-request-available > Fix For: 1.12.0 > > > create table tableA ( > name STRING, > hobly MAP, > phone STRING > ) with ( > 'connector' = 'kafka-0.11', > 'topic' = 'ShizcTest', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'ShizcTest', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'avro' > ); > if hobly has a null value like this: > {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"} > it causes a NullPointerException: > {code:java} > java.io.IOException: Failed to deserialize Avro record. 
> at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > Caused by: java.lang.NullPointerException: null > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148) > ... 8 common frames omitted > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
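The NullPointerException above has the classic shape of a converter chain with a missing null guard for map values. The general fix pattern can be sketched outside of Flink; `NullableConverter` and its method names below are hypothetical illustrations, not Flink API:

```java
import java.util.function.Function;

public class NullableConverter {
    // Wrap a converter so that a null input short-circuits to null
    // instead of being passed into the inner conversion (and throwing an NPE).
    static <T, R> Function<T, R> nullable(Function<T, R> inner) {
        return value -> value == null ? null : inner.apply(value);
    }

    public static void main(String[] args) {
        Function<Object, String> toString = nullable(Object::toString);
        System.out.println(toString.apply(42));   // "42"
        System.out.println(toString.apply(null)); // null, not an NPE
    }
}
```

Applying this wrapper to the map-value converter is the idea behind the fix: a `{"key1": null}` entry then deserializes to a null map value rather than crashing the whole record.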
[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View
[ https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217535#comment-17217535 ] shizhengchao commented on FLINK-19735: -- Hi [~jark], my Flink version is 1.11.1. I found that this problem has been resolved in version 1.11.2; see FLINK-18750 > TableFunction can not work in Flink View > > > Key: FLINK-19735 > URL: https://issues.apache.org/jira/browse/FLINK-19735 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: shizhengchao >Priority: Major > > TableFunction doesn't work in Flink SQL. Here is my code: > {code:sql} > CREATE TABLE test ( > myField STRING, > name STRING > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = '', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'mygroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'csv' > ); > CREATE TABLE print ( > myField STRING, > newWord STRING, > newLength INT > ) WITH ( > 'connector' = 'print' > ); > CREATE VIEW test_view AS > SELECT myField, newWord, newLength FROM test, LATERAL > TABLE(SplitFunction(myField)); > INSERT INTO print > SELECT * FROM test_view; > {code} > And the function code is as follows: > {code:java} > @FunctionHint(output = @DataTypeHint("ROW")) > public class SplitFunction extends TableFunction { > public void eval(String str) { > for (String s : str.split(" ")) { > collect(Row.of(s, s.length())); > } > } > } > {code} > Running the SQL causes an error: > {code} > Exception in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 2, column 17 to line 2, column 23: Column 'newWord' not found in any > table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) > at > com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208) > at > com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200) > at > com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129) > at > com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, > column 17 to line 2, column 23: Column 'newWord' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574) >
[jira] [Updated] (FLINK-19735) TableFunction can not work in Flink View
[ https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19735: - Description: TableFunction doesn't work in Flink SQL. Here is my code: {code:sql} CREATE TABLE test ( myField STRING, name STRING ) WITH ( 'connector' = 'kafka-0.11', 'topic' = '', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'mygroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv' ); CREATE TABLE print ( myField STRING, newWord STRING, newLength INT ) WITH ( 'connector' = 'print' ); CREATE VIEW test_view AS SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField)); INSERT INTO print SELECT * FROM test_view; {code} And the function code is as follows: {code:java} @FunctionHint(output = @DataTypeHint("ROW")) public class SplitFunction extends TableFunction { public void eval(String str) { for (String s : str.split(" ")) { collect(Row.of(s, s.length())); } } } {code} Running the SQL causes an error: {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. 
From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) at com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208) at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200) at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129) at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at
[jira] [Commented] (FLINK-19735) TableFunction can not work in Flink View
[ https://issues.apache.org/jira/browse/FLINK-19735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217527#comment-17217527 ] shizhengchao commented on FLINK-19735: -- Hi [~jark], currently TableFunction only takes effect in "INSERT INTO" statements > TableFunction can not work in Flink View > > > Key: FLINK-19735 > URL: https://issues.apache.org/jira/browse/FLINK-19735 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.1 >Reporter: shizhengchao >Priority: Major > > TableFunction doesn't work in Flink SQL. Here is my code: > {code:sql} > CREATE TABLE test ( > myField STRING, > name STRING > ) WITH ( > 'connector' = 'kafka-0.11', > 'topic' = '', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'mygroup', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'csv' > ); > CREATE TABLE print ( > myField STRING, > newWord STRING, > newLength INT > ) WITH ( > 'connector' = 'print' > ); > CREATE VIEW test_view AS > SELECT myField, newWord, newLength FROM test, LATERAL > TABLE(SplitFunction(myField)); > INSERT INTO print > SELECT * FROM test_view; > {code} > And the function code is as follows: > {code:java} > @FunctionHint(output = @DataTypeHint("ROW")) > public class SplitFunction extends TableFunction { > public void eval(String str) { > for (String s : str.split(" ")) { > collect(Row.of(s, s.length())); > } > } > } > {code} > Running the SQL causes an error: > {code} > Exception in thread "main" > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 2, column 17 to line 2, column 23: Column 'newWord' not found in any > table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) > at > org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) > at > com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208) > at > com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200) > at > com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129) > at > com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, > column 17 to line 2, column 23: Column 'newWord' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574) > at >
[jira] [Created] (FLINK-19735) TableFunction can not work in Flink View
shizhengchao created FLINK-19735: Summary: TableFunction can not work in Flink View Key: FLINK-19735 URL: https://issues.apache.org/jira/browse/FLINK-19735 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.1 Reporter: shizhengchao TableFunction doesn't work in Flink SQL. Here is my code: {code:sql} CREATE TABLE test ( myField STRING, name STRING ) WITH ( 'connector' = 'kafka-0.11', 'topic' = '', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'mygroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv' ); CREATE TABLE print ( myField STRING, newWord STRING, newLength INT ) WITH ( 'connector' = 'print' ); CREATE VIEW test_view AS SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField)); INSERT INTO print SELECT * FROM test_view; {code} And the function code is as follows: {code:java} @FunctionHint(output = @DataTypeHint("ROW")) public class SplitFunction extends TableFunction { public void eval(String str) { for (String s : str.split(" ")) { collect(Row.of(s, s.length())); } } } {code} Running the SQL causes an error: {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. 
From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52) at com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208) at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200) at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129) at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at
[jira] [Created] (FLINK-19723) The retry mechanism of the JDBC connector has the risk of data duplication
shizhengchao created FLINK-19723: Summary: The retry mechanism of the JDBC connector has the risk of data duplication Key: FLINK-19723 URL: https://issues.apache.org/jira/browse/FLINK-19723 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: 1.11.1 Reporter: shizhengchao For example, if statement.executeBatch() fails for some reason but the "statement" is not closed before the retry, there is a risk of data duplication: {code:java} for (int i = 1; i <= executionOptions.getMaxRetries(); i++) { try { attemptFlush(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= executionOptions.getMaxRetries()) { throw new IOException(e); } try { if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) { connection = connectionProvider.reestablishConnection(); jdbcStatementExecutor.closeStatements(); jdbcStatementExecutor.prepareStatements(connection); } } catch (Exception excpetion) { LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion); throw new IOException("Reestablish JDBC connection failed", excpetion); } try { Thread.sleep(1000 * i); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("unable to flush; interrupted while doing another attempt", e); } } } {code} The correct code should be: {code:java} try { if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) { connection = connectionProvider.reestablishConnection(); } jdbcStatementExecutor.closeStatements(); jdbcStatementExecutor.prepareStatements(connection); } catch (Exception excpetion) { LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion); throw new IOException("Reestablish JDBC connection failed", excpetion); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
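The duplication risk the reporter describes can be sketched with a toy retry loop: if the batch buffered in the statement is not discarded after a failed flush, the next attempt re-executes the old rows on top of the new ones. The classes below (`FlushRetry`, `FakeStatement`) are hypothetical stand-ins for the Flink JDBC internals, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;

public class FlushRetry {
    // Simulated JDBC statement: rows accumulate until executeBatch() succeeds.
    static class FakeStatement {
        List<String> batch = new ArrayList<>();
        int failuresLeft;
        FakeStatement(int failuresLeft) { this.failuresLeft = failuresLeft; }
        void addBatch(String row) { batch.add(row); }
        int executeBatch() throws Exception {
            if (failuresLeft-- > 0) throw new Exception("transient failure");
            int n = batch.size();
            batch.clear();
            return n;
        }
        // The step the buggy path skips when the connection is still "valid".
        void clearBatch() { batch.clear(); }
    }

    static int flushWithRetry(FakeStatement stmt, List<String> rows, int maxRetries) throws Exception {
        int written = 0;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            for (String r : rows) stmt.addBatch(r);
            try {
                written += stmt.executeBatch();
                break;
            } catch (Exception e) {
                if (attempt >= maxRetries) throw e;
                // Always discard the half-flushed batch before retrying,
                // mirroring the issue's proposed fix of unconditionally
                // closing and re-preparing the statements.
                stmt.clearBatch();
            }
        }
        return written;
    }

    public static void main(String[] args) throws Exception {
        FakeStatement stmt = new FakeStatement(1); // fail once, then succeed
        System.out.println(flushWithRetry(stmt, List.of("a", "b"), 3)); // 2
    }
}
```

Without the `clearBatch()` call, the second attempt would execute four rows instead of two, which is exactly the duplication the issue reports.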
[jira] [Commented] (FLINK-18545) SQL API cannot specify Flink job name
[ https://issues.apache.org/jira/browse/FLINK-18545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214576#comment-17214576 ] shizhengchao commented on FLINK-18545: -- How about adding a table option, for example "job.name", and using it in *TableEnvironmentImpl#executeInternal*: {code:java} @Override public TableResult executeInternal(List operations) { List> transformations = translate(operations); List sinkIdentifierNames = extractSinkIdentifierNames(operations); String jobName = "insert-into_" + String.join(",", sinkIdentifierNames); String name = tableConfig.getConfiguration().getString("job.name", jobName); Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, name); ... } {code} > SQL API cannot specify Flink job name > - > > Key: FLINK-18545 > URL: https://issues.apache.org/jira/browse/FLINK-18545 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission, Table SQL / API >Affects Versions: 1.11.0 > Environment: execute sql : > StreamTableEnvironment.executeSql("insert into user_log_sink select user_id, > item_id, category_id, behavior, ts from user_log") > current job name : org.apache.flink.table.api.internal.TableEnvironmentImpl > {code:java} > public TableResult executeInternal(List operations) { > List> transformations = translate(operations); > List sinkIdentifierNames = extractSinkIdentifierNames(operations); > String jobName = "insert-into_" + String.join(",", sinkIdentifierNames); > Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, > jobName); > try { > JobClient jobClient = execEnv.executeAsync(pipeline); > TableSchema.Builder builder = TableSchema.builder(); > Object[] affectedRowCounts = new Long[operations.size()]; > for (int i = 0; i < operations.size(); ++i) { > // use sink identifier name as field name > builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT()); > affectedRowCounts[i] = -1L; > }return TableResultImpl.builder() > .jobClient(jobClient) > 
.resultKind(ResultKind.SUCCESS_WITH_CONTENT) > .tableSchema(builder.build()) > .data(Collections.singletonList(Row.of(affectedRowCounts))) > .build(); > } catch (Exception e) { > throw new TableException("Failed to execute sql", e); > } > } > {code} >Reporter: venn wu >Priority: Critical > Labels: pull-request-available > Fix For: 1.12.0, 1.11.3 > > > In Flink 1.11.0, {color:#172b4d}StreamTableEnvironment.executeSql(sql) > {color}will explain and execute the job immediately. The job name is always > generated as "insert-into_sink-table-name", but when multiple SQL jobs insert > into the same sink table, this is not very friendly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
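The fallback lookup proposed in the comment above can be sketched without any Flink dependency. This is a minimal sketch, assuming a plain `Map` stands in for Flink's `Configuration`; `job.name` is the hypothetical option key from the comment (later Flink releases expose a built-in `pipeline.name` option for the same purpose), and `JobNameResolver` is an illustrative class name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: resolve the job name from a user-supplied option,
// falling back to the generated "insert-into_..." name, mirroring the
// proposed tableConfig.getConfiguration().getString("job.name", jobName).
public class JobNameResolver {

    static String resolveJobName(Map<String, String> config, List<String> sinkNames) {
        // Default name generated from the sink identifiers, as in executeInternal.
        String defaultName = "insert-into_" + String.join(",", sinkNames);
        // Use the user-supplied name when present, otherwise the default.
        return config.getOrDefault("job.name", defaultName);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(resolveJobName(config, List.of("user_log_sink")));
        // -> insert-into_user_log_sink
        config.put("job.name", "my-etl-job");
        System.out.println(resolveJobName(config, List.of("user_log_sink")));
        // -> my-etl-job
    }
}
```

The point of the design is that the generated name stays the default, so existing jobs keep their current names unless the option is set explicitly.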
[jira] [Issue Comment Deleted] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19629: - Comment: was deleted (was: Avro union(something, null) spelled incorrectly, should be Avro unions(something, null), not union. and i think, an example nullable types should be provided. So the complete documentation should be like this: In addition to the types listed above, Flink supports reading/writing nullable types, e.g "behavior STRING NULL" . Flink maps nullable types to Avro unions(something, null), where something is the Avro type converted from Flink type.) > Avro format cause NullPointException,as null value in MAP type's value type > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: shizhengchao >Assignee: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > create table tableA ( > name STRING, > hobly MAP, > phone STRING > ) with ( > 'connector' = 'kafka-0.11', > 'topic' = 'ShizcTest', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'ShizcTest', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'avro' > ); > if hobly have an null value like this: > {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"} > cause an NullPointException: > {code:java} > java.io.IOException: Failed to deserialize Avro record. 
> at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > Caused by: java.lang.NullPointerException: null > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148) > ... 8 common frames omitted > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19638) AvroUnionLogicalSerializerTest class compile error
[ https://issues.apache.org/jira/browse/FLINK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213777#comment-17213777 ] shizhengchao commented on FLINK-19638: -- [~dwysakowicz] thank you, I changed avro-tools to 1.10.0, and the tests now pass > AvroUnionLogicalSerializerTest class compile error > -- > > Key: FLINK-19638 > URL: https://issues.apache.org/jira/browse/FLINK-19638 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Minor > > the constructor parameter is java.lang.Long of class UnionLogicalType , but > the tests is java.time.Instant > {code:java} > @Override > protected UnionLogicalType[] getTestData() { > final Random rnd = new Random(); > final UnionLogicalType[] data = new UnionLogicalType[20]; > for (int i = 0; i < data.length; i++) { > data[i] = new > UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong())); > } > return data; > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19638) AvroUnionLogicalSerializerTest class compile error
[ https://issues.apache.org/jira/browse/FLINK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213774#comment-17213774 ] shizhengchao commented on FLINK-19638: -- The Flink version is 1.12-SNAPSHOT. I used avro-tools-1.7.7.jar to generate the code of the package 'org.apache.flink.formats.avro.generated' > AvroUnionLogicalSerializerTest class compile error > -- > > Key: FLINK-19638 > URL: https://issues.apache.org/jira/browse/FLINK-19638 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Minor > > the constructor parameter is java.lang.Long of class UnionLogicalType , but > the tests is java.time.Instant > {code:java} > @Override > protected UnionLogicalType[] getTestData() { > final Random rnd = new Random(); > final UnionLogicalType[] data = new UnionLogicalType[20]; > for (int i = 0; i < data.length; i++) { > data[i] = new > UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong())); > } > return data; > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19638) AvroUnionLogicalSerializerTest class compile error
shizhengchao created FLINK-19638: Summary: AvroUnionLogicalSerializerTest class compile error Key: FLINK-19638 URL: https://issues.apache.org/jira/browse/FLINK-19638 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.11.2 Reporter: shizhengchao The constructor parameter of class UnionLogicalType is java.lang.Long, but the test passes a java.time.Instant: {code:java} @Override protected UnionLogicalType[] getTestData() { final Random rnd = new Random(); final UnionLogicalType[] data = new UnionLogicalType[20]; for (int i = 0; i < data.length; i++) { data[i] = new UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong())); } return data; } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
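The mismatch can be reproduced without the generated sources. In this sketch, `UnionLogicalType` is a hypothetical stand-in whose constructor takes `java.lang.Long`, as the avro-tools-1.7.7-generated class apparently did; passing an `Instant` to it does not compile, so the test data has to hand over a raw long (the actual resolution, per the comments above, was regenerating the class with avro-tools 1.10.0 so the constructor accepts `Instant`).

```java
import java.util.Random;

// Hypothetical stand-in for the Avro-generated class in
// org.apache.flink.formats.avro.generated, whose constructor takes Long.
public class UnionLogicalTypeFix {

    static class UnionLogicalType {
        final Long timestamp;
        UnionLogicalType(Long timestamp) { this.timestamp = timestamp; }
    }

    static UnionLogicalType[] getTestData() {
        final Random rnd = new Random();
        final UnionLogicalType[] data = new UnionLogicalType[20];
        for (int i = 0; i < data.length; i++) {
            // data[i] = new UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong()));
            //   -> does not compile against this signature: an Instant is not a Long
            data[i] = new UnionLogicalType(rnd.nextLong()); // compiles: raw epoch millis
        }
        return data;
    }

    public static void main(String[] args) {
        System.out.println(getTestData().length); // -> 20
    }
}
```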
[jira] [Commented] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213667#comment-17213667 ] shizhengchao commented on FLINK-19629: -- This is a duplicate of FLINK-19622 > Avro format cause NullPointException,as null value in MAP type's value type > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > create table tableA ( > name STRING, > hobly MAP, > phone STRING > ) with ( > 'connector' = 'kafka-0.11', > 'topic' = 'ShizcTest', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'ShizcTest', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'avro' > ); > if hobly have an null value like this: > {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"} > cause an NullPointException: > {code:java} > java.io.IOException: Failed to deserialize Avro record. 
> at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > Caused by: java.lang.NullPointerException: null > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148) > ... 8 common frames omitted > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19629: - Description: create table tableA ( name STRING, hobly MAP, phone STRING ) with ( 'connector' = 'kafka-0.11', 'topic' = 'ShizcTest', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'ShizcTest', 'scan.startup.mode' = 'earliest-offset', 'format' = 'avro' ); if hobly has a null value like this: {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"} it causes a NullPointerException: {code:java} java.io.IOException: Failed to deserialize Avro record. at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:147) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) Caused by: java.lang.NullPointerException: null at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:252) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:148) ... 8 common frames omitted {code} was: the docs Connectors/Table & SQL Connectors/Formats/Avro: In addition to the types listed above, Flink supports reading/writing nullable types. Flink maps nullable types to Avro union(something, null), where something is the Avro type converted from Flink type. avro have no union type, should be unions: Avro unions(something, null) by the way, an example should be provided that reading/writing nullable types, such as this: {code:java} CREATE TABLE user_behavior ( behavior STRING NULL ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'avro' ) {code} > Avro format cause NullPointException,as null value in MAP type's value type > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > create table tableA ( > name STRING, > hobly MAP, > phone STRING > ) with ( > 'connector' = 'kafka-0.11', > 'topic' = 'ShizcTest', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'ShizcTest', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'avro' > ); > if hobly have an null value like this: > {"name": "shizc", "hobly": {"key1":null}, "phone": "1104564"} > cause an NullPointException: > {code:java} > java.io.IOException: Failed to deserialize Avro record. 
> at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > at >
[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19629: - Component/s: (was: Documentation) Formats (JSON, Avro, Parquet, ORC, SequenceFile) > Avro format cause NullPointException,as null value in MAP type's value type > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > the docs Connectors/Table & SQL Connectors/Formats/Avro: > In addition to the types listed above, Flink supports reading/writing > nullable types. Flink maps nullable types to Avro union(something, null), > where something is the Avro type converted from Flink type. > avro have no union type, should be unions: > Avro unions(something, null) > by the way, an example should be provided that reading/writing nullable > types, such as this: > {code:java} > CREATE TABLE user_behavior ( > behavior STRING NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'avro' > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19629) Avro format cause NullPointException,as null value in MAP type's value type
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shizhengchao updated FLINK-19629: - Summary: Avro format cause NullPointException,as null value in MAP type's value type (was: English words are spelled incorrectly and an example is not provided) > Avro format cause NullPointException,as null value in MAP type's value type > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > the docs Connectors/Table & SQL Connectors/Formats/Avro: > In addition to the types listed above, Flink supports reading/writing > nullable types. Flink maps nullable types to Avro union(something, null), > where something is the Avro type converted from Flink type. > avro have no union type, should be unions: > Avro unions(something, null) > by the way, an example should be provided that reading/writing nullable > types, such as this: > {code:java} > CREATE TABLE user_behavior ( > behavior STRING NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'avro' > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-19629) English words are spelled incorrectly and an example is not provided
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213633#comment-17213633 ] shizhengchao edited comment on FLINK-19629 at 10/14/20, 6:00 AM: - [~jark] Thank you for reminding me; I found the buggy code: {code:java} private static AvroToRowDataConverter createMapConverter(LogicalType type) { final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType()); final AvroToRowDataConverter valueConverter = createConverter(extractValueTypeToAvroMap(type)); return avroObject -> { final Map<?, ?> map = (Map<?, ?>) avroObject; Map<Object, Object> result = new HashMap<>(); for (Map.Entry<?, ?> entry : map.entrySet()) { Object key = keyConverter.convert(entry.getKey()); Object value = valueConverter.convert(entry.getValue()); result.put(key, value); } return new GenericMapData(result); }; } {code} If you don't have time to fix it, I can fix it and submit the code after verification. :D was (Author: tinny): [~jark] Thank you for reminding me that found this bug code, if you don’t have time to fix it, I can fix it and submit the code after verification.:D > English words are spelled incorrectly and an example is not provided > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > the docs Connectors/Table & SQL Connectors/Formats/Avro: > In addition to the types listed above, Flink supports reading/writing > nullable types. Flink maps nullable types to Avro union(something, null), > where something is the Avro type converted from Flink type. 
> avro have no union type, should be unions: > Avro unions(something, null) > by the way, an example should be provided that reading/writing nullable > types, such as this: > {code:java} > CREATE TABLE user_behavior ( > behavior STRING NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'avro' > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
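The fix the comment points at can be sketched without Flink: the NullPointerException arises because `createMapConverter` feeds `entry.getValue()` straight into the value converter, while other paths in the class wrap converters so null passes through (`createNullableConverter`). Below is a minimal sketch of that null-safe wrapping, with plain `java.util.function.Function` standing in for Flink's `AvroToRowDataConverter`; `NullableMapConverter` and its method names are illustrative, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch of a null-safe map value converter.
public class NullableMapConverter {

    // Wrap a converter so a null input yields null instead of an NPE,
    // analogous to createNullableConverter in the Flink class.
    static Function<Object, Object> nullable(Function<Object, Object> converter) {
        return value -> value == null ? null : converter.apply(value);
    }

    static Map<Object, Object> convertMap(Map<?, ?> avroMap,
                                          Function<Object, Object> keyConverter,
                                          Function<Object, Object> valueConverter) {
        // The wrapping is the fix: without it, a null map value reaches
        // valueConverter directly and throws, as in the reported stack trace.
        Function<Object, Object> safeValue = nullable(valueConverter);
        Map<Object, Object> result = new HashMap<>();
        for (Map.Entry<?, ?> entry : avroMap.entrySet()) {
            result.put(keyConverter.apply(entry.getKey()), safeValue.apply(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> in = new HashMap<>();
        in.put("key1", null); // the payload from the report: {"key1": null}
        Map<Object, Object> out = convertMap(in, k -> k.toString(), v -> v.toString());
        System.out.println(out); // -> {key1=null}
    }
}
```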
[jira] [Commented] (FLINK-19629) English words are spelled incorrectly and an example is not provided
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213633#comment-17213633 ] shizhengchao commented on FLINK-19629: -- [~jark] Thank you for reminding me that found this bug code, if you don’t have time to fix it, I can fix it and submit the code after verification.:D > English words are spelled incorrectly and an example is not provided > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > the docs Connectors/Table & SQL Connectors/Formats/Avro: > In addition to the types listed above, Flink supports reading/writing > nullable types. Flink maps nullable types to Avro union(something, null), > where something is the Avro type converted from Flink type. > avro have no union type, should be unions: > Avro unions(something, null) > by the way, an example should be provided that reading/writing nullable > types, such as this: > {code:java} > CREATE TABLE user_behavior ( > behavior STRING NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'avro' > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19629) English words are spelled incorrectly and an example is not provided
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213569#comment-17213569 ] shizhengchao commented on FLINK-19629: -- "Avro union(something, null)" is spelled incorrectly; it should be "Avro unions(something, null)", not "union". And I think an example of nullable types should be provided. So the complete documentation should read: In addition to the types listed above, Flink supports reading/writing nullable types, e.g. "behavior STRING NULL". Flink maps nullable types to Avro unions(something, null), where something is the Avro type converted from Flink type. > English words are spelled incorrectly and an example is not provided > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > the docs Connectors/Table & SQL Connectors/Formats/Avro: > In addition to the types listed above, Flink supports reading/writing > nullable types. Flink maps nullable types to Avro union(something, null), > where something is the Avro type converted from Flink type. > avro have no union type, should be unions: > Avro unions(something, null) > by the way, an example should be provided that reading/writing nullable > types, such as this: > {code:java} > CREATE TABLE user_behavior ( > behavior STRING NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'avro' > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19629) English words are spelled incorrectly and an example is not provided
[ https://issues.apache.org/jira/browse/FLINK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213561#comment-17213561 ] shizhengchao commented on FLINK-19629: -- [~jark], I can complete this work; could you assign it to me? > English words are spelled incorrectly and an example is not provided > > > Key: FLINK-19629 > URL: https://issues.apache.org/jira/browse/FLINK-19629 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.11.2 >Reporter: shizhengchao >Priority: Critical > Fix For: 1.12.0 > > > the docs Connectors/Table & SQL Connectors/Formats/Avro: > In addition to the types listed above, Flink supports reading/writing > nullable types. Flink maps nullable types to Avro union(something, null), > where something is the Avro type converted from Flink type. > avro have no union type, should be unions: > Avro unions(something, null) > by the way, an example should be provided that reading/writing nullable > types, such as this: > {code:java} > CREATE TABLE user_behavior ( > behavior STRING NULL > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'avro' > ) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19629) English words are spelled incorrectly and an example is not provided
shizhengchao created FLINK-19629: Summary: English words are spelled incorrectly and an example is not provided Key: FLINK-19629 URL: https://issues.apache.org/jira/browse/FLINK-19629 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.11.2 Reporter: shizhengchao Fix For: 1.12.0 The docs at Connectors/Table & SQL Connectors/Formats/Avro say: In addition to the types listed above, Flink supports reading/writing nullable types. Flink maps nullable types to Avro union(something, null), where something is the Avro type converted from Flink type. Avro has no "union" type; it should be "unions": Avro unions(something, null). By the way, an example of reading/writing nullable types should be provided, such as this: {code:java} CREATE TABLE user_behavior ( behavior STRING NULL ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'avro' ) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)