[jira] [Updated] (FLINK-36407) SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing

2024-09-29 Thread Sergei Morozov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergei Morozov updated FLINK-36407:
---
Attachment: Flink36407Reproducer.java

> SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing
> ---
>
> Key: FLINK-36407
> URL: https://issues.apache.org/jira/browse/FLINK-36407
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Sergei Morozov
>Priority: Minor
> Attachments: Flink36407Reproducer.java
>
>
> The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} 
> referenced by the {{coordinatorExecutor}} field, but when the schema registry 
> closes, the service remains running. As a result, a program running a Flink 
> CDC pipeline never terminates.
> For some reason, it's not reproducible with the existing integration tests 
> but is reproducible with the attached program.
> Once {{execution.execute()}} returns, it prints the following and never 
> terminates.
> {quote}
> Thread[schema-evolution-coordinator,5,main] is still running
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36407) SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing

2024-09-29 Thread Sergei Morozov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergei Morozov updated FLINK-36407:
---
Description: 
The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} referenced 
by the {{coordinatorExecutor}} field, but when the schema registry closes, the 
service remains running. As a result, a program running a Flink CDC pipeline 
never terminates.

For some reason, it's not reproducible with the existing integration tests but 
is reproducible with the attached program.

Once {{execution.execute()}} returns, it prints the following and never 
terminates.

{quote}
Thread[schema-evolution-coordinator,5,main] is still running
{quote}

  was:
The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} referenced 
by the {{coordinatorExecutor}} field, but when the schema registry closes, the 
service remains running. As a result, a program running a Flink CDC pipeline 
never terminates.

For some reason, it's not reproducible with the existing integration tests but 
is reproducible with the attached program.


> SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing
> ---
>
> Key: FLINK-36407
> URL: https://issues.apache.org/jira/browse/FLINK-36407
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Sergei Morozov
>Priority: Minor
>
> The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} 
> referenced by the {{coordinatorExecutor}} field, but when the schema registry 
> closes, the service remains running. As a result, a program running a Flink 
> CDC pipeline never terminates.
> For some reason, it's not reproducible with the existing integration tests 
> but is reproducible with the attached program.
> Once {{execution.execute()}} returns, it prints the following and never 
> terminates.
> {quote}
> Thread[schema-evolution-coordinator,5,main] is still running
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36407) SchemaRegistry doesn't shutdown its underlying ExecutorService upon closing

2024-09-29 Thread Sergei Morozov (Jira)
Sergei Morozov created FLINK-36407:
--

 Summary: SchemaRegistry doesn't shutdown its underlying 
ExecutorService upon closing
 Key: FLINK-36407
 URL: https://issues.apache.org/jira/browse/FLINK-36407
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Sergei Morozov


The {{SchemaRegistry}} class exclusively uses an {{ExecutorService}} referenced 
by the {{coordinatorExecutor}} field, but when the schema registry closes, the 
service remains running. As a result, a program running a Flink CDC pipeline 
never terminates.

For some reason, it's not reproducible with the existing integration tests but 
is reproducible with the attached program.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36406) MetadataApplier should be closed when the job stop

2024-09-29 Thread Sergei Morozov (Jira)
Sergei Morozov created FLINK-36406:
--

 Summary: MetadataApplier should be closed when the job stop
 Key: FLINK-36406
 URL: https://issues.apache.org/jira/browse/FLINK-36406
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Sergei Morozov


Once deserialized, MetadataApplier can instantiate its internal resources (e.g. 
a JDBC connection) for managing external database schema. When the Flink job 
stops, there's no way to close these resources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35056) when initial sqlserver table that's primary key is datetime type, it org.apache.flink.table.api.ValidationException: Timestamp precision must be between 0 and 9 (both

2024-08-21 Thread Sergei Morozov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875604#comment-17875604
 ] 

Sergei Morozov commented on FLINK-35056:


I submitted a pull request with a test that reproduces the issue on {{master}} 
and a fix.

