[jira] [Created] (FLINK-34586) Update the README in Flink CDC
Hang Ruan created FLINK-34586: - Summary: Update the README in Flink CDC Key: FLINK-34586 URL: https://issues.apache.org/jira/browse/FLINK-34586 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: Hang Ruan We should update the README file in Flink CDC. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34585) [JUnit5 Migration] Module: Flink CDC
Hang Ruan created FLINK-34585: - Summary: [JUnit5 Migration] Module: Flink CDC Key: FLINK-34585 URL: https://issues.apache.org/jira/browse/FLINK-34585 Project: Flink Issue Type: Sub-task Components: Flink CDC Reporter: Hang Ruan Most tests in Flink CDC are still using JUnit 4. We need to migrate them to JUnit 5. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34584) Change package name to org.apache.flink.cdc
Hang Ruan created FLINK-34584: - Summary: Change package name to org.apache.flink.cdc Key: FLINK-34584 URL: https://issues.apache.org/jira/browse/FLINK-34584 Project: Flink Issue Type: Sub-task Components: Flink CDC Reporter: Hang Ruan Flink CDC needs to change its package name to org.apache.flink.cdc. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32862) Support INIT operation type to be compatible with DTS on Alibaba Cloud
Hang Ruan created FLINK-32862: - Summary: Support INIT operation type to be compatible with DTS on Alibaba Cloud Key: FLINK-32862 URL: https://issues.apache.org/jira/browse/FLINK-32862 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Hang Ruan Canal JSON messages from DTS on Alibaba Cloud may contain a new operation type `INIT`. We cannot currently handle these messages. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31615) Fix some parts that were not translated in the "Table API" page of "Table API & SQL"
Hang Ruan created FLINK-31615: - Summary: Fix some parts that were not translated in the "Table API" page of "Table API & SQL" Key: FLINK-31615 URL: https://issues.apache.org/jira/browse/FLINK-31615 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31559) Update the flink version to 1.18-SNAPSHOT in flink-connector-kafka
Hang Ruan created FLINK-31559: - Summary: Update the flink version to 1.18-SNAPSHOT in flink-connector-kafka Key: FLINK-31559 URL: https://issues.apache.org/jira/browse/FLINK-31559 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Hang Ruan For FLIP-208, there are changes in flink-connector-base that the subsequent changes in flink-connector-kafka depend on, so we need to update the Flink version to 1.18-SNAPSHOT. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31511) Translate documentation sql_functions_zh.yml to the latest version
Hang Ruan created FLINK-31511: - Summary: Translate documentation sql_functions_zh.yml to the latest version Key: FLINK-31511 URL: https://issues.apache.org/jira/browse/FLINK-31511 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Hang Ruan Some of the function descriptions in sql_functions_zh.yml are outdated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31268) OperatorCoordinator.Context#metricGroup will return null when restoring from a savepoint
Hang Ruan created FLINK-31268: - Summary: OperatorCoordinator.Context#metricGroup will return null when restoring from a savepoint Key: FLINK-31268 URL: https://issues.apache.org/jira/browse/FLINK-31268 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Hang Ruan The `metricGroup` is initialized lazily in the method `OperatorCoordinatorHandler#initializeOperatorCoordinators`. This causes a NullPointerException when it is used in methods like `Source#restoreEnumerator`, which is invoked through `SchedulerBase#createAndRestoreExecutionGraph` before `OperatorCoordinatorHandler#initializeOperatorCoordinators` runs in `SchedulerBase`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
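For illustration, here is a minimal, hedged sketch (wildcard split type, only assumed to reflect the current interfaces) of where the lazy initialization bites inside a custom `Source#restoreEnumerator`:
{code:java}
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;

// Hedged sketch, not the actual Flink code: restoreEnumerator is reached via
// SchedulerBase#createAndRestoreExecutionGraph before the operator coordinators are
// initialized, so the metric group behind the enumerator context may still be null here.
public class RestoreMetricGroupSketch {
    static void touchMetricsDuringRestore(SplitEnumeratorContext<?> context) {
        SplitEnumeratorMetricGroup metrics = context.metricGroup(); // may be null on restore
        if (metrics != null) {
            metrics.setUnassignedSplitsGauge(() -> 0L); // safe only once initialized
        }
    }
}
{code}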
[jira] [Created] (FLINK-26126) Sink V2 causes a wrong numRecordsOut metric
Hang Ruan created FLINK-26126: - Summary: Sink V2 causes a wrong numRecordsOut metric Key: FLINK-26126 URL: https://issues.apache.org/jira/browse/FLINK-26126 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.15.0 Reporter: Hang Ruan We found that the new Sink V2 interface reports a wrong numRecordsOut metric for the sink writers. We send a fixed number of records to the source, but the numRecordsOut of the sink keeps increasing over time. The problem lies in the method `emitCommittables` in the class `SinkWriterOperator`. The field `output` in its parent class `AbstractStreamOperator` uses the same counter object as the `KafkaWriter`, which causes numRecordsOut to increase during checkpoints. I found this problem while implementing the metric test in the test framework, and I have disabled this metric test in the PR (https://github.com/apache/flink/pull/18496). We can re-enable this test case after the fix. -- This message was sent by Atlassian Jira (v8.20.1#820001)
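To make the double counting concrete, here is a simplified, hedged illustration using the generic Flink metric classes (this is not the actual `SinkWriterOperator` code; the numbers are made up):
{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

// Illustration only: when the operator output and the KafkaWriter share one Counter,
// committables emitted at checkpoint time bump the same numRecordsOut that already
// counted the user records, so the metric overshoots the real record count.
public class SharedCounterIllustration {
    public static void main(String[] args) {
        Counter numRecordsOut = new SimpleCounter();
        numRecordsOut.inc(100); // 100 records written by the writer
        numRecordsOut.inc(4);   // committables emitted during a checkpoint reuse the counter
        System.out.println(numRecordsOut.getCount()); // prints 104, not 100
    }
}
{code}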
[jira] [Created] (FLINK-25840) Add semantic test support in the connector testframe
Hang Ruan created FLINK-25840: - Summary: Add semantic test support in the connector testframe Key: FLINK-25840 URL: https://issues.apache.org/jira/browse/FLINK-25840 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.15.0 Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25545) [JUnit5 Migration] Module: flink-clients
Hang Ruan created FLINK-25545: - Summary: [JUnit5 Migration] Module: flink-clients Key: FLINK-25545 URL: https://issues.apache.org/jira/browse/FLINK-25545 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25550) [JUnit5 Migration] Module: flink-kubernetes
Hang Ruan created FLINK-25550: - Summary: [JUnit5 Migration] Module: flink-kubernetes Key: FLINK-25550 URL: https://issues.apache.org/jira/browse/FLINK-25550 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25549) [JUnit5 Migration] Module: flink-dstl
Hang Ruan created FLINK-25549: - Summary: [JUnit5 Migration] Module: flink-dstl Key: FLINK-25549 URL: https://issues.apache.org/jira/browse/FLINK-25549 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25548) [JUnit5 Migration] Module: flink-sql-parser
Hang Ruan created FLINK-25548: - Summary: [JUnit5 Migration] Module: flink-sql-parser Key: FLINK-25548 URL: https://issues.apache.org/jira/browse/FLINK-25548 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25547) [JUnit5 Migration] Module: flink-optimizer
Hang Ruan created FLINK-25547: - Summary: [JUnit5 Migration] Module: flink-optimizer Key: FLINK-25547 URL: https://issues.apache.org/jira/browse/FLINK-25547 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25546) [JUnit5 Migration] Module: flink-connector-base
Hang Ruan created FLINK-25546: - Summary: [JUnit5 Migration] Module: flink-connector-base Key: FLINK-25546 URL: https://issues.apache.org/jira/browse/FLINK-25546 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25544) [JUnit5 Migration] Module: flink-streaming-java
Hang Ruan created FLINK-25544: - Summary: [JUnit5 Migration] Module: flink-streaming-java Key: FLINK-25544 URL: https://issues.apache.org/jira/browse/FLINK-25544 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25543) [JUnit5 Migration] Module: flink-yarn
Hang Ruan created FLINK-25543: - Summary: [JUnit5 Migration] Module: flink-yarn Key: FLINK-25543 URL: https://issues.apache.org/jira/browse/FLINK-25543 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25542) [JUnit5 Migration] Module: flink-runtime-web
Hang Ruan created FLINK-25542: - Summary: [JUnit5 Migration] Module: flink-runtime-web Key: FLINK-25542 URL: https://issues.apache.org/jira/browse/FLINK-25542 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25541) [JUnit5 Migration] Module: flink-test-utils
Hang Ruan created FLINK-25541: - Summary: [JUnit5 Migration] Module: flink-test-utils Key: FLINK-25541 URL: https://issues.apache.org/jira/browse/FLINK-25541 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25540) [JUnit5 Migration] Module: flink-runtime
Hang Ruan created FLINK-25540: - Summary: [JUnit5 Migration] Module: flink-runtime Key: FLINK-25540 URL: https://issues.apache.org/jira/browse/FLINK-25540 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hang Ruan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25342) DataStream.sinkTo will not add sink to the sinks field in the StreamGraph
Hang Ruan created FLINK-25342: - Summary: DataStream.sinkTo will not add sink to the sinks field in the StreamGraph Key: FLINK-25342 URL: https://issues.apache.org/jira/browse/FLINK-25342 Project: Flink Issue Type: Bug Affects Versions: 1.14.0 Reporter: Hang Ruan I ran a test in my IDE and inspected the generated StreamGraph. It seems the sink is not added to the `sinks` field of the StreamGraph. My test is as follows:
{code:java}
@Test
public void selfTest() throws Exception {
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> source = execEnv.fromSource(
            KafkaSource.<String>builder()
                    .setGroupId("flink-kafka-test")
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    .setTopics("scaleDownTest")
                    .setBootstrapServers("localhost:9092")
                    .build(),
            WatermarkStrategy.noWatermarks(),
            "Kafka Source");

    Properties props = new Properties();
    props.setProperty("transaction.timeout.ms", "90");

    source.sinkTo(KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("tp-test-")
            .setKafkaProducerConfig(props)
            .setRecordSerializer(
                    new SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
            .build());

    execEnv.execute("ScaleDownTest");
}
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25315) Add some extensions and utils to help the JUnit5 migration
Hang Ruan created FLINK-25315: - Summary: Add some extensions and utils to help the JUnit5 migration Key: FLINK-25315 URL: https://issues.apache.org/jira/browse/FLINK-25315 Project: Flink Issue Type: Technical Debt Components: Tests Reporter: Hang Ruan Add some extensions and utils to help the JUnit5 migration. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25104) Kafka table source cannot be used as bounded
Hang Ruan created FLINK-25104: - Summary: Kafka table source cannot be used as bounded Key: FLINK-25104 URL: https://issues.apache.org/jira/browse/FLINK-25104 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Hang Ruan In the DataStream API, we can make a Kafka source bounded via `builder.setBounded`. In SQL, there is no way to configure a Kafka table source as bounded. -- This message was sent by Atlassian Jira (v8.20.1#820001)
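For reference, a minimal sketch of the existing DataStream API counterpart (broker and topic values are placeholders):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// A bounded Kafka source in the DataStream API: the source stops once the offsets
// captured by setBounded() are reached. SQL currently has no equivalent option.
KafkaSource<String> boundedSource = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")   // placeholder broker
        .setTopics("my-topic")                   // placeholder topic
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();
{code}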
[jira] [Created] (FLINK-24697) Kafka table source cannot change the auto.offset.reset setting
Hang Ruan created FLINK-24697: - Summary: Kafka table source cannot change the auto.offset.reset setting Key: FLINK-24697 URL: https://issues.apache.org/jira/browse/FLINK-24697 Project: Flink Issue Type: Improvement Reporter: Hang Ruan Because Flink 1.13 SQL does not use the new Source API from FLIP-27, the 'group-offsets' startup behavior in Flink 1.13 falls back to Kafka's default 'auto.offset.reset' value (latest) when 'auto.offset.reset' is not set in the table options. In Flink 1.13 we could change this behavior by setting 'auto.offset.reset' to other values; see the method {{setStartFromGroupOffsets}} in the class {{FlinkKafkaConsumerBase}}. Flink 1.14 uses the new Source API, but there is no way to change the default 'auto.offset.reset' value when using the 'group-offsets' startup mode. In the DataStream API, we can change it via `kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy))`. So we need a way to change the auto offset reset configuration. The design is that when 'auto.offset.reset' is set, the 'group-offsets' startup mode uses the provided auto offset reset strategy, and otherwise falls back to the 'none' reset strategy in order to be consistent with the DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
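For reference, a hedged sketch of the DataStream API behavior mentioned above (broker, topic and group id are placeholders):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Start from the committed group offsets and fall back to EARLIEST when no committed
// offset exists; the SQL connector exposes no such knob for the 'group-offsets' mode.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")   // placeholder broker
        .setTopics("my-topic")                   // placeholder topic
        .setGroupId("my-group")                  // placeholder group id
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
{code}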
[jira] [Created] (FLINK-24627) Add some generic JUnit 5 extensions to replace JUnit 4 rules
Hang Ruan created FLINK-24627: - Summary: Add some generic JUnit 5 extensions to replace JUnit 4 rules Key: FLINK-24627 URL: https://issues.apache.org/jira/browse/FLINK-24627 Project: Flink Issue Type: Improvement Components: Tests Reporter: Hang Ruan We have to use JUnit 5 extensions to replace the existing JUnit 4 rules in order to migrate Flink's tests to JUnit 5. There are some generic rules whose replacements should be provided in advance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
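For illustration, a minimal sketch of the migration pattern (the extension name is hypothetical, not one of the extensions proposed in this ticket): a JUnit 4 `ExternalResource` rule becomes a JUnit 5 extension implementing the lifecycle callbacks.
{code:java}
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

// Hypothetical example: the JUnit 5 counterpart of a JUnit 4 ExternalResource rule,
// registered on a test class via @ExtendWith or on a field via @RegisterExtension.
public class TemporaryResourceExtension implements BeforeEachCallback, AfterEachCallback {

    @Override
    public void beforeEach(ExtensionContext context) {
        // acquire the resource, as ExternalResource#before() used to do
    }

    @Override
    public void afterEach(ExtensionContext context) {
        // release the resource, as ExternalResource#after() used to do
    }
}
{code}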
[jira] [Created] (FLINK-23854) KafkaSink error when restarting from a checkpoint with a lower parallelism
Hang Ruan created FLINK-23854: - Summary: KafkaSink error when restarting from a checkpoint with a lower parallelism Key: FLINK-23854 URL: https://issues.apache.org/jira/browse/FLINK-23854 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0 Reporter: Hang Ruan The KafkaSink throws an exception when restarted with a lower parallelism. The exception looks like this:
{code:java}
java.lang.IllegalStateException: Internal error: It is expected that state from previous executions is distributed to the same subtask id.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)
	at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)
	at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99)
	at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.NullPointerException
		at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)
		at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
		at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
		... 4 more
{code}
I started the Kafka cluster (kafka_2.13-2.8.0) and the Flink cluster on my own Mac. I changed the parallelism from 4 to 2 and restarted the job from a completed checkpoint. Then the error occurred. The CLI command and the code are as follows.
{code:java}
// cli command
./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1 /Users/test/project/self/target/test.jar
{code}
{code:java}
public class KafkaExactlyOnceScaleDownTest {

    public static void main(String[] args) throws Exception {
        final String kafkaSourceTopic = "flinkSourceTest";
        final String kafkaSinkTopic = "flinkSinkExactlyTest1";
        final String groupId = "ExactlyOnceTest1";
        final String brokers = "localhost:9092";
        final String ckDir = "file:///Users/hangruan/checkpointDir/" + groupId;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(6);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointStorage(ckDir);
        env.setParallelism(4);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(kafkaSourceTopic)
                .setGroupId(groupId)
                .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> flintstones = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        DataStream<String> adults = flintstones.filter(s -> s != null && s.length() > 2);

        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", "90");

        adults.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
[jira] [Created] (FLINK-23764) Add RuntimeContext to SourceReaderContext
Hang Ruan created FLINK-23764: - Summary: Add RuntimeContext to SourceReaderContext Key: FLINK-23764 URL: https://issues.apache.org/jira/browse/FLINK-23764 Project: Flink Issue Type: Improvement Components: Connectors / Common Reporter: Hang Ruan The RuntimeContext contains important information for the SourceReader. Beyond the subtask index, we sometimes need other information from the RuntimeContext, such as the operator ID. -- This message was sent by Atlassian Jira (v8.3.4#803005)
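A hedged sketch of the gap, assuming the current `SourceReaderContext` interface:
{code:java}
import org.apache.flink.api.connector.source.SourceReaderContext;

// Today a reader can only reach the subtask index through its context; operator-level
// details such as the operator id would need the RuntimeContext, which the interface
// does not expose yet.
void illustrate(SourceReaderContext context) {
    int subtaskIndex = context.getIndexOfSubtask(); // available today
    // no context.getRuntimeContext(), so e.g. the operator id is out of reach
}
{code}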