[GitHub] [flink] reswqa commented on pull request #21953: [FLINK-30989][runtime] Some config options related to sorting and spilling are not valid.
reswqa commented on PR #21953: URL: https://github.com/apache/flink/pull/21953#issuecomment-1435495267 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop
[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690678#comment-17690678 ]

Zhu Zhu commented on FLINK-31041:
---------------------------------

[~huwh] I have assigned you the ticket. Feel free to open a PR for it.

> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31041
>                 URL: https://issues.apache.org/jira/browse/FLINK-31041
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.15.3, 1.16.1
>            Reporter: Danny Cranmer
>            Assignee: Weihua Hu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>         Attachments: failovers.log, flink-31041-heap-dump.png, test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}}
> (FLIP-27), there is a failure race condition that results in:
> * Memory leak of {{ExecutionVertexVersion}}
> * Busy loop constantly trying to restart the job
> * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in
> the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}}s
> trying to load a non-existent cert from the file system and throwing a
> FileNotFoundException (FNFE).
> Here is a simple job to reproduce it (BE WARNED: running this locally will
> lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
>         1, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
>
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of keystore (private key) if client authentication is required
>         .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config",
>                 "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456")
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> // 32 additional sources, all of which fail in the same way
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
>
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
>
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
> however the {{DefaultScheduler}} does not. We need a debounce mechanism in
> the {{DefaultScheduler}} since it handles many {{OperatorCoordinatorHolder}}s.
> h4. Fix
> I have managed to fix this and will open a PR, but would need feedback from
> people who understand this code better than me!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
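The debounce idea from the Root Cause section can be illustrated in isolation. The following is only a minimal sketch and not the code from `DefaultScheduler` or from the eventual PR: the class `DebouncedGlobalFailureHandler`, its methods, and the "generation" counter are hypothetical names introduced here. It assumes every failure reporter captures the current generation when its attempt starts, so that many reports against the same generation trigger a single restart.

```java
/** Hypothetical sketch of a debounce for global failures; not Flink's actual scheduler code. */
final class DebouncedGlobalFailureHandler {
    // Incremented each time a restart is scheduled.
    private long generation = 0;
    // Counts restarts that were actually scheduled, for illustration.
    private int restartsScheduled = 0;

    /** Generation a reporter should capture when its attempt starts. */
    synchronized long currentGeneration() {
        return generation;
    }

    /**
     * Handles a failure reported against the generation the reporter observed.
     * Returns true if a restart was scheduled, false if the report was stale
     * because an earlier report already triggered the restart.
     */
    synchronized boolean onGlobalFailure(long observedGeneration) {
        if (observedGeneration != generation) {
            return false; // debounced: restart for this generation is already in flight
        }
        generation++;
        restartsScheduled++;
        return true;
    }

    synchronized int restartsScheduled() {
        return restartsScheduled;
    }
}
```

With 33 sources failing at roughly the same time, only the first report would schedule a restart under this scheme; the remaining reports would be dropped instead of each starting another restart.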
[jira] [Assigned] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop
[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu reassigned FLINK-31041:
-------------------------------

    Assignee: Weihua Hu  (was: Danny Cranmer)
[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution
[ https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690677#comment-17690677 ]

Zhu Zhu commented on FLINK-31124:
---------------------------------

[~csq] Thanks for volunteering! I have assigned you the ticket.

> Add it case for HiveTableSink speculative execution
> ---------------------------------------------------
>
>                 Key: FLINK-31124
>                 URL: https://issues.apache.org/jira/browse/FLINK-31124
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Biao Liu
>            Assignee: chenshuiqiang
>            Priority: Major
>
> Speculative execution support for HiveTableSink was added in
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some
> integration test cases for this feature.
[jira] [Updated] (FLINK-31124) Add it case for HiveTableSink speculative execution
[ https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-31124:
----------------------------

    Priority: Major  (was: Minor)
[jira] [Assigned] (FLINK-31124) Add it case for HiveTableSink speculative execution
[ https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu reassigned FLINK-31124:
-------------------------------

    Assignee: chenshuiqiang
[GitHub] [flink] 1996fanrui commented on pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
1996fanrui commented on PR #21960:
URL: https://github.com/apache/flink/pull/21960#issuecomment-1435470941

Recording some CI runs here to compare the running time:

First CI: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46277=results
[jira] [Assigned] (FLINK-31049) Add support for Kafka record headers to KafkaSink
[ https://issues.apache.org/jira/browse/FLINK-31049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Ge reassigned FLINK-31049:
-------------------------------

    Assignee: Alex Gout

> Add support for Kafka record headers to KafkaSink
> -------------------------------------------------
>
>                 Key: FLINK-31049
>                 URL: https://issues.apache.org/jira/browse/FLINK-31049
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Alex Gout
>            Assignee: Alex Gout
>            Priority: Minor
>              Labels: KafkaSink
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> The default org.apache.flink.connector.kafka.sink.KafkaSink does not support
> adding Kafka record headers. In some implementations, downstream consumers
> might rely on Kafka record headers being set.
>
> One way to add headers would be to create a custom
> KafkaRecordSerializationSchema and inject that into the KafkaSink.
> However, I'm assuming the KafkaRecordSerializationSchemaBuilder was added for
> convenience and allows a more usable approach of creating a KafkaSink without
> having to deal with details like the ProducerRecord directly. This builder
> does not support adding record headers.
> This is where I think it should be added.
> The code responsible for creating the Kafka record involves
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper,
> where the ProducerRecord is created.
> It is relatively simple to add support for record headers by adding a
> "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key
> and value serializers and using the appropriate ProducerRecord constructor.
>
> The issue was discussed
> [here|https://lists.apache.org/thread/shlbbcqho0q9w5shjwdlscnsywjvbfro].
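The shape of the proposal, a header producer sitting next to the value serializer in a builder, might look roughly like the sketch below. This is an assumption-laden illustration, not the Flink or Kafka API: `SketchRecord` is a stand-in for Kafka's record type, and `SketchSchemaBuilder` with its `setHeaderProducer` method is a hypothetical name that does not exist in `KafkaRecordSerializationSchemaBuilder`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Stand-in for a Kafka record, reduced to the fields this sketch needs. */
final class SketchRecord {
    final String topic;
    final byte[] value;
    final Map<String, byte[]> headers;

    SketchRecord(String topic, byte[] value, Map<String, byte[]> headers) {
        this.topic = topic;
        this.value = value;
        this.headers = headers;
    }
}

/** Hypothetical builder with a header producer next to the value serializer. */
final class SketchSchemaBuilder<T> {
    private String topic;
    private Function<T, byte[]> valueSerializer;
    // Default: no headers, so callers that never set a header producer see no change.
    private Function<T, Map<String, byte[]>> headerProducer = element -> Collections.emptyMap();

    SketchSchemaBuilder<T> setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    SketchSchemaBuilder<T> setValueSerializer(Function<T, byte[]> valueSerializer) {
        this.valueSerializer = valueSerializer;
        return this;
    }

    SketchSchemaBuilder<T> setHeaderProducer(Function<T, Map<String, byte[]>> headerProducer) {
        this.headerProducer = headerProducer;
        return this;
    }

    /** Returns a function that turns one element into a record carrying value and headers. */
    Function<T, SketchRecord> build() {
        if (topic == null || valueSerializer == null) {
            throw new IllegalStateException("topic and value serializer are required");
        }
        final String t = topic;
        final Function<T, byte[]> v = valueSerializer;
        final Function<T, Map<String, byte[]>> h = headerProducer;
        return element -> new SketchRecord(t, v.apply(element), new LinkedHashMap<>(h.apply(element)));
    }
}
```

Defaulting the header producer to an empty map is one way to keep such an addition backward compatible for existing builder users.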
[jira] [Commented] (FLINK-30945) FTS does not support multiple writers into the same table and topic
[ https://issues.apache.org/jira/browse/FLINK-30945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690568#comment-17690568 ]

Xinbin Huang commented on FLINK-30945:
--------------------------------------

[~lzljs3620320] would you assign this to me?

> FTS does not support multiple writers into the same table and topic
> -------------------------------------------------------------------
>
>                 Key: FLINK-30945
>                 URL: https://issues.apache.org/jira/browse/FLINK-30945
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>            Reporter: Vicky Papavasileiou
>            Priority: Major
>
> When creating two different streaming jobs that INSERT INTO the same table
> and Kafka topic, the second job is never able to make progress as the
> transaction gets constantly aborted due to the producer getting fenced.
> FTS should set the transactionalIdPrefix to avoid transactions of different
> jobs clashing.
> {code:java}
> 2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer -> Global Committer -> Sink: end (1/1)#0 (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
>     at org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
> {code}
> Sample queries:
>
> {code:java}
> CREATE CATALOG table_store_catalog WITH (
>     'type'='table-store',
>     'warehouse'='s3://my-bucket/table-store'
> );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count_kafka (
>     word STRING PRIMARY KEY NOT ENFORCED,
>     cnt BIGINT
> ) WITH (
>     'log.system' = 'kafka',
>     'kafka.bootstrap.servers' = 'broker:9092',
>     'kafka.topic' = 'word_count_log'
> );
> CREATE TEMPORARY TABLE word_table (
>     word STRING
> ) WITH (
>     'connector' = 'datagen',
>     'fields.word.length' = '1'
> );
> {code}
>
> And the two INSERT jobs:
> {code:java}
> INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY word;
> {code}
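The fencing described above occurs when two producers share a transactional ID. A minimal sketch of the suggested direction, giving each job its own prefix, is shown below. The class `TransactionalIds`, both method names, and the ID layout are hypothetical and chosen only for illustration; how the table store would actually derive a prefix is not stated in the issue.

```java
/** Hypothetical helper showing per-job transactional ID prefixes; not table store code. */
final class TransactionalIds {
    private TransactionalIds() {}

    /** Builds a prefix that differs between jobs writing to the same table. */
    static String prefixFor(String tableName, String jobId) {
        return "table-store-" + tableName + "-" + jobId;
    }

    /** Builds a full transactional ID for one subtask and one checkpoint. */
    static String transactionalId(String prefix, int subtaskIndex, long checkpointId) {
        return prefix + "-" + subtaskIndex + "-" + checkpointId;
    }
}
```

Under this assumption, two jobs inserting into `word_count_kafka` would use different prefixes and therefore never produce the same transactional ID, so neither producer would fence the other.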
[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
lilyevsky commented on code in PR #11:
URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110240788

##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int numMaxActions) {
  * @param maxSizeMb the maximum size of buffered actions, in mb.
  * @return this builder
  */
+@SuppressWarnings("UnusedReturnValue")

Review Comment:
Done
[jira] [Updated] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-31120:
-----------------------------------

    Labels: pull-request-available test-stability  (was: test-stability)

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> ----------------------------------------------------------------------
>
>                 Key: FLINK-31120
>                 URL: https://issues.apache.org/jira/browse/FLINK-31120
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Matthias Pohl
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 10.725 s <<< FAILURE! - in org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql
> Feb 17 04:51:25     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25     at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}
[GitHub] [flink] flinkbot commented on pull request #21961: [FLINK-31120] Add concurrent access control for collectorIterators in StreamExecutionEnvironment
flinkbot commented on PR #21961:
URL: https://github.com/apache/flink/pull/21961#issuecomment-1435091671

## CI report:

* d48d6ecdba52849419a0c4a53a977b2ce7562a97 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] shuiqiangchen opened a new pull request, #21961: [FLINK-29879] Add concurrent access control for collectorIterators in StreamExecutionEnvironment
shuiqiangchen opened a new pull request, #21961:
URL: https://github.com/apache/flink/pull/21961

## What is the purpose of the change

*This change adds concurrent access control for the static list `collectIterators` in `StreamExecutionEnvironment`. Although `StreamExecutionEnvironment` is accessed from a single thread most of the time, a `ConcurrentModificationException` can still occur.*

## Brief change log

- Made `collectIterators` a synchronizedList in `StreamExecutionEnvironment`.

## Verifying this change

This change is covered by existing ITCases that are executed in concurrent mode. It is a stability issue, so a failure case reproduces only with a small probability.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
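As a reminder of how `Collections.synchronizedList` behaves, here is a minimal, self-contained sketch. It is not the code from this PR: `CollectIteratorRegistry`, `register`, and `drain` are hypothetical names, and strings stand in for the iterator objects. Single calls such as `add` are synchronized by the wrapper, but iterating or copying the list still requires holding the list's own lock manually, as the `Collections.synchronizedList` Javadoc states.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical registry illustrating safe use of a synchronized list; not Flink code. */
final class CollectIteratorRegistry {
    // Individual method calls on the wrapper are synchronized on the wrapper itself.
    private static final List<String> ITERATORS =
            Collections.synchronizedList(new ArrayList<>());

    private CollectIteratorRegistry() {}

    /** Safe without extra locking: a single add() call is atomic on the wrapper. */
    static void register(String iterator) {
        ITERATORS.add(iterator);
    }

    /**
     * Copies and clears the list as one atomic step. The explicit synchronized block
     * is needed because copy-then-clear spans several calls, and traversal of a
     * synchronized list is not protected by the wrapper alone.
     */
    static List<String> drain() {
        synchronized (ITERATORS) {
            List<String> snapshot = new ArrayList<>(ITERATORS);
            ITERATORS.clear();
            return snapshot;
        }
    }
}
```

Callers would then iterate over the returned snapshot, which no other thread can modify, instead of looping over the shared list directly.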
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11:
URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110194101

##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int numMaxActions) {
  * @param maxSizeMb the maximum size of buffered actions, in mb.
  * @return this builder
  */
+@SuppressWarnings("UnusedReturnValue")

Review Comment:
Please remove, thank you
[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
lilyevsky commented on code in PR #11:
URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110179327

##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int numMaxActions) {
  * @param maxSizeMb the maximum size of buffered actions, in mb.
  * @return this builder
  */
+@SuppressWarnings("UnusedReturnValue")

Review Comment:
I added it because the compiler gave me a warning about it. I can remove the suppression if you prefer.
[jira] [Comment Edited] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523 ]

Shuiqiang Chen edited comment on FLINK-31120 at 2/17/23 6:17 PM:
-----------------------------------------------------------------

Maybe access to the static field `collectIterators` in `StreamExecutionEnvironment` needs concurrency control. StringFunctionsITCase runs four test cases in concurrent execution mode, so one thread may add a collectorIterator through `registerCollectIterator` while another thread is inside the foreach loop in executeAsync().

was (Author: csq):
Maybe tt needs concurrent control for access to the static field `collectIterators` in `StreamExecutionEnvironment`. There are four test cases executing when running StringFunctionsITCase in a concurrent execution mode, that has chance for a thread to add a collectorIterator through `registerCollectIterator` while there is a foreach loop in executeAsync() in another thread.
[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution
[ https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690524#comment-17690524 ]

Shuiqiang Chen commented on FLINK-31124:
----------------------------------------

Hi [~zhuzh], I would like to help finish this issue. Could you please assign it to me?
[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523 ]

Shuiqiang Chen commented on FLINK-31120:
----------------------------------------

Maybe tt needs concurrent control for access to the static field `collectIterators` in `StreamExecutionEnvironment`. There are four test cases executing when running StringFunctionsITCase in a concurrent execution mode, that has chance for a thread to add a collectorIterator through `registerCollectIterator` while there is a foreach loop in executeAsync() in another thread.
[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask
pgaref commented on code in PR #21923: URL: https://github.com/apache/flink/pull/21923#discussion_r1110168340 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -215,6 +215,9 @@ */ private final StreamTaskActionExecutor actionExecutor; +/** Current state of the task, can be any of {@link TaskState}. */ +private TaskState taskState; Review Comment: Yup, good catch Anton! ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -215,6 +215,9 @@ */ private final StreamTaskActionExecutor actionExecutor; +/** Current state of the task, can be any of {@link TaskState}. */ +private TaskState taskState; Review Comment: Yup, good catch, thanks Anton! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] akalash commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask
akalash commented on code in PR #21923: URL: https://github.com/apache/flink/pull/21923#discussion_r1110137878 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -215,6 +215,9 @@ */ private final StreamTaskActionExecutor actionExecutor; +/** Current state of the task, can be any of {@link TaskState}. */ +private TaskState taskState; Review Comment: Shouldn't it be volatile? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-13871) Consolidate isRunning, canceled, isFinished fields in StreamTask and SourceStreamTask
[ https://issues.apache.org/jira/browse/FLINK-13871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Kalashnikov reassigned FLINK-13871: - Assignee: Panagiotis Garefalakis > Consolidate isRunning, canceled, isFinished fields in StreamTask and > SourceStreamTask > - > > Key: FLINK-13871 > URL: https://issues.apache.org/jira/browse/FLINK-13871 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Alex >Assignee: Panagiotis Garefalakis >Priority: Not a Priority > Labels: pull-request-available > > {{StreamTask}} has two {{volatile boolean}} fields ({{isRunning}}, > {{canceled}}) and {{SourceStreamTask}} has an additional {{isFinished}} > field. > In practice, those fields are mutually exclusive and reflect different stages > of stream task's lifecycle. It should be possible to represent all three > fields as one (enumerated) state field. -- This message was sent by Atlassian Jira (v8.20.10#820010)
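The consolidation proposed in this ticket can be sketched as follows. Only the name `TaskState` comes from the pull request; the enum constants, the class `TaskLifecycle` and its methods are assumptions made for illustration and do not reflect the actual `StreamTask` code.

```java
/** Hypothetical sketch: one enumerated state instead of three boolean flags. */
final class TaskLifecycle {
    // The constants are assumed for this sketch, not taken from the PR.
    enum TaskState { INITIALIZING, RUNNING, CANCELED, FINISHED }

    // volatile so that a state written by one thread (for example the
    // canceling thread) is visible to readers on other threads.
    private volatile TaskState taskState = TaskState.INITIALIZING;

    void markRunning() { taskState = TaskState.RUNNING; }

    void cancel() { taskState = TaskState.CANCELED; }

    void finish() { taskState = TaskState.FINISHED; }

    // The former flags become simple queries on the single state field,
    // which makes their mutual exclusivity explicit.
    boolean isRunning() { return taskState == TaskState.RUNNING; }

    boolean isCanceled() { return taskState == TaskState.CANCELED; }

    boolean isFinished() { return taskState == TaskState.FINISHED; }
}
```

Because every query reads the same field, two of them can never be true at the same time, which is the property the ticket relies on.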
[GitHub] [flink] pgaref commented on pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask
pgaref commented on PR #21923: URL: https://github.com/apache/flink/pull/21923#issuecomment-1434978767 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pgaref commented on pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask
pgaref commented on PR #21923: URL: https://github.com/apache/flink/pull/21923#issuecomment-1434911355 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.
liuyongvs commented on PR #21958: URL: https://github.com/apache/flink/pull/21958#issuecomment-1434909262

> Thanks for your contribution.
>
> To be honest i didn't get the logic. What i did:
>
> 1. Built it from the PR's branch
> 2. Started standalone Flink
> 3. Via sqlClient submitted several queries
>
> ```sql
> SELECT array_union(array[1], array[2]);
> -- result array[1, 2]
> -- this is OK
> ```
>
> ```sql
> SELECT array_union(array[1], array[2, 3]);
> -- result array[1, 2, 3]
> --- this is OK
> ```
>
> ```sql
> SELECT array_union(array[1], array[2, 3, null]);
> -- result array[1, 2, 3, 0]
> --- this is NOT OK
> ```
>
> ```sql
> SELECT array_union(array[1], array[map['this is a key', 'this is a value']]);
> -- result [1, 68]
> -- this is NOT OK
> ```
>
> ```sql
> SELECT array_union(array[1], array['this is a string']);
> -- result [1, 16]
> -- this is NOT OK
> ```
>
> ```sql
> SELECT array_union(array[1], array[array[1, 2, 3]]);
> -- result [1, 24]
> -- this is NOT OK
> ```

The result is indeed not OK for queries like `SELECT array_union(array[1], array[array[1, 2, 3]]);` and `SELECT array_union(array[1], array['this is a string']);`. As I said before, calls that mix these two array types should fail with an exception. We should handle it the way ARRAY_CONCAT does in Calcite, but I have not found a good way to express that in Flink. So I am asking whether you have a good implementation in mind; it also matters for other array operations such as array_except and array_intersect.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs closed pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.
liuyongvs closed pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function. URL: https://github.com/apache/flink/pull/21958 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110068155 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java: ## @@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int numMaxActions) { * @param maxSizeMb the maximum size of buffered actions, in mb. * @return this builder */ +@SuppressWarnings("UnusedReturnValue") Review Comment: I don't understand why these suppressions are needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.
liuyongvs commented on PR #21958: URL: https://github.com/apache/flink/pull/21958#issuecomment-1434905063

> ```sql
> SELECT array_union(array[1], array[array[1, 2, 3]]);
> ```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110066429 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java: ## @@ -122,10 +128,11 @@ } catch (Exception e) { throw new FlinkRuntimeException("Failed to open the OpensearchEmitter", e); } +this.failureHandler = failureHandler; } @Override -public void write(IN element, Context context) throws IOException, InterruptedException { +public void write(IN element, Context context) throws InterruptedException { Review Comment: Please revert these changes, they violate the `SinkWriter` interface [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java#L41 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java: ## @@ -134,7 +141,7 @@ public void write(IN element, Context context) throws IOException, InterruptedEx } @Override -public void flush(boolean endOfInput) throws IOException, InterruptedException { +public void flush(boolean endOfInput) throws InterruptedException { Review Comment: Same, please revert -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.
liuyongvs commented on code in PR #21947: URL: https://github.com/apache/flink/pull/21947#discussion_r1110053725 ## docs/data/sql_functions.yml: ## @@ -617,6 +617,12 @@ collection: - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_REMOVE(haystack, needle) +table: haystack.arrayRemove(needle) +description: Remove all elements that equal to element from array. If the array itself is null, the function will return null. + - sql: ARRAY_SIZE(haystack) +table: haystack.arraySize() +description: Returns the size of an array. If the array itself is null, the function will return null. Review Comment: Thanks for your detailed response, I will drop the array_size commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] lilyevsky commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
lilyevsky commented on PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1434889977 @reta I committed the changes you suggested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.
liuyongvs commented on code in PR #21947: URL: https://github.com/apache/flink/pull/21947#discussion_r1110050019 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -178,6 +178,35 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .runtimeClass( "org.apache.flink.table.runtime.functions.scalar.ArrayDistinctFunction") .build(); + +public static final BuiltInFunctionDefinition ARRAY_SIZE = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SIZE") +.kind(SCALAR) +.inputTypeStrategy( +sequence( +Collections.singletonList("haystack"), + Collections.singletonList(logical(LogicalTypeRoot.ARRAY +.outputTypeStrategy( +nullableIfArgs(ConstantArgumentCount.of(0), explicit(DataTypes.INT( +.runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ArraySizeFunction") +.build(); + +public static final BuiltInFunctionDefinition ARRAY_REMOVE = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_REMOVE") +.kind(SCALAR) +.inputTypeStrategy( +sequence( +Arrays.asList("haystack", "needle"), +Arrays.asList( +logical(LogicalTypeRoot.ARRAY), ARRAY_ELEMENT_ARG))) Review Comment: I tested `SELECT array_contains(array[1, 2, 3], cast(null as int))` and it also fails with an exception. The reason is that ARRAY_ELEMENT_ARG requires the second argument to have the same type as the array element type: Caused by: org.apache.flink.table.api.ValidationException: Invalid function call: array_remove(ARRAY NOT NULL, INT). So `select array_remove(array[1, 2, 3, null], cast(null as int));` returns array[1, 2, 3]. Should we also fix array_contains? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
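The null handling discussed in this review comment (a null needle removing null elements, and a null array yielding null) can be sketched in plain Java. The class `ArrayRemoveSketch` and the method `arrayRemove` are hypothetical names for illustration; this is not the runtime function from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of ARRAY_REMOVE semantics, not Flink's implementation. */
final class ArrayRemoveSketch {
    /**
     * Returns a new list without the elements equal to needle.
     * A null haystack yields null, matching the documented behaviour.
     */
    static <T> List<T> arrayRemove(List<T> haystack, T needle) {
        if (haystack == null) {
            return null;
        }
        List<T> result = new ArrayList<>();
        for (T element : haystack) {
            // Objects.equals treats two nulls as equal, so a null needle
            // removes the null elements instead of throwing.
            if (!Objects.equals(element, needle)) {
                result.add(element);
            }
        }
        return result;
    }
}
```

For example, `ArrayRemoveSketch.arrayRemove(Arrays.asList(1, 2, 3, null), null)` yields a list containing 1, 2 and 3, mirroring the `array_remove(array[1, 2, 3, null], cast(null as int))` case above.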
[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
lilyevsky commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110040361 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java: ## @@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception { new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); try (final OpensearchWriter> writer = -createWriter(index, false, bulkProcessorConfig, metricGroup)) { +createWriter( Review Comment: Agreed, it will be better not to touch that line. I did it because it needed to pass the metricGroup. So I am fixing the situation by adding yet another constructor, with metricGroup but without failureHandler. Also in the main constructor put the failureHandler at the end of the parameters list. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110047438 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java: ## @@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception { new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); try (final OpensearchWriter> writer = -createWriter(index, false, bulkProcessorConfig, metricGroup)) { +createWriter( Review Comment: No need for constructor, `createWriter` is a method in this class -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
lilyevsky commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110040361 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java: ## @@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception { new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); try (final OpensearchWriter> writer = -createWriter(index, false, bulkProcessorConfig, metricGroup)) { +createWriter( Review Comment: Agreed, it will be better not to touch that line. I did it because it needed to pass the metricGroup. So I am fixing the situation by adding yet another constructor, with metricGroup but without failureHandler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
1996fanrui commented on code in PR #21960: URL: https://github.com/apache/flink/pull/21960#discussion_r1110001094 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java: ## @@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception { new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setString( CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); +configuration.set( +CheckpointingOptions.CHECKPOINTS_DIRECTORY, +temporaryFolder.newFolder().toURI().toString()); Review Comment: FileSystemCheckpointStorage is compatible with more scenarios than JobManagerCheckpointStorage, for example scenarios with medium or large state sizes, but its performance may be slightly worse for small state. I can fix the bug that makes the test fail, and then we can look at the test running time. If there is no significant increase, `FileSystemCheckpointStorage` can be used as the default, what do you think? Actually, I don't think there will be much impact on CI running time once FileSystemCheckpointStorage is the default: `JobManagerCheckpointStorage` only works when the state size is < 5MB, i.e. small state, and small state should not take much time with FileSystemCheckpointStorage either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pnowojski commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
pnowojski commented on code in PR #21960: URL: https://github.com/apache/flink/pull/21960#discussion_r1110017809 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java: ## @@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception { new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setString( CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); +configuration.set( +CheckpointingOptions.CHECKPOINTS_DIRECTORY, +temporaryFolder.newFolder().toURI().toString()); Review Comment: > If there is no significant increase, FileSystemCheckpointStorage can be used as the default, what do you think? That might be ok. In that case we would also need to check the benchmark results. @zentol had another suggestion to instead increase Akka's frame size and keep the memory storage? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.
snuyanzin commented on code in PR #21947: URL: https://github.com/apache/flink/pull/21947#discussion_r1110013444 ## docs/data/sql_functions.yml: ## @@ -617,6 +617,12 @@ collection: - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_REMOVE(haystack, needle) +table: haystack.arrayRemove(needle) +description: Remove all elements that equal to element from array. If the array itself is null, the function will return null. + - sql: ARRAY_SIZE(haystack) +table: haystack.arraySize() +description: Returns the size of an array. If the array itself is null, the function will return null. Review Comment: Well, that's very arguable since different engines support different syntax: BigQuery has `ARRAY_LENGTH`, Hive has `SIZE`, Vertica has `ARRAY_LENGTH`, PostgreSQL has `ARRAY_LENGTH`. Why support only Snowflake and Spark? Supporting everything seems to be overkill... I guess it has to stop somewhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
1996fanrui commented on code in PR #21960: URL: https://github.com/apache/flink/pull/21960#discussion_r1110001094 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java: ## @@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception { new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setString( CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); +configuration.set( +CheckpointingOptions.CHECKPOINTS_DIRECTORY, +temporaryFolder.newFolder().toURI().toString()); Review Comment: FileSystemCheckpointStorage is compatible with more scenarios than JobManagerCheckpointStorage, for example: scenarios with medium or large state sizes. But its performance may be a little bit worse in small state scenarios. I can fix the bug that the test fails, and then we look at the test duration. If there is no significant increase, `FileSystemCheckpointStorage` can be used as the default, what do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.
liuyongvs commented on code in PR #21947: URL: https://github.com/apache/flink/pull/21947#discussion_r1109996439 ## docs/data/sql_functions.yml: ## @@ -617,6 +617,12 @@ collection: - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_REMOVE(haystack, needle) +table: haystack.arrayRemove(needle) +description: Remove all elements that equal to element from array. If the array itself is null, the function will return null. + - sql: ARRAY_SIZE(haystack) +table: haystack.arraySize() +description: Returns the size of an array. If the array itself is null, the function will return null. Review Comment: To align with other engines like Spark and Snowflake, so that users can migrate easily. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-mongodb] zentol commented on a diff in pull request #3: [FLINK-31063] Prevent duplicate reading when restoring from a checkpoint.
zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-mongodb/pull/3#discussion_r1109995851 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoSourceSplitState.java: ## @@ -19,28 +19,19 @@ import org.apache.flink.annotation.PublicEvolving; +import org.bson.BsonDocument; + /** MongoDB source split state. */ @PublicEvolving -public class MongoSourceSplitState { +public abstract class MongoSourceSplitState { Review Comment: Maybe consider introducing a `MongoSourceSplitState` interface instead. We have then more freedom to change the internals than when we make the implementation itself part of the API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
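The suggestion above (expose an interface and keep the implementation out of the public API) can be sketched as follows. All names here (`SplitState`, `ScanSplitState`, `currentPosition`, `advance`) are hypothetical and chosen only for illustration; they are not the connector's actual types or methods.

```java
/** Hypothetical public API surface: callers only see this interface. */
interface SplitState {
    /** Describes the current read position of the split. */
    String currentPosition();
}

/** Hypothetical implementation; its fields can change without touching the API. */
final class ScanSplitState implements SplitState {
    private final String splitId;
    private long offset;

    ScanSplitState(String splitId, long offset) {
        this.splitId = splitId;
        this.offset = offset;
    }

    /** Records that one more record has been read. */
    void advance() {
        offset++;
    }

    @Override
    public String currentPosition() {
        return splitId + "@" + offset;
    }
}
```

Code that depends only on `SplitState` keeps compiling if `ScanSplitState` later stores its position differently, which is the extra freedom the review comment refers to.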
[GitHub] [flink] pnowojski commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
pnowojski commented on code in PR #21960: URL: https://github.com/apache/flink/pull/21960#discussion_r1109989565 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java: ## @@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception { new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setString( CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); +configuration.set( +CheckpointingOptions.CHECKPOINTS_DIRECTORY, +temporaryFolder.newFolder().toURI().toString()); Review Comment: I think with your change I would still set the `FileSystemCheckpointStorage` every time we are using unaligned checkpoints. So the question is whether we want to always use it by default, or only for unaligned checkpoints. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
1996fanrui commented on code in PR #21960: URL: https://github.com/apache/flink/pull/21960#discussion_r1109958093 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java: ## @@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception { new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setString( CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); +configuration.set( +CheckpointingOptions.CHECKPOINTS_DIRECTORY, +temporaryFolder.newFolder().toURI().toString()); Review Comment: Unaligned checkpoints or the changelog state backend are enabled in `TestStreamEnvironment#randomizeConfiguration`. I also see a large number of tests calling `FsStateChangelogStorageFactory.configure()` to configure DFS DSTL in FLINK-23279 [1], including `StateChangelogOptions.STATE_CHANGE_LOG_STORAGE` and `FsStateChangelogOptions.BASE_PATH`. If we are worried about increasing the running time, we can set `CheckpointingOptions.CHECKPOINT_STORAGE` and `CheckpointingOptions.CHECKPOINTS_DIRECTORY` in a similar way, what do you think? [1] commit link: https://github.com/apache/flink/pull/16685/commits/9cc4fe09de5a540390d2d5ad1a72f551c0d0e875 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109964501 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala: ## @@ -28,11 +28,11 @@ import java.lang * basic SQL type. */ class TimeIndicatorRelDataType( -val typeSystem: RelDataTypeSystem, +val typeSystemField: RelDataTypeSystem, Review Comment: Since `typeSystem` in `BasicSqlType` is now protected -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
1996fanrui commented on code in PR #21960: URL: https://github.com/apache/flink/pull/21960#discussion_r1109958093 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java: ## @@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception { new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setString( CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); +configuration.set( +CheckpointingOptions.CHECKPOINTS_DIRECTORY, +temporaryFolder.newFolder().toURI().toString()); Review Comment: Unaligned checkpoints or the changelog state backend are enabled in `TestStreamEnvironment#randomizeConfiguration`. I also see a large number of tests calling `FsStateChangelogStorageFactory.configure()` to configure DFS DSTL in FLINK-23279, including `StateChangelogOptions.STATE_CHANGE_LOG_STORAGE` and `FsStateChangelogOptions.BASE_PATH`. If we are worried about increasing the running time, we can set `CheckpointingOptions.CHECKPOINT_STORAGE` and `CheckpointingOptions.CHECKPOINTS_DIRECTORY` in a similar way, what do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
[ https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Kalashnikov closed FLINK-15550. - Fix Version/s: 1.16.2 Resolution: Fixed > testCancelTaskExceptionAfterTaskMarkedFailed failed on azure > > > Key: FLINK-15550 > URL: https://issues.apache.org/jira/browse/FLINK-15550 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3, 1.16.0 >Reporter: Yun Tang >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.17.0, 1.16.2 > > > Instance: > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug > {code:java} > java.lang.AssertionError: expected: but was: > at > org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525) > {code} > {code:java} > expected: but was: > {code}
[jira] [Commented] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
[ https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690444#comment-17690444 ] Anton Kalashnikov commented on FLINK-15550: --- Merged to release-1.16: 658ba8eb, a3c9eb40, cb9f183f > testCancelTaskExceptionAfterTaskMarkedFailed failed on azure > > > Key: FLINK-15550 > URL: https://issues.apache.org/jira/browse/FLINK-15550 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3, 1.16.0 >Reporter: Yun Tang >Assignee: Anton Kalashnikov >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.17.0 > > > Instance: > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug > {code:java} > java.lang.AssertionError: expected: but was: > at > org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525) > {code} > {code:java} > expected: but was: > {code}
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109948032 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.Public; + +import java.io.Serializable; + +/** Handler to process failures. */ +@Public Review Comment: `@Public` -> `@PublicEvolving` would be probably better (to leave some space to evolve the API)
[GitHub] [flink-connector-opensearch] reta commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1434782202 @lilyevsky a few minor comments, LGTM otherwise!
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109943802 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java: ## @@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception { new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); try (final OpensearchWriter> writer = -createWriter(index, false, bulkProcessorConfig, metricGroup)) { +createWriter( Review Comment: You probably don't need this change since `createWriter` has an overloaded version without failure handler
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109940919 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java: ## @@ -238,19 +240,68 @@ void testCurrentSendTime() throws Exception { } } +private class TestHandler implements FailureHandler { Review Comment: Probably `private static` is better
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109941466 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java: ## @@ -238,19 +240,68 @@ void testCurrentSendTime() throws Exception { } } +private class TestHandler implements FailureHandler { +private boolean failed = false; + +private synchronized void setFailed() { +failed = true; +} + +public boolean isFailed() { +return failed; +} + +@java.lang.Override +public void onFailure(Throwable failure) { +setFailed(); +} +} + +@Test +void testWriteErrorOnUpdate() throws Exception { +final String index = "test-bulk-flush-with-error"; +final int flushAfterNActions = 1; +final BulkProcessorConfig bulkProcessorConfig = +new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); + +final TestHandler testHandler = new TestHandler(); +try (final OpensearchWriter> writer = +createWriter(index, true, bulkProcessorConfig, testHandler)) { +// Trigger an error by updating non-existing document +writer.write(Tuple2.of(1, "u" + buildMessage(1)), null); +context.assertThatIdsAreNotWritten(index, 1); +assertThat(testHandler.isFailed()).isEqualTo(true); Review Comment: :+1:
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109940526 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java: ## @@ -238,19 +240,68 @@ void testCurrentSendTime() throws Exception { } } +private class TestHandler implements FailureHandler { +private boolean failed = false; + +private synchronized void setFailed() { +failed = true; +} + +public boolean isFailed() { +return failed; +} + +@java.lang.Override Review Comment: Super nit, just `@Override`
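The review comments on `TestHandler` (make it `private static`, use plain `@Override`) could be combined roughly as in the sketch below. This is not code from the PR. The `FailureHandler` interface here is a local stand-in that copies only the `onFailure(Throwable)` signature visible in the diff, and having it extend `Serializable` is an assumption based on the import shown there. Replacing the `synchronized` setter with an `AtomicBoolean` is an addition of this sketch, chosen so that the unsynchronized `isFailed()` read is also safe. `HandlerSketch` and `demo()` are hypothetical names.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;

/** Local stand-in for the connector's FailureHandler (assumed shape, see lead-in). */
interface FailureHandler extends Serializable {
    void onFailure(Throwable failure);
}

/** Hypothetical enclosing class playing the role of the test case. */
final class HandlerSketch {

    /** Static nested class: it keeps no hidden reference to the enclosing instance. */
    private static class TestHandler implements FailureHandler {
        // AtomicBoolean makes both the write and the read thread-safe.
        private final AtomicBoolean failed = new AtomicBoolean(false);

        public boolean isFailed() {
            return failed.get();
        }

        @Override
        public void onFailure(Throwable failure) {
            failed.set(true);
        }
    }

    /** Simulates one failure callback and reports whether the handler recorded it. */
    static boolean demo() {
        TestHandler handler = new TestHandler();
        handler.onFailure(new RuntimeException("simulated bulk failure"));
        return handler.isFailed();
    }
}
```

A static nested handler also avoids dragging the test instance along if the handler is ever serialized together with the sink.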
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
reta commented on code in PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109939155 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java: ## @@ -103,7 +107,8 @@ BulkProcessorConfig bulkProcessorConfig, NetworkClientConfig networkClientConfig, SinkWriterMetricGroup metricGroup, -MailboxExecutor mailboxExecutor) { +MailboxExecutor mailboxExecutor, Review Comment: Please update javadocs with `failureHandler` arg
[GitHub] [flink] akalash merged pull request #21919: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects
akalash merged PR #21919: URL: https://github.com/apache/flink/pull/21919
[GitHub] [flink-ml] lindong28 commented on pull request #212: [FLINK-31125] Flink ML benchmark framework should minimize the source operator overhead
lindong28 commented on PR #212: URL: https://github.com/apache/flink-ml/pull/212#issuecomment-1434767219 @zhipeng93 Can you help review this PR?
[GitHub] [flink] pnowojski commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
pnowojski commented on code in PR #21960: URL: https://github.com/apache/flink/pull/21960#discussion_r1109920354 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java: ## @@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception { new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setString( CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); +configuration.set( +CheckpointingOptions.CHECKPOINTS_DIRECTORY, +temporaryFolder.newFolder().toURI().toString()); Review Comment: Question, should we have this changed for everything using mini cluster, or only for our tests that are using unaligned checkpoints? 🤔 Maybe for example because of benchmarks, and overall running times (costs of our CI), we should keep using memory storage if not needed?
[jira] [Commented] (FLINK-31095) FileSink doesn't work with s3a on EKS
[ https://issues.apache.org/jira/browse/FLINK-31095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690431#comment-17690431 ] Martijn Visser commented on FLINK-31095: I did a quick check on the PR belonging to FLINK-23487 which includes this comment from [~airblader] in https://github.com/apache/flink/pull/16592/files#r679835626 | For Hadoop you need special configuration because Hadoop ships its own credentials provider chain mechanism. I am by no means an S3 or a Hadoop expert, but shouldn't it then be possible to follow https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html to setup IRSA? > FileSink doesn't work with s3a on EKS > - > > Key: FLINK-31095 > URL: https://issues.apache.org/jira/browse/FLINK-31095 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.16.1 >Reporter: Sylvia Lin >Priority: Major > > FileSink gives below exception on AWS EKS cluster: > {code:java} > Caused by: java.lang.UnsupportedOperationException: This s3 file system > implementation does not support recoverable writers. > at > org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) > ~[?:?] 
> at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) > ~[flink-dist-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475) > ~[flink-connector-files-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466) > ~[flink-connector-files-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175) > ~[flink-connector-files-1.16.1.jar:1.16.1]{code} > [https://github.com/apache/flink/blob/278dc7b793303d228f7816585054629708983af6/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#LL136C16-L136C16] > And this may be related to > https://issues.apache.org/jira/browse/FLINK-23487?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31036) StateCheckpointedITCase timed out
[ https://issues.apache.org/jira/browse/FLINK-31036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-31036: --- Fix Version/s: 1.17.0 1.18.0 > StateCheckpointedITCase timed out > - > > Key: FLINK-31036 > URL: https://issues.apache.org/jira/browse/FLINK-31036 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.17.0, 1.18.0 > > Attachments: image-2023-02-16-20-29-52-050.png > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46023=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=10608 > {code} > "Legacy Source Thread - Source: Custom Source -> Filter (6/12)#69980" > #13718026 prio=5 os_prio=0 tid=0x7f05f44f0800 nid=0x128157 waiting on > condition [0x7f059feef000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xf0a974e8> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:384) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:356) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414) > at > 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$1311/1256184070.accept(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) > - locked <0xd55035c0> (a 
java.lang.Object) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) > at > org.apache.flink.test.checkpointing.StateCheckpointedITCase$StringGeneratingSourceFunction.run(StateCheckpointedITCase.java:178) > - locked <0xd55035c0> (a java.lang.Object) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) > at >
[jira] [Updated] (FLINK-31125) Flink ML benchmark framework should minimize the source operator overhead
[ https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31125: --- Labels: pull-request-available (was: ) > Flink ML benchmark framework should minimize the source operator overhead > - > > Key: FLINK-31125 > URL: https://issues.apache.org/jira/browse/FLINK-31125 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Flink ML benchmark framework estimates the throughput by having a source > operator generate a given number (e.g. 10^7) of input records with random > values, let the given AlgoOperator process these input records, and divide > the number of records by the total execution time. > The overhead of generating random values for all input records has observable > impact on the estimated throughput. We would like to minimize the overhead of > the source operator so that the benchmark result can focus on the throughput > of the AlgoOperator as much as possible. > Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] > generates all input records in advance into memory before running the > benchmark. This allows Spark ML benchmark to read records from memory instead > of generating values for those records during the benchmark. > We can generate value once and re-use it for all input records. This approach > minimizes the source operator overhead and allows us to compare Flink ML > benchmark result with Spark ML benchmark result (from spark-sql-perf) fairly.
[GitHub] [flink-ml] lindong28 opened a new pull request, #212: [FLINK-31125] Flink ML benchmark framework should minimize the source operator overhead
lindong28 opened a new pull request, #212: URL: https://github.com/apache/flink-ml/pull/212 ## What is the purpose of the change Minimize the source operator overhead in Flink ML benchmark. ## Brief change log - Updated RowGenerator to generate row value once and re-use the value for all emitted rows. - Renamed RowGenerator#nextRow to RowGenerator#getRow. This is because there is no order of rows emitted by `nextRow`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
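The change log above can be illustrated with a minimal sketch. It is not the actual `RowGenerator` from flink-ml: the class name `ReusedRowGenerator`, its constructor arguments, and the `double[]` row type are assumptions made for illustration. The random values are drawn once in the constructor, and `getRow()` returns the same array on every call, so the per-record cost in the source is reduced to a field read.

```java
import java.util.Random;

/** Hypothetical generator (not the flink-ml RowGenerator): draws random values once. */
final class ReusedRowGenerator {
    private final double[] row;

    ReusedRowGenerator(long seed, int numColumns) {
        Random random = new Random(seed);
        row = new double[numColumns];
        for (int i = 0; i < numColumns; i++) {
            row[i] = random.nextDouble();
        }
    }

    /** Returns the cached row; callers must treat it as read-only. */
    double[] getRow() {
        return row;
    }
}

/** Small driver that emits the same row many times, as a benchmark source would. */
final class ReusedRowGeneratorDemo {
    public static void main(String[] args) {
        ReusedRowGenerator generator = new ReusedRowGenerator(42L, 3);
        long numRecords = 1_000_000L;
        double checksum = 0.0;
        for (long i = 0; i < numRecords; i++) {
            // No random number generation happens inside the loop.
            checksum += generator.getRow()[0];
        }
        System.out.println("emitted " + numRecords + " rows, checksum " + checksum);
    }
}
```

The name `getRow` fits this design better than `nextRow`, since every call returns the same value and there is no sequence to advance.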
[jira] [Updated] (FLINK-31125) Flink ML benchmark framework should minimize the source operator overhead
[ https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-31125: - Description: Flink ML benchmark framework estimates the throughput by having a source operator generate a given number (e.g. 10^7) of input records with random values, let the given AlgoOperator process these input records, and divide the number of records by the total execution time. The overhead of generating random values for all input records has observable impact on the estimated throughput. We would like to minimize the overhead of the source operator so that the benchmark result can focus on the throughput of the AlgoOperator as much as possible. Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] generates all input records in advance into memory before running the benchmark. This allows Spark ML benchmark to read records from memory instead of generating values for those records during the benchmark. We can generate value once and re-use it for all input records. This approach minimizes the source operator head and allows us to compare Flink ML benchmark result with Spark ML benchmark result (from spark-sql-perf) fairly. was: Flink ML benchmark framework estimates the throughput by having a source operator generate a given number (e.g. 10^7) of input records with random values, let the given AlgoOperator process these input records, and divide the number of records by the total execution time. The overhead of generating random values for all input records has observable impact on the estimated throughput. We would like to minimize the overhead of the source operator so that the benchmark result can focus on the throughput of the AlgoOperator as much as possible. Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] generates all input records in advance into memory before running the benchmark. 
This allows Spark ML benchmark to read records from memory instead of generating values for those records during the benchmark. We can generate value once and re-use it for all input records. This approach minimizes the overhead of source operator and allow us to compare the Flink ML benchmark result with Spark ML benchmark result (using spark-sql-perf) fairly. > Flink ML benchmark framework should minimize the source operator overhead > - > > Key: FLINK-31125 > URL: https://issues.apache.org/jira/browse/FLINK-31125 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: ml-2.2.0 > > > Flink ML benchmark framework estimates the throughput by having a source > operator generate a given number (e.g. 10^7) of input records with random > values, let the given AlgoOperator process these input records, and divide > the number of records by the total execution time. > The overhead of generating random values for all input records has observable > impact on the estimated throughput. We would like to minimize the overhead of > the source operator so that the benchmark result can focus on the throughput > of the AlgoOperator as much as possible. > Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] > generates all input records in advance into memory before running the > benchmark. This allows Spark ML benchmark to read records from memory instead > of generating values for those records during the benchmark. > We can generate value once and re-use it for all input records. This approach > minimizes the source operator head and allows us to compare Flink ML > benchmark result with Spark ML benchmark result (from spark-sql-perf) fairly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-opensearch] lilyevsky commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
lilyevsky commented on PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1434732687 @reta Added the test. I think now it is good. I also tested the test itself, by simulating the "bad" handler, making the test fail, just to make sure it checks it properly.
[jira] [Updated] (FLINK-31125) Flink ML benchmark framework should minimize the source operator overhead
[ https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-31125: - Summary: Flink ML benchmark framework should minimize the source operator overhead (was: Flink ML benchmark result should not include data generation overhead) > Flink ML benchmark framework should minimize the source operator overhead > - > > Key: FLINK-31125 > URL: https://issues.apache.org/jira/browse/FLINK-31125 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: ml-2.2.0 > > > Flink ML benchmark framework estimates the throughput by having a source > operator generate a given number (e.g. 10^7) of input records with random > values, let the given AlgoOperator process these input records, and divide > the number of records by the total execution time. > The overhead of generating random values for all input records has observable > impact on the estimated throughput. We would like to minimize the overhead of > the source operator so that the benchmark result can focus on the throughput > of the AlgoOperator as much as possible. > Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] > generates all input records in advance into memory before running the > benchmark. This allows Spark ML benchmark to read records from memory instead > of generating values for those records during the benchmark. > We can generate value once and re-use it for all input records. This approach > minimizes the overhead of source operator and allows us to compare the Flink > ML benchmark result with Spark ML benchmark result (using spark-sql-perf) > fairly.
[jira] [Updated] (FLINK-31125) Flink ML benchmark result should not include data generation overhead
[ https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-31125: - Description: Flink ML benchmark framework estimates the throughput by having a source operator generate a given number (e.g. 10^7) of input records with random values, let the given AlgoOperator process these input records, and divide the number of records by the total execution time. The overhead of generating random values for all input records has observable impact on the estimated throughput. We would like to minimize the overhead of the source operator so that the benchmark result can focus on the throughput of the AlgoOperator as much as possible. Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] generates all input records in advance into memory before running the benchmark. This allows Spark ML benchmark to read records from memory instead of generating values for those records during the benchmark. We can generate value once and re-use it for all input records. This approach minimizes the overhead of source operator and allow us to compare the Flink ML benchmark result with Spark ML benchmark result (using spark-sql-perf) fairly. > Flink ML benchmark result should not include data generation overhead > - > > Key: FLINK-31125 > URL: https://issues.apache.org/jira/browse/FLINK-31125 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: ml-2.2.0 > > > Flink ML benchmark framework estimates the throughput by having a source > operator generate a given number (e.g. 10^7) of input records with random > values, let the given AlgoOperator process these input records, and divide > the number of records by the total execution time. > The overhead of generating random values for all input records has observable > impact on the estimated throughput. 
We would like to minimize the overhead of > the source operator so that the benchmark result can focus on the throughput > of the AlgoOperator as much as possible. > Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] > generates all input records in advance into memory before running the > benchmark. This allows Spark ML benchmark to read records from memory instead > of generating values for those records during the benchmark. > We can generate value once and re-use it for all input records. This approach > minimizes the overhead of source operator and allow us to compare the Flink > ML benchmark result with Spark ML benchmark result (using spark-sql-perf) > fairly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31084) DataGen sequence generator requires the definition of start/end values
[ https://issues.apache.org/jira/browse/FLINK-31084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690427#comment-17690427 ] xzw0223 commented on FLINK-31084: - My idea is that we don't have to pre-load all the data. We generate each value when the next method is called, record the most recently generated value, and save it as state. On checkpoint and restore, we only need to recompute the next value from the position we have already sent and continue emitting downstream. This avoids the time-consuming process of preloading all the data. [~mapohl] I would like to hear your views. Looking forward to your reply, thank you. > DataGen sequence generator requires the definition of start/end values > -- > > Key: FLINK-31084 > URL: https://issues.apache.org/jira/browse/FLINK-31084 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: Matthias Pohl >Assignee: xzw0223 >Priority: Major > Labels: starter > > The [DataGen connector's > parameters|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/#connector-options] > are not precisely documented: The start/end parameters are labeled as > required for the sequence generator even though they are (see > [SequenceGeneratorVisitor:85ff|https://github.com/apache/flink/blob/ef9ce854a3169014001f39e0d5908c703453f2b8/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java#L85]). > But instead of updating the documentation, we should just come up with > reasonable default values. As a user, I would expect the positive integer > values to be returned starting from 0 if I don't specify anything here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
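The lazy approach proposed in the comment above could look roughly like the following. It is a minimal sketch, not the Flink `SequenceGenerator`: the class `LazySequence`, the `snapshot`/`restore` methods, and the way values are split across subtasks (each subtask starts at `start + subtaskIndex` and steps by the parallelism) are assumptions made for illustration. It also assumes `end - start` fits in a `long`.

```java
import java.util.NoSuchElementException;

/**
 * Hypothetical lazy sequence: only the next value is kept as state,
 * instead of a deque pre-filled with every value to emit.
 */
final class LazySequence {
    private final long end;    // inclusive upper bound
    private final long step;   // assumed equal to the parallelism
    private long next;
    private boolean exhausted;

    LazySequence(long start, long end, int subtaskIndex, int parallelism) {
        this.end = end;
        this.step = parallelism;
        if (start > end || end - start < subtaskIndex) {
            this.next = start;
            this.exhausted = true;   // this subtask has nothing to emit
        } else {
            this.next = start + subtaskIndex;
        }
    }

    boolean hasNext() {
        return !exhausted;
    }

    long next() {
        if (exhausted) {
            throw new NoSuchElementException();
        }
        long value = next;
        // Compare via subtraction so that value + step cannot overflow.
        if (end - value < step) {
            exhausted = true;
        } else {
            next = value + step;
        }
        return value;
    }

    /** State to checkpoint: constant size, independent of the sequence length. */
    long[] snapshot() {
        return new long[] {next, exhausted ? 1L : 0L};
    }

    /** Resume from a snapshot without replaying or re-materializing values. */
    void restore(long[] state) {
        next = state[0];
        exhausted = state[1] != 0L;
    }

    public static void main(String[] args) {
        LazySequence sequence = new LazySequence(0, 9, 1, 4);
        while (sequence.hasNext()) {
            System.out.println(sequence.next());
        }
    }
}
```

Because the snapshot is two `long` values, checkpointing and restoring cost the same for a range of ten elements as for a range ending at `Integer.MAX_VALUE`.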
[jira] [Commented] (FLINK-31084) DataGen sequence generator requires the definition of start/end values
[ https://issues.apache.org/jira/browse/FLINK-31084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690426#comment-17690426 ] xzw0223 commented on FLINK-31084: - There is another problem: because we need to write the generated data into the deque in advance, this stage is very time-consuming. The same problem exists when triggering a checkpoint and restoring the state, because both are handled similarly and both need a for loop. See also https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java#L91 I run into this problem a lot when testing with DataGen. > DataGen sequence generator requires the definition of start/end values > -- > > Key: FLINK-31084 > URL: https://issues.apache.org/jira/browse/FLINK-31084 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: Matthias Pohl >Assignee: xzw0223 >Priority: Major > Labels: starter > > The [DataGen connector's > parameters|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/#connector-options] > are not precisely documented: The start/end parameters are labeled as > required for the sequence generator even though they are (see > [SequenceGeneratorVisitor:85ff|https://github.com/apache/flink/blob/ef9ce854a3169014001f39e0d5908c703453f2b8/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java#L85]). > But instead of updating the documentation, we should just come up with > reasonable default values. As a user, I would expect the positive integer > values to be returned starting from 0 if I don't specify anything here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31125) Flink ML benchmark result should not include data generation overhead
Dong Lin created FLINK-31125: Summary: Flink ML benchmark result should not include data generation overhead Key: FLINK-31125 URL: https://issues.apache.org/jira/browse/FLINK-31125 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Reporter: Dong Lin Fix For: ml-2.2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31125) Flink ML benchmark result should not include data generation overhead
[ https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin reassigned FLINK-31125: Assignee: Dong Lin > Flink ML benchmark result should not include data generation overhead > - > > Key: FLINK-31125 > URL: https://issues.apache.org/jira/browse/FLINK-31125 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: ml-2.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-opensearch] lilyevsky commented on pull request #12: [FLINK-30998] Add test for FailureHandler
lilyevsky commented on PR #12: URL: https://github.com/apache/flink-connector-opensearch/pull/12#issuecomment-1434706713 Opened by mistake
[GitHub] [flink-connector-opensearch] lilyevsky closed pull request #12: [FLINK-30998] Add test for FailureHandler
lilyevsky closed pull request #12: [FLINK-30998] Add test for FailureHandler URL: https://github.com/apache/flink-connector-opensearch/pull/12
[GitHub] [flink-connector-opensearch] reta commented on pull request #10: [FLINK-31068] Document how to use Opensearch connector with OpenSearch 1.x / 2.x / 3.x (upcoming) clusters
reta commented on PR #10: URL: https://github.com/apache/flink-connector-opensearch/pull/10#issuecomment-1434684899 @MartijnVisser could you take a second to look (docs update only)? Thank you
[GitHub] [flink] akalash commented on pull request #21919: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects
akalash commented on PR #21919: URL: https://github.com/apache/flink/pull/21919#issuecomment-1434659103 @pnowojski, can you take a look at it please? It is just a backport of the ticket which you have already reviewed. There were no conflicts.
[GitHub] [flink] flinkbot commented on pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
flinkbot commented on PR #21960: URL: https://github.com/apache/flink/pull/21960#issuecomment-1434642730

## CI report:
* 52974fcfa9fa636359d179eb556af6f82bf1ff96 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-31036) StateCheckpointedITCase timed out
[ https://issues.apache.org/jira/browse/FLINK-31036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-31036: Component/s: Tests > StateCheckpointedITCase timed out > - > > Key: FLINK-31036 > URL: https://issues.apache.org/jira/browse/FLINK-31036 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available, test-stability > Attachments: image-2023-02-16-20-29-52-050.png > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46023=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=10608 > {code} > "Legacy Source Thread - Source: Custom Source -> Filter (6/12)#69980" > #13718026 prio=5 os_prio=0 tid=0x7f05f44f0800 nid=0x128157 waiting on > condition [0x7f059feef000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xf0a974e8> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:384) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:356) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414) > at > 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$1311/1256184070.accept(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) > - locked <0xd55035c0> (a 
java.lang.Object) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) > at > org.apache.flink.test.checkpointing.StateCheckpointedITCase$StringGeneratingSourceFunction.run(StateCheckpointedITCase.java:178) > - locked <0xd55035c0> (a java.lang.Object) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) > {code} -- This
[jira] [Updated] (FLINK-31036) StateCheckpointedITCase timed out
[ https://issues.apache.org/jira/browse/FLINK-31036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31036: --- Labels: pull-request-available test-stability (was: test-stability) > StateCheckpointedITCase timed out > - > > Key: FLINK-31036 > URL: https://issues.apache.org/jira/browse/FLINK-31036 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available, test-stability > Attachments: image-2023-02-16-20-29-52-050.png > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46023=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=10608 > {code} > "Legacy Source Thread - Source: Custom Source -> Filter (6/12)#69980" > #13718026 prio=5 os_prio=0 tid=0x7f05f44f0800 nid=0x128157 waiting on > condition [0x7f059feef000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xf0a974e8> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:384) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:356) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414) > at > 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328) > at > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$1311/1256184070.accept(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) > - locked <0xd55035c0> (a 
java.lang.Object) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) > at > org.apache.flink.test.checkpointing.StateCheckpointedITCase$StringGeneratingSourceFunction.run(StateCheckpointedITCase.java:178) > - locked <0xd55035c0> (a java.lang.Object) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) > at >
[GitHub] [flink] 1996fanrui opened a new pull request, #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test
1996fanrui opened a new pull request, #21960: URL: https://github.com/apache/flink/pull/21960

## What is the purpose of the change
Use FileSystemCheckpointStorage as the default for tests, because the maximum checkpoint size of JobManagerCheckpointStorage is too small for some tests.

## Brief change log
Use FileSystemCheckpointStorage as the default for tests.

## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? (not documented
[jira] [Created] (FLINK-31124) Add it case for HiveTableSink speculative execution
Biao Liu created FLINK-31124: Summary: Add it case for HiveTableSink speculative execution Key: FLINK-31124 URL: https://issues.apache.org/jira/browse/FLINK-31124 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Biao Liu The part of HiveTableSink has supported speculative execution in https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some integration test cases for this feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109781704

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala: ##
@@ -174,6 +174,7 @@ object FlinkLogicalRelFactories {
     def createCorrelate(
         left: RelNode,
         right: RelNode,
+        hints: util.List[RelHint],

Review Comment:
This change came from https://issues.apache.org/jira/browse/CALCITE-4967
[jira] [Commented] (FLINK-31084) DataGen sequence generator requires the definition of start/end values
[ https://issues.apache.org/jira/browse/FLINK-31084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690380#comment-17690380 ] xzw0223 commented on FLINK-31084: - [~mapohl] Hi, I have encountered a problem and would like to ask your opinion. If kind is set to 'sequence', end is set to Integer.MAX_VALUE, and parallelism is set to 1, https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java#L117 will have a problem. This is an existing problem. Should I raise a separate issue to record it, or can I solve it together with this one? > DataGen sequence generator requires the definition of start/end values > -- > > Key: FLINK-31084 > URL: https://issues.apache.org/jira/browse/FLINK-31084 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: Matthias Pohl >Assignee: xzw0223 >Priority: Major > Labels: starter > > The [DataGen connector's > parameters|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/#connector-options] > are not precisely documented: The start/end parameters are labeled as > required for the sequence generator even though they are (see > [SequenceGeneratorVisitor:85ff|https://github.com/apache/flink/blob/ef9ce854a3169014001f39e0d5908c703453f2b8/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java#L85]). > But instead of updating the documentation, we should just come up with > reasonable default values. As a user, I would expect the positive integer > values to be returned starting from 0 if I don't specify anything here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31123) Add it case for FileSink speculative execution
Biao Liu created FLINK-31123: Summary: Add it case for FileSink speculative execution Key: FLINK-31123 URL: https://issues.apache.org/jira/browse/FLINK-31123 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Biao Liu The FileSink has supported speculative execution in https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some integration test cases for this feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop
[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690379#comment-17690379 ] Danny Cranmer commented on FLINK-31041: --- Hey [~zhuzh] / [~huwh] . Thank-you for the deep dive and explanation. Yes [this change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5] solve the problem for me. Thanks [~huwh] for offering to pick up the fix, that is most appreciated. > Race condition in DefaultScheduler results in memory leak and busy loop > --- > > Key: FLINK-31041 > URL: https://issues.apache.org/jira/browse/FLINK-31041 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.15.3, 1.16.1 >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Critical > Labels: pull-request-available > Fix For: 1.17.0, 1.15.4, 1.16.2 > > Attachments: failovers.log, flink-31041-heap-dump.png, > test-restart-strategy.log > > > h4. Context > When a job creates multiple sources that use the {{SourceCoordinator}} > (FLIP-27), there is a failure race condition that results in: > * Memory leak of {{ExecutionVertexVersion}} > * Busy loop constantly trying to restart job > * Restart strategy is not respected > This results in the Job Manager becoming unresponsive. > h4. !flink-31041-heap-dump.png! > h4. Reproduction Steps > This can be reproduced by a job that creates multiple sources that fail in > the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}} > trying to load a non-existent cert from the file system and throwing FNFE. 
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource<String> source = KafkaSource.<String>builder()
>     .setProperty("security.protocol", "SASL_SSL")
>     // SSL configurations
>     // Configure the path of truststore (CA) provided by the server
>     .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>     .setProperty("ssl.truststore.password", "test1234")
>     // Configure the path of keystore (private key) if client authentication is required
>     .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>     .setProperty("ssl.keystore.password", "test1234")
>     // SASL configurations
>     // Set SASL mechanism as SCRAM-SHA-256
>     .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>     // Set JAAS configurations
>     .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>     .setBootstrapServers("http://localhost:3456")
>     .setTopics("input-topic")
>     .setGroupId("my-group")
>     .setStartingOffsets(OffsetsInitializer.earliest())
>     .setValueOnlyDeserializer(new SimpleStringSchema())
>     .build();
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>     .mapToObj(i -> env
>         .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
>         .keyBy(s -> s.charAt(0))
>         .map(s -> s))
>     .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
>     .keyBy(s -> s.charAt(0))
>     .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>     .print();
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609], however the {{DefaultScheduler}} does not. We need a debounce mechanism in the {{DefaultScheduler}} since it handles many {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from people who understand this code better than me!
>
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
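To make the term "debounce" in the root-cause section concrete, here is a generic sketch of the idea in plain Java. It is not the change referenced in the ticket and does not mirror the code in `OperatorCoordinatorHolder` or `DefaultScheduler`; the class `FailureDebouncer`, its method names, and the use of a string key to identify a failing unit are all hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical debouncer: while a restart for a given key is still pending,
 * further failure reports for the same key are dropped instead of each
 * triggering another restart.
 */
final class FailureDebouncer {
    private final Set<String> pendingRestarts = new HashSet<>();

    /** Returns true only for the first failure reported for this key since the last restart. */
    boolean tryAccept(String key) {
        return pendingRestarts.add(key);
    }

    /** Called once the restart for this key has finished, so new failures are handled again. */
    void restartCompleted(String key) {
        pendingRestarts.remove(key);
    }

    public static void main(String[] args) {
        FailureDebouncer debouncer = new FailureDebouncer();
        System.out.println(debouncer.tryAccept("job"));
        System.out.println(debouncer.tryAccept("job"));
    }
}
```

With many coordinators reporting the same underlying failure, only the first report per key would reach the restart logic in this sketch; whether that matches the scheduler's actual threading and versioning model is not something the ticket text settles.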
[jira] [Comment Edited] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop
[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690379#comment-17690379 ] Danny Cranmer edited comment on FLINK-31041 at 2/17/23 12:58 PM: - Hey [~zhuzh] / [~huwh] . Thank-you for the deep dive and explanation. Yes [this change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5] solved the problem for me. Thanks [~huwh] for offering to pick up the fix, that is most appreciated. was (Author: dannycranmer): Hey [~zhuzh] / [~huwh] . Thank-you for the deep dive and explanation. Yes [this change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5] solve the problem for me. Thanks [~huwh] for offering to pick up the fix, that is most appreciated. > Race condition in DefaultScheduler results in memory leak and busy loop > --- > > Key: FLINK-31041 > URL: https://issues.apache.org/jira/browse/FLINK-31041 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.15.3, 1.16.1 >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Critical > Labels: pull-request-available > Fix For: 1.17.0, 1.15.4, 1.16.2 > > Attachments: failovers.log, flink-31041-heap-dump.png, > test-restart-strategy.log > > > h4. Context > When a job creates multiple sources that use the {{SourceCoordinator}} > (FLIP-27), there is a failure race condition that results in: > * Memory leak of {{ExecutionVertexVersion}} > * Busy loop constantly trying to restart job > * Restart strategy is not respected > This results in the Job Manager becoming unresponsive. > h4. !flink-31041-heap-dump.png! > h4. Reproduction Steps > This can be reproduced by a job that creates multiple sources that fail in > the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}} > trying to load a non-existent cert from the file system and throwing FNFE. 
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource<String> source = KafkaSource.<String>builder()
>     .setProperty("security.protocol", "SASL_SSL")
>     // SSL configurations
>     // Configure the path of truststore (CA) provided by the server
>     .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>     .setProperty("ssl.truststore.password", "test1234")
>     // Configure the path of keystore (private key) if client authentication is required
>     .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>     .setProperty("ssl.keystore.password", "test1234")
>     // SASL configurations
>     // Set SASL mechanism as SCRAM-SHA-256
>     .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>     // Set JAAS configurations
>     .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>     .setBootstrapServers("http://localhost:3456")
>     .setTopics("input-topic")
>     .setGroupId("my-group")
>     .setStartingOffsets(OffsetsInitializer.earliest())
>     .setValueOnlyDeserializer(new SimpleStringSchema())
>     .build();
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>     .mapToObj(i -> env
>         .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
>         .keyBy(s -> s.charAt(0))
>         .map(s -> s))
>     .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
>     .keyBy(s -> s.charAt(0))
>     .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>     .print();
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609], however the {{DefaultScheduler}} does not. We need a debounce mechanism in the {{DefaultScheduler}} since it handles many {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from people who understand this code better than me!
>
>
-- This message was sent by Atlassian Jira
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109761449 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlRowConstructor.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.fun.SqlRowOperator; +import org.apache.calcite.util.Pair; + +import java.util.AbstractList; +import java.util.Map; + +/** {@link SqlOperator} for ROW, which behaves same way as in Calcite 1.29.0. */ +public class SqlRowConstructor extends SqlRowOperator { +public SqlRowConstructor() { +super("ROW"); +} + +@Override +public RelDataType inferReturnType(SqlOperatorBinding opBinding) { +// The type of a ROW(e1,e2) expression is a record with the types +// {e1type,e2type}. According to the standard, field names are +// implementation-defined. 
+return opBinding +.getTypeFactory() +.createStructType( +new AbstractList<Map.Entry<String, RelDataType>>() { +@Override +public Map.Entry<String, RelDataType> get(int index) { +return Pair.of( +SqlUtil.deriveAliasFromOrdinal(index), +opBinding.getOperandType(index)); +} + +@Override +public int size() { +return opBinding.getOperandCount(); +} +}); +} +} Review Comment: https://issues.apache.org/jira/browse/CALCITE-3627 changed the NullPolicy for ROW. This makes `org.apache.flink.table.planner.functions.RowFunctionITCase` fail, since the Table API and SQL become inconsistent with respect to the NullPolicy for the ROW type.
[jira] [Commented] (FLINK-31121) KafkaSink should be able to catch and ignore exp via config on/off
[ https://issues.apache.org/jira/browse/FLINK-31121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690378#comment-17690378 ] Jing Ge commented on FLINK-31121: - [~zjureel] it's yours, thanks! > KafkaSink should be able to catch and ignore exp via config on/off > -- > > Key: FLINK-31121 > URL: https://issues.apache.org/jira/browse/FLINK-31121 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.17.0 >Reporter: Jing Ge >Assignee: Shammon >Priority: Major > Fix For: 1.18.0 > > > It is a common requirement for users to catch and ignore exceptions while sinking > events to a downstream system like Kafka. It would be convenient for some > use cases if the Flink Sink provided built-in functionality and a config option to > turn it on and off, especially for cases where data consistency is not very > important or the stream contains dirty events. [1][2] > First of all, consider doing it for KafkaSink. Long term, a common solution > that can be used by any connector would be even better. > > [1][https://lists.apache.org/thread/wy31s8wb9qnskq29wn03kp608z4vrwv8] > [2]https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
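The requested behaviour can be sketched in plain Java, independent of Flink. Everything below is hypothetical and only illustrates the idea of a switch that decides whether a failed write is rethrown or swallowed: `TolerantWriter`, its `ignoreFailures` flag, and the `Consumer` standing in for the real write call are assumptions, not part of KafkaSink or any Flink API.

```java
import java.util.function.Consumer;

/** Hypothetical wrapper; not part of Flink or the Kafka connector. */
final class TolerantWriter<T> {
    // Stands in for the real write call of a sink.
    private final Consumer<T> delegate;
    // Hypothetical config switch: true means failed writes are swallowed.
    private final boolean ignoreFailures;
    // Number of elements dropped because their write failed.
    private long ignoredCount = 0;

    TolerantWriter(Consumer<T> delegate, boolean ignoreFailures) {
        this.delegate = delegate;
        this.ignoreFailures = ignoreFailures;
    }

    void write(T element) {
        try {
            delegate.accept(element);
        } catch (RuntimeException e) {
            if (!ignoreFailures) {
                // Default behaviour: fail as before.
                throw e;
            }
            // Switch is on: drop the element but keep count of it.
            ignoredCount++;
        }
    }

    long getIgnoredCount() {
        return ignoredCount;
    }
}
```

Counting the dropped elements is a design choice of this sketch: silently ignoring failures makes data loss invisible, so some form of accounting (a metric or a log line) seems advisable whenever the switch is on.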
[jira] [Assigned] (FLINK-31121) KafkaSink should be able to catch and ignore exp via config on/off
[ https://issues.apache.org/jira/browse/FLINK-31121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-31121: --- Assignee: Shammon > KafkaSink should be able to catch and ignore exp via config on/off > -- > > Key: FLINK-31121 > URL: https://issues.apache.org/jira/browse/FLINK-31121 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.17.0 >Reporter: Jing Ge >Assignee: Shammon >Priority: Major > Fix For: 1.18.0 > > > It is a common requirement for users to catch and ignore exceptions while sinking > events to a downstream system like Kafka. It would be convenient for some > use cases if the Flink Sink provided built-in functionality and a config option to > turn it on and off, especially for cases where data consistency is not very > important or the stream contains dirty events. [1][2] > First of all, consider doing it for KafkaSink. Long term, a common solution > that can be used by any connector would be even better. > > [1][https://lists.apache.org/thread/wy31s8wb9qnskq29wn03kp608z4vrwv8] > [2]https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink
[jira] [Comment Edited] (FLINK-31119) JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing successfully
[ https://issues.apache.org/jira/browse/FLINK-31119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690355#comment-17690355 ] Matthias Pohl edited comment on FLINK-31119 at 2/17/23 12:30 PM: - Running the test locally 8500 times didn't reveal anything. [~chesnay] I'm curious about your opinion. The tests ran at around the same time (2:24:14 and 1:07:36) on two different Alibaba machines. I don't like to blame it on a hiccup. Browsing through git logs doesn't reveal anything either. One other thing I could think of is that an external process sends packets to the port that is used for the communication. was (Author: mapohl): Running the test locally 8500 times didn't reveal anything. [~chesnay] I'm curious about your opinion. The tests ran at around the same time (2:24:14 and 1:07:36) on two different Alibaba machines. I don't like to blame it on a hiccup. Browsing through git logs doesn't reveal anything either. > JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing > successfully > -- > > Key: FLINK-31119 > URL: https://issues.apache.org/jira/browse/FLINK-31119 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Blocker > Labels: test-stability > Attachments: FLINK-31119.20230217.1.log, FLINK-31119.20230217.4.log > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46247=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8523 > {code} > Feb 17 02:24:35 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 24.074 s <<< FAILURE! - in > org.apache.flink.runtime.jobmaster.JobRecoveryITCase > Feb 17 02:24:35 [ERROR] > org.apache.flink.runtime.jobmaster.JobRecoveryITCase.testTaskFailureRecovery > Time elapsed: 20.981 s <<< FAILURE! 
> Feb 17 02:24:35 java.lang.AssertionError: > Feb 17 02:24:35 > Feb 17 02:24:35 Expected: is > Feb 17 02:24:35 but: was > Feb 17 02:24:35 at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > Feb 17 02:24:35 at org.junit.Assert.assertThat(Assert.java:964) > Feb 17 02:24:35 at org.junit.Assert.assertThat(Assert.java:930) > Feb 17 02:24:35 at > org.apache.flink.runtime.jobmaster.JobRecoveryITCase.runTaskFailureRecoveryTest(JobRecoveryITCase.java:79) > Feb 17 02:24:35 at > org.apache.flink.runtime.jobmaster.JobRecoveryITCase.testTaskFailureRecovery(JobRecoveryITCase.java:63) > Feb 17 02:24:35 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > [...] > {code} > The actual cause is that unexpected data was received: > {code} > 02:24:35,301 [Receiver (5/5)#1] WARN > org.apache.flink.runtime.taskmanager.Task[] - Receiver > (5/5)#1 > (d88e16a5e3c6f2c08cf3924d93ea18e2_28065fbb1d26fe99e018d3b846860dd3_4_1) > switched from RUNNING to FAILED with failure cause: > java.lang.Exception: Wrong data received. > at > org.apache.flink.runtime.jobmaster.TestingAbstractInvokables$Receiver.invoke(TestingAbstractInvokables.java:83) > ~[test-classes/:?] > at > org.apache.flink.runtime.jobmaster.JobRecoveryITCase$FailingOnceReceiver.invoke(JobRecoveryITCase.java:126) > ~[test-classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > [classes/:?] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690372#comment-17690372 ] Weijie Guo commented on FLINK-31120: Sorry, I accidentally touched it. > ConcurrentModificationException occurred in StringFunctionsITCase.test > -- > > Key: FLINK-31120 > URL: https://issues.apache.org/jira/browse/FLINK-31120 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Blocker > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 > {code} > Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 10.725 s <<< FAILURE! - in > org.apache.flink.table.planner.functions.StringFunctionsITCase > Feb 17 04:51:25 [ERROR] > org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] > Time elapsed: 4.367 s <<< ERROR! 
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute > sql > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113) > Feb 17 04:51:25 at > org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93) > Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-31120: --- Description: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 {code} Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 10.725 s <<< FAILURE! - in org.apache.flink.table.planner.functions.StringFunctionsITCase Feb 17 04:51:25 [ERROR] org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] Time elapsed: 4.367 s <<< ERROR! Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93) Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [...] 
{code} was: # https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 {code} Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 10.725 s <<< FAILURE! - in org.apache.flink.table.planner.functions.StringFunctionsITCase Feb 17 04:51:25 [ERROR] org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] Time elapsed: 4.367 s <<< ERROR! Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93) Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [...] 
{code} > ConcurrentModificationException occurred in StringFunctionsITCase.test > -- > > Key: FLINK-31120 > URL: https://issues.apache.org/jira/browse/FLINK-31120 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Blocker > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 > {code} > Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 10.725 s <<< FAILURE! - in > org.apache.flink.table.planner.functions.StringFunctionsITCase > Feb 17 04:51:25 [ERROR] > org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] > Time elapsed: 4.367 s <<< ERROR! > Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute > sql > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) > Feb 17 04:51:25 at >
[jira] [Updated] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test
[ https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-31120: --- Description: # https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 {code} Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 10.725 s <<< FAILURE! - in org.apache.flink.table.planner.functions.StringFunctionsITCase Feb 17 04:51:25 [ERROR] org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] Time elapsed: 4.367 s <<< ERROR! Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93) Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [...] 
{code} was: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 {code} Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 10.725 s <<< FAILURE! - in org.apache.flink.table.planner.functions.StringFunctionsITCase Feb 17 04:51:25 [ERROR] org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] Time elapsed: 4.367 s <<< ERROR! Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) Feb 17 04:51:25 at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113) Feb 17 04:51:25 at org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93) Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [...] 
{code} > ConcurrentModificationException occurred in StringFunctionsITCase.test > -- > > Key: FLINK-31120 > URL: https://issues.apache.org/jira/browse/FLINK-31120 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Blocker > Labels: test-stability > > # > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334 > {code} > Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 10.725 s <<< FAILURE! - in > org.apache.flink.table.planner.functions.StringFunctionsITCase > Feb 17 04:51:25 [ERROR] > org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4] > Time elapsed: 4.367 s <<< ERROR! > Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute > sql > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) > Feb 17 04:51:25 at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) > Feb 17 04:51:25 at >
[jira] [Comment Edited] (FLINK-31119) JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing successfully
[ https://issues.apache.org/jira/browse/FLINK-31119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690314#comment-17690314 ] Matthias Pohl edited comment on FLINK-31119 at 2/17/23 12:24 PM: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46250=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8521 {code} 01:07:57,099 [Receiver (1/6)#1] WARN org.apache.flink.runtime.taskmanager.Task[] - Receiver (1/6)#1 (e701d0caf3247ea7554acfb5dd8df541_cb0a5d4bcd60528ae7c4e8c99900a321_0_1) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException: null at org.apache.flink.runtime.jobmaster.TestingAbstractInvokables$Receiver.invoke(TestingAbstractInvokables.java:82) ~[test-classes/:?] at org.apache.flink.runtime.jobmaster.JobRecoveryITCase$FailingOnceReceiver.invoke(JobRecoveryITCase.java:126) ~[test-classes/:?] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[classes/:?] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [classes/:?] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [classes/:?] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [classes/:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292] {code} This one fails with a {{NullPointerException}} in the same method [TestingAbstractInvokables.Receiver#invoke:71ff|https://github.com/apache/flink/blob/026675a5cb8a3704c51802fb549d6b0bc4759835/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java#L71]. Essentially, the data that has been received seems to be corrupted Update: There was a Wrong data exception also thrown in this case. It appeared while cancelling the tasks which was caused by the expected {{FlinkRuntimeException}}. It didn't have an impact because the job was already transitioning into CANCELLING, I guess. 
was (Author: mapohl): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46250=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8521 {code} 01:07:57,099 [Receiver (1/6)#1] WARN org.apache.flink.runtime.taskmanager.Task[] - Receiver (1/6)#1 (e701d0caf3247ea7554acfb5dd8df541_cb0a5d4bcd60528ae7c4e8c99900a321_0_1) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException: null at org.apache.flink.runtime.jobmaster.TestingAbstractInvokables$Receiver.invoke(TestingAbstractInvokables.java:82) ~[test-classes/:?] at org.apache.flink.runtime.jobmaster.JobRecoveryITCase$FailingOnceReceiver.invoke(JobRecoveryITCase.java:126) ~[test-classes/:?] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[classes/:?] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [classes/:?] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [classes/:?] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [classes/:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292] {code} This one fails with a {{NullPointerException}} in the same method [TestingAbstractInvokables.Receiver#invoke:71ff|https://github.com/apache/flink/blob/026675a5cb8a3704c51802fb549d6b0bc4759835/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java#L71]. 
Essentially, the data that has been received seems to be corrupted > JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing > successfully > -- > > Key: FLINK-31119 > URL: https://issues.apache.org/jira/browse/FLINK-31119 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Blocker > Labels: test-stability > Attachments: FLINK-31119.20230217.1.log, FLINK-31119.20230217.4.log > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46247=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8523 > {code} > Feb 17 02:24:35 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 24.074 s <<< FAILURE! - in > org.apache.flink.runtime.jobmaster.JobRecoveryITCase > Feb 17 02:24:35 [ERROR] > org.apache.flink.runtime.jobmaster.JobRecoveryITCase.testTaskFailureRecovery > Time elapsed: 20.981 s <<< FAILURE! > Feb 17 02:24:35 java.lang.AssertionError: > Feb 17 02:24:35 > Feb 17 02:24:35 Expected: is > Feb 17 02:24:35 but: was > Feb 17 02:24:35 at >
[jira] [Created] (FLINK-31122) Expose MetricGroup to Committer of new Unified Sink v2
Theo Diefenthal created FLINK-31122: --- Summary: Expose MetricGroup to Committer of new Unified Sink v2 Key: FLINK-31122 URL: https://issues.apache.org/jira/browse/FLINK-31122 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Theo Diefenthal When writing my own committer, I want to include metrics on the operations performed, as I can for any other custom function in the pipeline. The goal of this story is to provide the MetricGroup to the Committer in some way, so that in sink2.Committer, within the commit method, I have access to the MetricGroup. My use case for this story: I want to notify Impala about newly added partitions written via FileSink and track, via metrics, the number of Impala queries performed and their duration.
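The use case described above can be sketched in plain Java. This is only an illustration under stated assumptions: `CommitMetrics` is a hypothetical stand-in for a metric group (it is not Flink's `MetricGroup`), `PartitionCommitter` does not implement the real sink2.Committer interface, and the `Consumer` stands in for whatever call would notify Impala about a partition.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical stand-in for a metric group; NOT Flink's MetricGroup API. */
final class CommitMetrics {
    // Number of notification queries issued so far.
    final AtomicLong queries = new AtomicLong();
    // Accumulated wall-clock time spent in those queries, in nanoseconds.
    final AtomicLong totalNanos = new AtomicLong();
}

/** Hypothetical committer that is handed its metrics at construction time. */
final class PartitionCommitter {
    private final CommitMetrics metrics;
    // Stands in for the call that tells the external system about a partition.
    private final Consumer<String> notifier;

    PartitionCommitter(CommitMetrics metrics, Consumer<String> notifier) {
        this.metrics = metrics;
        this.notifier = notifier;
    }

    void commit(Collection<String> partitions) {
        for (String partition : partitions) {
            long start = System.nanoTime();
            try {
                notifier.accept(partition);
            } finally {
                // Record the attempt and its duration even if the call failed.
                metrics.queries.incrementAndGet();
                metrics.totalNanos.addAndGet(System.nanoTime() - start);
            }
        }
    }
}
```

Passing the metrics object through the constructor is just one way the ticket's "in some way" could be read; an initialization context handed to the committer would serve the same purpose.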
[GitHub] [flink] snuyanzin commented on a diff in pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.
snuyanzin commented on code in PR #21958: URL: https://github.com/apache/flink/pull/21958#discussion_r1109698942 ## flink-python/pyflink/table/expression.py: ## @@ -1487,6 +1487,13 @@ def array_distinct(self) -> 'Expression': """ return _binary_op("arrayDistinct")(self) +def array_union(self, array) -> 'Expression': +""" +Returns an array of the elements in the union of array1 and array2, without duplicates. +If both of the array are null, the function will return null. Review Comment: same here ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ## @@ -1359,6 +1360,16 @@ public OutType arrayDistinct() { return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, toExpr())); } +/** + * Returns an array of the elements in the union of array1 and array2, without duplicates. Review Comment: What about ordering?
[GitHub] [flink] snuyanzin commented on pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.
snuyanzin commented on PR #21958: URL: https://github.com/apache/flink/pull/21958#issuecomment-1434551541

Thanks for your contribution. To be honest, I didn't get the logic. What I did:
1. Built it from the PR's branch
2. Started standalone Flink
3. Submitted several queries via sqlClient

```sql
SELECT array_union(array[1], array[2]);
-- result array[1, 2]
-- this is OK
```

```sql
SELECT array_union(array[1], array[2, 3]);
-- result array[1, 2, 3]
--- this is OK
```

```sql
SELECT array_union(array[1], array[2, 3, null]);
-- result array[1, 2, 3, 0]
--- this is NOT OK
```

```sql
SELECT array_union(array[1], array['this is a string']);
-- result [1, 16]
-- this is NOT OK
```

```sql
SELECT array_union(array[1], array[array[1, 2, 3]]);
-- result [1, 24]
-- this is NOT OK
```
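For comparison, here is a minimal Java sketch of one set of semantics the reviewer's queries seem to expect: duplicates removed, first-occurrence order kept, and `null` elements preserved rather than turned into `0`. It is not the PR's implementation. `ArrayUnionSketch` and `arrayUnion` are hypothetical names, and returning `null` when either input array is `null` is an assumption, since the review leaves that case open.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper; not the ARRAY_UNION implementation from the PR. */
final class ArrayUnionSketch {
    private ArrayUnionSketch() {}

    static <T> List<T> arrayUnion(List<T> left, List<T> right) {
        // Assumption: a null input array yields a null result.
        if (left == null || right == null) {
            return null;
        }
        // LinkedHashSet removes duplicates, keeps insertion order,
        // and accepts a null element.
        Set<T> seen = new LinkedHashSet<>(left);
        seen.addAll(right);
        return new ArrayList<>(seen);
    }
}
```

Because both arguments share the type parameter `T`, mixing an integer array with a string array would have to be rejected or resolved to a common type before this point, which is the kind of type check the last queries in the comment suggest is missing.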
[GitHub] [flink-table-store] SteNicholas closed pull request #209: [FLINK-27103] Don't store redundant primary key fields
SteNicholas closed pull request #209: [FLINK-27103] Don't store redundant primary key fields URL: https://github.com/apache/flink-table-store/pull/209
[GitHub] [flink] snuyanzin commented on a diff in pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.
snuyanzin commented on code in PR #21958: URL: https://github.com/apache/flink/pull/21958#discussion_r1109698642 ## docs/data/sql_functions.yml: ## @@ -617,6 +617,9 @@ collection: - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_UNION(array1, array2) +table: haystack.arrayUnion(array) +description: Returns an array of the elements in the union of array1 and array2, without duplicates. If both of the array are null, the function will return null. Review Comment: From the description it is not clear what happens if only one array is `null`
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109660825 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala: ## @@ -206,7 +206,7 @@ class FlinkRexUtilTest { assertEquals(RexUtil.toCnf(rexBuilder, predicate).toString, newPredicate3.toString) val newPredicate4 = FlinkRexUtil.toCnf(rexBuilder, Int.MaxValue, predicate) -assertFalse(RexUtil.eq(predicate, newPredicate4)) Review Comment: Deprecated in https://issues.apache.org/jira/browse/CALCITE-2632 in favor of `e1.equals(e2)`
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109660535 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala: ## @@ -304,13 +304,13 @@ class FlinkSubQueryRemoveRule( replacement: RexNode): RexNode = { condition.accept(new RexShuttle() { override def visitSubQuery(subQuery: RexSubQuery): RexNode = { -if (RexUtil.eq(subQuery, oldSubQueryCall)) replacement else subQuery Review Comment: Deprecated in https://issues.apache.org/jira/browse/CALCITE-2632 in favor of `e1.equals(e2)`
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109652438 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala: ## @@ -119,7 +119,7 @@ class FlinkRelDistribution private ( try { val i = mapping.getTargetOpt(fieldCollation.getFieldIndex) if (i >= 0) { - newFieldCollations.add(fieldCollation.copy(i)) Review Comment: deprecated at https://github.com/apache/calcite/commit/4fdf241dfb8cc555ab28c6cba7a152e4cc4ec169#diff-c324ef65e6a4e0a729b606cf5f4998aa6769bed7ce52f94995b85dcee93b9d96R219-R227 in favor of `withXXX` methods ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala: ## @@ -49,7 +49,7 @@ object TraitUtil { fieldCollation => try { val i = mapping.getTargetOpt(fieldCollation.getFieldIndex) -if (i >= 0) newFieldCollations.add(fieldCollation.copy(i)) Review Comment: deprecated at https://github.com/apache/calcite/commit/4fdf241dfb8cc555ab28c6cba7a152e4cc4ec169#diff-c324ef65e6a4e0a729b606cf5f4998aa6769bed7ce52f94995b85dcee93b9d96R219-R227 in favor of `withXXX` methods
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109652438 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala: ## @@ -119,7 +119,7 @@ class FlinkRelDistribution private ( try { val i = mapping.getTargetOpt(fieldCollation.getFieldIndex) if (i >= 0) { - newFieldCollations.add(fieldCollation.copy(i)) Review Comment: `copy` was deprecated in https://github.com/apache/calcite/commit/513f4d2510a47e8d0b55f35dac56785c16fa7d5f in favor of the `withXXX` methods ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala: ## @@ -49,7 +49,7 @@ object TraitUtil { fieldCollation => try { val i = mapping.getTargetOpt(fieldCollation.getFieldIndex) -if (i >= 0) newFieldCollations.add(fieldCollation.copy(i)) Review Comment: `copy` was deprecated in https://github.com/apache/calcite/commit/513f4d2510a47e8d0b55f35dac56785c16fa7d5f in favor of the `withXXX` methods
[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0
snuyanzin commented on code in PR #21934: URL: https://github.com/apache/flink/pull/21934#discussion_r1109651676 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala: ## @@ -328,7 +328,7 @@ class DecomposeGroupingSetsRule val res: Long = call.getArgList.foldLeft(0L)( (res, arg) => (res << 1L) + (if (groups.contains(arg)) 0L else 1L)) builder.makeLiteral(res, call.getType, false) - case _ => builder.constantNull() Review Comment: `constantNull()` was deprecated in https://github.com/apache/calcite/commit/b432756e2be9ad0557a56254550eb4438dd0efcf in favor of `#makeNullLiteral(RelDataType)` because "it produces untyped NULL literals that make planning difficult"