> when initial sqlserver table that's primary key is datetime type,  it 
> org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
> --
>
> Key: FLINK-35056
> URL: https://issues.apache.org/jira/browse/FLINK-35056
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 2.0.0
>Reporter: fengfengzai
>Priority: Critical
>  Labels: pull-request-available
> Attachments: sqlserver-bug.png
>
>
> when initial sqlserver table that's primary key is datetime type.
> it error:
>  org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
>  
> i find datetime's length is 23. it exceed 9. so it errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35056) when initial sqlserver table that's primary key is datetime type, it org.apache.flink.table.api.ValidationException: Timestamp precision must be between 0 and 9 (both

2024-08-20 Thread Sergei Morozov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875320#comment-17875320
 ] 

Sergei Morozov commented on FLINK-35056:


Here's a source table example that will fail the connector:
{code:sql}
CREATE TABLE dt_pk (
  dt datetime NOT NULL PRIMARY KEY,
  val INT
);
{code}

The failure will look like:
{noformat}
org.apache.flink.util.FlinkRuntimeException: Generate Splits for table 
sergei.dbo.dt_src error
at 
com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.generateSplits(SqlServerChunkSplitter.java:125)
at 
com.ververica.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:188)
at 
com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:137)
at 
com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:174)
at 
com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:97)
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$2(SourceCoordinator.java:230)
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.flink.table.api.ValidationException: Timestamp precision 
must be between 0 and 9 (both inclusive).
at 
org.apache.flink.table.types.logical.TimestampType.(TimestampType.java:85)
at 
org.apache.flink.table.types.logical.TimestampType.(TimestampType.java:97)
at 
org.apache.flink.table.types.logical.TimestampType.(TimestampType.java:101)
at org.apache.flink.table.api.DataTypes.TIMESTAMP(DataTypes.java:406)
at 
com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils.convertFromColumn(SqlServerTypeUtils.java:71)
at 
com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils.fromDbzColumn(SqlServerTypeUtils.java:31)
at 
com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.fromDbzColumn(SqlServerChunkSplitter.java:144)
at 
com.ververica.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.getSplitType(JdbcSourceChunkSplitter.java:144)
at 
com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.generateSplits(SqlServerChunkSplitter.java:102)
... 13 common frames omitted
{noformat}

> when initial sqlserver table that's primary key is datetime type,  it 
> org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
> --
>
> Key: FLINK-35056
> URL: https://issues.apache.org/jira/browse/FLINK-35056
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 2.0.0
>Reporter: fengfengzai
>Priority: Critical
> Attachments: sqlserver-bug.png
>
>
> when initial sqlserver table that's primary key is datetime type.
> it error:
>  org.apache.flink.table.api.ValidationException: Timestamp precision must be 
> between 0 and 9 (both inclusive)
>  
> i find datetime's length is 23. it exceed 9. so it errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36086) SQL Server source connector skips changes if restarted mid transaction

2024-08-18 Thread Sergei Morozov (Jira)
Sergei Morozov created FLINK-36086:
--

 Summary: SQL Server source connector skips changes if restarted 
mid transaction
 Key: FLINK-36086
 URL: https://issues.apache.org/jira/browse/FLINK-36086
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Sergei Morozov


If the SQL Server source connector is restarted while handling updates from a 
transaction with multiple updates, upon restart, it will skip the non-processed 
changes and proceed from the next transaction.

This is an analog of [DBZ-1128|https://issues.redhat.com/browse/DBZ-1128] but 
reproducible only in Flink CDC.

This is a regression introduced in 
[apache/flink-cdc#2176|https://github.com/apache/flink-cdc/pull/2176].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36051) JDBC sink doesn't re-prepare invalidated prepared statements

2024-08-13 Thread Sergei Morozov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergei Morozov updated FLINK-36051:
---
Description: 
An SQL Server sink connector may fail with the following exception:

{code}
java.io.IOException: Writing records to JDBC failed.
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$ContextImpl.output(LegacyKeyedProcessOperator.java:134)
at 
com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:123)
at 
com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:118)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Could not find 
prepared statement with handle 1.
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195)
... 22 more
Caused by: java.sql.BatchUpdateException: Could not find prepared statement 
with handle 1.
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
at 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at 
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at 
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
... 23 more
{code}

