[jira] [Updated] (FLINK-36407) SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing
[ https://issues.apache.org/jira/browse/FLINK-36407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergei Morozov updated FLINK-36407: --- Attachment: Flink36407Reproducer.java > SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing > --- > > Key: FLINK-36407 > URL: https://issues.apache.org/jira/browse/FLINK-36407 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.2.0 >Reporter: Sergei Morozov >Priority: Minor > Attachments: Flink36407Reproducer.java > > > The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} > referenced by the {{coordinatorExecutor}} field, but when the schema registry > closes, the service remains running. As a result, a program running a Flink > CDC pipeline never terminates. > For some reason, it's not reproducible with the existing integration tests > but is reproducible with the attached program. > Once {{execution.execute()}} returns, it prints the following and never > terminates. > {quote} > Thread[schema-evolution-coordinator,5,main] is still running > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
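The attached reproducer is not part of this archive. The sketch below is a hypothetical illustration of the kind of check that could produce the output above; it is not the contents of Flink36407Reproducer.java. It lists live non-daemon threads other than the caller. The JVM cannot exit normally while any such thread is alive, so an executor worker that is never shut down keeps the program running. The class name LingeringThreads is made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Flink or of the attached reproducer.
final class LingeringThreads {
    private LingeringThreads() {}

    // Returns live, non-daemon threads other than the calling thread.
    // Any thread returned here prevents the JVM from exiting normally.
    static List<Thread> find() {
        List<Thread> result = new ArrayList<>();
        Thread self = Thread.currentThread();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t != self && t.isAlive() && !t.isDaemon()) {
                result.add(t);
            }
        }
        return result;
    }

    // Prints one line per lingering thread, e.g. after the pipeline's execute() call returns.
    static void report() {
        for (Thread t : find()) {
            System.out.println(t + " is still running");
        }
    }
}
```

The exact text printed for each thread depends on the JDK's Thread.toString() format, which differs between Java versions.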
[jira] [Updated] (FLINK-36407) SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing
[ https://issues.apache.org/jira/browse/FLINK-36407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergei Morozov updated FLINK-36407: --- Description: The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} referenced by the {{coordinatorExecutor}} field, but when the schema registry closes, the service remains running. As a result, a program running a Flink CDC pipeline never terminates. For some reason, it's not reproducible with the existing integration tests but is reproducible with the attached program. Once {{execution.execute()}} returns, it prints the following and never terminates. {quote} Thread[schema-evolution-coordinator,5,main] is still running {quote} was: The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} referenced by the {{coordinatorExecutor}} field, but when the schema registry closes, the service remains running. As a result, a program running a Flink CDC pipeline never terminates. For some reason, it's not reproducible with the existing integration tests but is reproducible with the attached program. > SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing > --- > > Key: FLINK-36407 > URL: https://issues.apache.org/jira/browse/FLINK-36407 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.2.0 >Reporter: Sergei Morozov >Priority: Minor > > The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} > referenced by the {{coordinatorExecutor}} field, but when the schema registry > closes, the service remains running. As a result, a program running a Flink > CDC pipeline never terminates. > For some reason, it's not reproducible with the existing integration tests > but is reproducible with the attached program. > Once {{execution.execute()}} returns, it prints the following and never > terminates. 
> {quote} > Thread[schema-evolution-coordinator,5,main] is still running > {quote}
[jira] [Created] (FLINK-36407) SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing
Sergei Morozov created FLINK-36407: -- Summary: SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing Key: FLINK-36407 URL: https://issues.apache.org/jira/browse/FLINK-36407 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: Sergei Morozov The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} referenced by the {{coordinatorExecutor}} field, but when the schema registry closes, the service remains running. As a result, a program running a Flink CDC pipeline never terminates. For some reason, it's not reproducible with the existing integration tests but is reproducible with the attached program.
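A minimal sketch of the kind of change the description implies, assuming that whoever owns an executor is responsible for stopping it in close(). The class below is a hypothetical stand-in, not Flink CDC's SchemaRegistry; only the field name coordinatorExecutor and the thread name are taken from this report. It follows the usual ExecutorService pattern: shutdownNow() to stop accepting work and interrupt the worker, then a bounded awaitTermination() so that closing cannot hang indefinitely.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a component that owns its executor; not Flink CDC's SchemaRegistry.
final class OwnedExecutorCoordinator implements AutoCloseable {
    // Non-daemon: until the executor is shut down, this thread keeps the JVM alive.
    private final ExecutorService coordinatorExecutor =
            Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable, "schema-evolution-coordinator");
                thread.setDaemon(false);
                return thread;
            });

    void submit(Runnable task) {
        coordinatorExecutor.execute(task);
    }

    boolean isTerminated() {
        return coordinatorExecutor.isTerminated();
    }

    @Override
    public void close() {
        // Stop accepting new tasks and interrupt any task that is still running.
        coordinatorExecutor.shutdownNow();
        try {
            // Bound the wait so that close() itself cannot hang forever.
            if (!coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                System.err.println("coordinatorExecutor did not terminate within 10 seconds");
            }
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status.
            Thread.currentThread().interrupt();
        }
    }
}
```

Whether shutdown() (let queued tasks finish) or shutdownNow() (interrupt them) is the right choice depends on whether pending schema-evolution work must complete before the coordinator closes; the sketch picks shutdownNow() on the assumption that nothing useful can run after close.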
[jira] [Created] (FLINK-36406) MetadataApplier should be closed when the job stop
Sergei Morozov created FLINK-36406: -- Summary: MetadataApplier should be closed when the job stop Key: FLINK-36406 URL: https://issues.apache.org/jira/browse/FLINK-36406 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: Sergei Morozov Once deserialized, MetadataApplier can instantiate its internal resources (e.g. a JDBC connection) for managing external database schema. When the Flink job stops, there's no way to close these resources.
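One possible shape for this, sketched under the assumption that the applier itself is made closeable and that whatever hosts it calls close() when the job stops. All names below (ClosableSchemaApplier, FakeConnection, ConnectionBackedApplier) are hypothetical and are not the Flink CDC MetadataApplier API; FakeConnection stands in for a resource such as a JDBC connection. The default no-op close() means appliers that hold no resources need no changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical applier contract; not the Flink CDC MetadataApplier interface.
interface ClosableSchemaApplier extends AutoCloseable {
    void applySchemaChange(String ddl) throws Exception;

    // Default no-op so appliers without external resources need no changes.
    @Override
    default void close() throws Exception {}
}

// Stand-in for an external resource such as a JDBC connection.
final class FakeConnection implements AutoCloseable {
    final List<String> executed = new ArrayList<>();
    boolean closed;

    void execute(String sql) {
        if (closed) {
            throw new IllegalStateException("connection is closed");
        }
        executed.add(sql);
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class ConnectionBackedApplier implements ClosableSchemaApplier {
    // Opened lazily, i.e. only after the applier has been deserialized and is first used.
    private FakeConnection connection;

    FakeConnection connection() {
        if (connection == null) {
            connection = new FakeConnection();
        }
        return connection;
    }

    @Override
    public void applySchemaChange(String ddl) {
        connection().execute(ddl);
    }

    // Releases the resource; intended to be called by the host when the job stops.
    @Override
    public void close() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}
```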
[jira] [Commented] (FLINK-35056) when initial sqlserver table that's primary key is datetime type, it org.apache.flink.table.api.ValidationException: Timestamp precision must be between 0 and 9 (both
[ https://issues.apache.org/jira/browse/FLINK-35056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875604#comment-17875604 ] Sergei Morozov commented on FLINK-35056: I submitted a pull request with a fix and a test that reproduces the issue on {{master}}. > when initial sqlserver table that's primary key is datetime type, it > org.apache.flink.table.api.ValidationException: Timestamp precision must be > between 0 and 9 (both inclusive) > -- > > Key: FLINK-35056 > URL: https://issues.apache.org/jira/browse/FLINK-35056 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: 2.0.0 >Reporter: fengfengzai >Priority: Critical > Labels: pull-request-available > Attachments: sqlserver-bug.png > > > During the initial snapshot of a SQL Server table whose primary key is of type datetime, > the connector fails with: > org.apache.flink.table.api.ValidationException: Timestamp precision must be > between 0 and 9 (both inclusive) > > I found that the datetime column's length is 23, which exceeds 9, so it fails.
[jira] [Commented] (FLINK-35056) when initial sqlserver table that's primary key is datetime type, it org.apache.flink.table.api.ValidationException: Timestamp precision must be between 0 and 9 (both
[ https://issues.apache.org/jira/browse/FLINK-35056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875320#comment-17875320 ] Sergei Morozov commented on FLINK-35056: Here's an example of a source table that makes the connector fail:
{code:sql}
CREATE TABLE dt_pk (
    dt datetime NOT NULL PRIMARY KEY,
    val INT
);
{code}
The failure looks like this:
{noformat}
org.apache.flink.util.FlinkRuntimeException: Generate Splits for table sergei.dbo.dt_src error
    at com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.generateSplits(SqlServerChunkSplitter.java:125)
    at com.ververica.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:188)
    at com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:137)
    at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:174)
    at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:97)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$2(SourceCoordinator.java:230)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.table.api.ValidationException: Timestamp precision must be between 0 and 9 (both inclusive).
    at org.apache.flink.table.types.logical.TimestampType.<init>(TimestampType.java:85)
    at org.apache.flink.table.types.logical.TimestampType.<init>(TimestampType.java:97)
    at org.apache.flink.table.types.logical.TimestampType.<init>(TimestampType.java:101)
    at org.apache.flink.table.api.DataTypes.TIMESTAMP(DataTypes.java:406)
    at com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils.convertFromColumn(SqlServerTypeUtils.java:71)
    at com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils.fromDbzColumn(SqlServerTypeUtils.java:31)
    at com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.fromDbzColumn(SqlServerChunkSplitter.java:144)
    at com.ververica.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.getSplitType(JdbcSourceChunkSplitter.java:144)
    at com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.generateSplits(SqlServerChunkSplitter.java:102)
    ... 13 common frames omitted
{noformat}
> when initial sqlserver table that's primary key is datetime type, it > org.apache.flink.table.api.ValidationException: Timestamp precision must be > between 0 and 9 (both inclusive) > -- > > Key: FLINK-35056 > URL: https://issues.apache.org/jira/browse/FLINK-35056 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: 2.0.0 >Reporter: fengfengzai >Priority: Critical > Attachments: sqlserver-bug.png > > > During the initial snapshot of a SQL Server table whose primary key is of type datetime, > the connector fails with: > org.apache.flink.table.api.ValidationException: Timestamp precision must be > between 0 and 9 (both inclusive) > > I found that the datetime column's length is 23, which exceeds 9, so it fails.
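The reporter's diagnosis can be illustrated with a small, hypothetical helper (not the connector's SqlServerTypeUtils). SQL Server documents datetime with precision 23 and scale 3, so a TIMESTAMP precision taken from the column length fails the 0 to 9 range check, while one taken from the scale, which counts fractional-second digits, passes. The sketch assumes the scale is available to the type converter.

```java
// Hypothetical helper; not the connector's SqlServerTypeUtils.
final class TimestampPrecision {
    private TimestampPrecision() {}

    // Same bounds as the ValidationException in the report: 0..9 inclusive.
    private static int validate(int precision) {
        if (precision < 0 || precision > 9) {
            throw new IllegalArgumentException(
                    "Timestamp precision must be between 0 and 9 (both inclusive): " + precision);
        }
        return precision;
    }

    // Mirrors the reporter's diagnosis: the column length (23 for datetime) is used as the precision.
    static int fromLength(int length) {
        return validate(length);
    }

    // Sketch of a fix: use the scale, i.e. the number of fractional-second digits
    // (3 for datetime, up to 7 for datetime2).
    static int fromScale(int scale) {
        return validate(scale);
    }
}
```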
[jira] [Created] (FLINK-36086) SQL Server source connector skips changes if restarted mid transaction
Sergei Morozov created FLINK-36086: -- Summary: SQL Server source connector skips changes if restarted mid transaction Key: FLINK-36086 URL: https://issues.apache.org/jira/browse/FLINK-36086 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: Sergei Morozov If the SQL Server source connector is restarted while processing a transaction that contains multiple changes, then upon restart it skips the changes from that transaction that have not been processed yet and resumes from the next transaction. This is analogous to [DBZ-1128|https://issues.redhat.com/browse/DBZ-1128], but it is reproducible only in Flink CDC. This is a regression introduced in [apache/flink-cdc#2176|https://github.com/apache/flink-cdc/pull/2176].
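The difference between skipping to the next transaction and resuming inside the interrupted one can be modelled as follows. This is a hypothetical sketch, not the connector's offset handling: it assumes each change is identified by a commit LSN (the transaction), a change LSN within the transaction, and a serial number for events that share a change LSN, and it simplifies LSNs to longs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of change-stream positions; LSNs are simplified to longs.
final class ResumeSketch {
    private ResumeSketch() {}

    static final class Change {
        final long commitLsn;  // identifies the transaction
        final long changeLsn;  // position of the change within the transaction
        final int eventSerial; // orders events that share a change LSN

        Change(long commitLsn, long changeLsn, int eventSerial) {
            this.commitLsn = commitLsn;
            this.changeLsn = changeLsn;
            this.eventSerial = eventSerial;
        }
    }

    // Total order of changes in the log.
    static final Comparator<Change> ORDER =
            Comparator.comparingLong((Change c) -> c.commitLsn)
                    .thenComparingLong(c -> c.changeLsn)
                    .thenComparingInt(c -> c.eventSerial);

    // Behaviour described in the report: resume at the next transaction,
    // dropping the unprocessed changes of the interrupted one.
    static List<Change> resumeAtNextTransaction(List<Change> log, Change lastProcessed) {
        List<Change> remaining = new ArrayList<>();
        for (Change c : log) {
            if (c.commitLsn > lastProcessed.commitLsn) {
                remaining.add(c);
            }
        }
        return remaining;
    }

    // Expected behaviour: resume strictly after the last processed change,
    // including the rest of the interrupted transaction.
    static List<Change> resumeAfter(List<Change> log, Change lastProcessed) {
        List<Change> remaining = new ArrayList<>();
        for (Change c : log) {
            if (ORDER.compare(c, lastProcessed) > 0) {
                remaining.add(c);
            }
        }
        return remaining;
    }
}
```

For this to work after a restart, the persisted offset has to identify the last processed change within its transaction, not only the transaction itself.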
[jira] [Updated] (FLINK-36051) JDBC sink doesn't re-prepare invalidated prepared statements
[ https://issues.apache.org/jira/browse/FLINK-36051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergei Morozov updated FLINK-36051: --- Description: An SQL Server sink connector may fail with the following exception:
{code}
java.io.IOException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$ContextImpl.output(LegacyKeyedProcessOperator.java:134)
    at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:123)
    at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:118)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195)
    ... 22 more
Caused by: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
    ... 23 more
{code}
While the connector can handle connection-level errors (see FLINK-16681), it doesn't handle the statement-level ones.
was:
An SQL Server sink connector may fail with the following exception:
{code}
java.io.IOException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$C
[jira] [Created] (FLINK-36051) JDBC sink doesn't re-prepare invalidated prepared statements
Sergei Morozov created FLINK-36051: -- Summary: JDBC sink doesn't re-prepare invalidated prepared statements Key: FLINK-36051 URL: https://issues.apache.org/jira/browse/FLINK-36051 Project: Flink Issue Type: Bug Components: Connectors / JDBC Reporter: Sergei Morozov An SQL Server sink connector may fail with the following exception:
{code}
java.io.IOException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$ContextImpl.output(LegacyKeyedProcessOperator.java:134)
    at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:123)
    at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:118)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195)
    ... 22 more
Caused by: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
    ... 23 more
{code}
While the connector can handle connection-level errors (see FLINK-16681), it doesn't handle the statement-level ones.
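A minimal sketch of statement-level recovery, assuming the caller still holds the buffered rows and can bind them to a freshly prepared statement. ReprepareOnce and its functional interfaces are hypothetical, not flink-connector-jdbc API, and matching on the driver message from this report is only a heuristic; a real fix would likely need a driver-specific check. The helper retries exactly once so that a persistent failure still surfaces.

```java
import java.sql.SQLException;

// Hypothetical helper; not part of flink-connector-jdbc.
final class ReprepareOnce {
    private ReprepareOnce() {}

    @FunctionalInterface
    interface SqlSupplier<T> {
        T get() throws SQLException;
    }

    @FunctionalInterface
    interface SqlFunction<T, R> {
        R apply(T input) throws SQLException;
    }

    // Heuristic based on the driver message in this report (assumption): it only
    // recognises the "Could not find prepared statement" failure.
    static boolean isInvalidatedStatement(SQLException e) {
        String message = e.getMessage();
        return message != null && message.contains("Could not find prepared statement");
    }

    // Runs execute against the current statement. If the failure looks like an
    // invalidated statement, obtains a fresh one from reprepare and retries once;
    // any other failure, or a second failure, propagates to the caller.
    static <S, R> R executeWithReprepare(S statement, SqlSupplier<S> reprepare, SqlFunction<S, R> execute)
            throws SQLException {
        try {
            return execute.apply(statement);
        } catch (SQLException e) {
            if (!isInvalidatedStatement(e)) {
                throw e;
            }
            S fresh = reprepare.get();
            return execute.apply(fresh);
        }
    }
}
```

In a sink, reprepare would close the stale PreparedStatement and call Connection.prepareStatement(sql) again, and execute would re-add the buffered rows with addBatch() before calling executeBatch(), since the rows added to the invalidated statement are lost with it.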
[jira] [Updated] (FLINK-35177) Datagen examples in documentation do not compile
[ https://issues.apache.org/jira/browse/FLINK-35177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergei Morozov updated FLINK-35177: --- Description: Currently, the [examples|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/#rate-limiting] look like this:
{code:java}
GeneratorFunction<Long, Long> generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.STRING);
{code}
The generator function returns Long but the DataGeneratorSource uses String, so their types do not match. Either the generator function needs to be changed to return a string, or the source needs to use Long.
was:
Currently, the examples look like this:
{code}
GeneratorFunction<Long, Long> generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.STRING);
{code}
The generator function returns Long but the DataGeneratorSource uses String, so their types do not match. Either the generator function needs to be changed to return a string, or the source needs to use Long.
> Datagen examples in documentation do not compile > > > Key: FLINK-35177 > URL: https://issues.apache.org/jira/browse/FLINK-35177 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.19.0 >Reporter: Sergei Morozov >Priority: Not a Priority > > Currently, the > [examples|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/#rate-limiting] > look like this: > {code:java} > GeneratorFunction<Long, Long> generatorFunction = index -> index; > double recordsPerSecond = 100; > DataGeneratorSource<String> source = > new DataGeneratorSource<>( > generatorFunction, > Long.MAX_VALUE, > RateLimiterStrategy.perSecond(recordsPerSecond), > Types.STRING); > {code} > The generator function returns Long but the DataGeneratorSource uses String, > so their types do not match. > Either the generator function needs to be changed to return a string, or the > source needs to use Long.
[jira] [Created] (FLINK-35177) Datagen examples in documentation do not compile
Sergei Morozov created FLINK-35177: -- Summary: Datagen examples in documentation do not compile Key: FLINK-35177 URL: https://issues.apache.org/jira/browse/FLINK-35177 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.19.0 Reporter: Sergei Morozov Currently, the examples look like this:
{code}
GeneratorFunction<Long, Long> generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.STRING);
{code}
The generator function returns Long but the DataGeneratorSource uses String, so their types do not match. Either the generator function needs to be changed to return a string, or the source needs to use Long.
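Either fix suggested in the report could look like the sketch below, which assumes flink-connector-datagen (Flink 1.19) is on the classpath. The wrapper class, the method names, and the "Number: " prefix are illustrative. In both variants the output type of the GeneratorFunction, the type parameter of DataGeneratorSource, and the TypeInformation argument agree.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

// Hypothetical wrapper showing both ways to make the documented example type-check.
final class DatagenRateLimitExample {
    private DatagenRateLimitExample() {}

    // Option 1: the function maps the Long index to a String, matching Types.STRING.
    static DataGeneratorSource<String> stringSource(double recordsPerSecond) {
        GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
        return new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.STRING);
    }

    // Option 2: keep the identity function and declare the source as Long.
    static DataGeneratorSource<Long> longSource(double recordsPerSecond) {
        GeneratorFunction<Long, Long> generatorFunction = index -> index;
        return new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.LONG);
    }
}
```

Either source would then be passed to env.fromSource(...) with a watermark strategy, as in the rest of the documentation page.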