While the connector can handle connection-level errors (see FLINK-16681), it 
doesn't handle the statement-level ones.

  was:
An SQL Server sink connector may fail with the following exception:

{code}
java.io.IOException: Writing records to JDBC failed.
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$C

[jira] [Created] (FLINK-36051) JDBC sink doesn't re-prepare invalidated prepared statements

2024-08-13 Thread Sergei Morozov (Jira)
Sergei Morozov created FLINK-36051:
--

 Summary: JDBC sink doesn't re-prepare invalidated prepared 
statements
 Key: FLINK-36051
 URL: https://issues.apache.org/jira/browse/FLINK-36051
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Reporter: Sergei Morozov


An SQL Server sink connector may fail with the following exception:

{code}
java.io.IOException: Writing records to JDBC failed.
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$ContextImpl.output(LegacyKeyedProcessOperator.java:134)
at 
com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:123)
at 
com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:118)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Could not find 
prepared statement with handle 1.
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195)
... 22 more
Caused by: java.sql.BatchUpdateException: Could not find prepared statement 
with handle 1.
at 
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
at 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at 
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at 
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
at 
org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
... 23 more
{code}

While the connector can handle connection-level errors (see FLINK-16681), it 
doesn't handle the statement level ones.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35177) Datagen examples in documentation do not compile

2024-04-19 Thread Sergei Morozov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergei Morozov updated FLINK-35177:
---
Description: 
Currently, the 
[examples|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/#rate-limiting]
 look like this:
{code:java}
GeneratorFunction generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource source =
new DataGeneratorSource<>(
 generatorFunction,
 Long.MAX_VALUE,
 RateLimiterStrategy.perSecond(recordsPerSecond),
 Types.STRING);
{code}
The generator function returns Long but the DataGeneratorSource uses String, so 
their types do not match.

Either the generator function needs to be changed to return a string, or the 
source needs to use Long.

  was:
Currently, the examples look like this:

{code}
GeneratorFunction generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource source =
new DataGeneratorSource<>(
 generatorFunction,
 Long.MAX_VALUE,
 RateLimiterStrategy.perSecond(recordsPerSecond),
 Types.STRING);
{code}

The generator function returns Long but the DataGeneratorSource uses String, so 
their types do not match.

Either the generator function needs to be changed to return a string, or the 
source needs to use Long.


> Datagen examples in documentation do not compile
> 
>
> Key: FLINK-35177
> URL: https://issues.apache.org/jira/browse/FLINK-35177
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Sergei Morozov
>Priority: Not a Priority
>
> Currently, the 
> [examples|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/#rate-limiting]
>  look like this:
> {code:java}
> GeneratorFunction generatorFunction = index -> index;
> double recordsPerSecond = 100;
> DataGeneratorSource source =
> new DataGeneratorSource<>(
>  generatorFunction,
>  Long.MAX_VALUE,
>  RateLimiterStrategy.perSecond(recordsPerSecond),
>  Types.STRING);
> {code}
> The generator function returns Long but the DataGeneratorSource uses String, 
> so their types do not match.
> Either the generator function needs to be changed to return a string, or the 
> source needs to use Long.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35177) Datagen examples in documentation do not compile

2024-04-19 Thread Sergei Morozov (Jira)
Sergei Morozov created FLINK-35177:
--

 Summary: Datagen examples in documentation do not compile
 Key: FLINK-35177
 URL: https://issues.apache.org/jira/browse/FLINK-35177
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Sergei Morozov


Currently, the examples look like this:

{code}
GeneratorFunction generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource source =
new DataGeneratorSource<>(
 generatorFunction,
 Long.MAX_VALUE,
 RateLimiterStrategy.perSecond(recordsPerSecond),
 Types.STRING);
{code}

The generator function returns Long but the DataGeneratorSource uses String, so 
their types do not match.

Either the generator function needs to be changed to return a string, or the 
source needs to use Long.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)