[GitHub] [flink] reswqa commented on pull request #21953: [FLINK-30989][runtime] Some config options related to sorting and spilling are not valid.

2023-02-17 Thread via GitHub


reswqa commented on PR #21953:
URL: https://github.com/apache/flink/pull/21953#issuecomment-1435495267

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop

2023-02-17 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690678#comment-17690678
 ] 

Zhu Zhu commented on FLINK-31041:
-

I have assigned you the ticket. [~huwh]
Feel free to open a pr for it.

> Race condition in DefaultScheduler results in memory leak and busy loop
> ---
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Danny Cranmer
>Assignee: Weihua Hu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> h4. !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}} 
> trying to load a non-existent cert from the file system and throwing FNFE. 
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will 
> lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new 
> RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.of(10, 
> TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource source = KafkaSource.builder()
> .setProperty("security.protocol", "SASL_SSL")
> // SSL configurations
> // Configure the path of truststore (CA) provided by the server
> .setProperty("ssl.truststore.location", 
> "/path/to/kafka.client.truststore.jks")
> .setProperty("ssl.truststore.password", "test1234")
> // Configure the path of keystore (private key) if client 
> authentication is required
> .setProperty("ssl.keystore.location", 
> "/path/to/kafka.client.keystore.jks")
> .setProperty("ssl.keystore.password", "test1234")
> // SASL configurations
> // Set SASL mechanism as SCRAM-SHA-256
> .setProperty("sasl.mechanism", "SCRAM-SHA-256")
> // Set JAAS configurations
> .setProperty("sasl.jaas.config", 
> "org.apache.kafka.common.security.scram.ScramLoginModule required 
> username=\"username\" password=\"password\";")
> .setBootstrapServers("http://localhost:3456;)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
> List> sources = IntStream.range(0, 32)
> .mapToObj(i -> env
> .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source " + i).uid("source-" + i)
> .keyBy(s -> s.charAt(0))
> .map(s -> s))
> .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source").uid("source")
> .keyBy(s -> s.charAt(0))
> .union(sources.toArray(new SingleOutputStreamOperator[] {}))
> .print();
> env.execute("test job"); {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}} since it handles many 
> {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from 
> people who understand this code better than me!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop

2023-02-17 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-31041:
---

Assignee: Weihua Hu  (was: Danny Cranmer)

> Race condition in DefaultScheduler results in memory leak and busy loop
> ---
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Danny Cranmer
>Assignee: Weihua Hu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> h4. !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}} 
> trying to load a non-existent cert from the file system and throwing FNFE. 
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will 
> lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new 
> RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.of(10, 
> TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource source = KafkaSource.builder()
> .setProperty("security.protocol", "SASL_SSL")
> // SSL configurations
> // Configure the path of truststore (CA) provided by the server
> .setProperty("ssl.truststore.location", 
> "/path/to/kafka.client.truststore.jks")
> .setProperty("ssl.truststore.password", "test1234")
> // Configure the path of keystore (private key) if client 
> authentication is required
> .setProperty("ssl.keystore.location", 
> "/path/to/kafka.client.keystore.jks")
> .setProperty("ssl.keystore.password", "test1234")
> // SASL configurations
> // Set SASL mechanism as SCRAM-SHA-256
> .setProperty("sasl.mechanism", "SCRAM-SHA-256")
> // Set JAAS configurations
> .setProperty("sasl.jaas.config", 
> "org.apache.kafka.common.security.scram.ScramLoginModule required 
> username=\"username\" password=\"password\";")
> .setBootstrapServers("http://localhost:3456;)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
> List> sources = IntStream.range(0, 32)
> .mapToObj(i -> env
> .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source " + i).uid("source-" + i)
> .keyBy(s -> s.charAt(0))
> .map(s -> s))
> .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source").uid("source")
> .keyBy(s -> s.charAt(0))
> .union(sources.toArray(new SingleOutputStreamOperator[] {}))
> .print();
> env.execute("test job"); {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}} since it handles many 
> {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from 
> people who understand this code better than me!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-17 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690677#comment-17690677
 ] 

Zhu Zhu commented on FLINK-31124:
-

[~csq] Thanks for volunteering!
I have assigned you the ticket.

> Add it case for HiveTableSink speculative execution
> ---
>
> Key: FLINK-31124
> URL: https://issues.apache.org/jira/browse/FLINK-31124
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: chenshuiqiang
>Priority: Major
>
> The part of HiveTableSink has supported speculative execution in 
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
> integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-17 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-31124:

Priority: Major  (was: Minor)

> Add it case for HiveTableSink speculative execution
> ---
>
> Key: FLINK-31124
> URL: https://issues.apache.org/jira/browse/FLINK-31124
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Priority: Major
>
> The part of HiveTableSink has supported speculative execution in 
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
> integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-17 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-31124:
---

Assignee: chenshuiqiang

> Add it case for HiveTableSink speculative execution
> ---
>
> Key: FLINK-31124
> URL: https://issues.apache.org/jira/browse/FLINK-31124
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Assignee: chenshuiqiang
>Priority: Major
>
> The part of HiveTableSink has supported speculative execution in 
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
> integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


1996fanrui commented on PR #21960:
URL: https://github.com/apache/flink/pull/21960#issuecomment-1435470941

   Record some CI here to compare the running time: 
   
   First CI: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46277=results


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31049) Add support for Kafka record headers to KafkaSink

2023-02-17 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-31049:
---

Assignee: Alex Gout

> Add support for Kafka record headers to KafkaSink
> -
>
> Key: FLINK-31049
> URL: https://issues.apache.org/jira/browse/FLINK-31049
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Alex Gout
>Assignee: Alex Gout
>Priority: Minor
>  Labels: KafkaSink
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> The default org.apache.flink.connector.kafka.sink.KafkaSink does not support 
> adding Kafka record headers. In some implementations, downstream consumers 
> might rely on Kafka record headers being set.
>  
> A way to add Headers would be to create a custom 
> KafkaRecordSerializationSchema and inject that into the KafkaSink.
> However, I'm assuming the KafkaRecordSerializationSchemaBuilder was added for 
> convenience and allows a more usable approach of creating a KafkaSink without 
> having to deal with details like the RecordProducer directly. This builder 
> does not support adding record headers.
> This is where I think it should be added.
> The code responsible for creating the Kafka record involves  
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper 
> where the RecordProducer is created. 
> It is relatively simple to add support for record headers by adding a 
> "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key 
> and value serializers and using the appropriate RecordProducer constructor.
>  
> The issue was discussed 
> [here|https://lists.apache.org/thread/shlbbcqho0q9w5shjwdlscnsywjvbfro].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30945) FTS does not support multiple writers into the same table and topic

2023-02-17 Thread Xinbin Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690568#comment-17690568
 ] 

Xinbin Huang commented on FLINK-30945:
--

[~lzljs3620320] would you assign this to me?

> FTS does not support multiple writers into the same table and topic
> ---
>
> Key: FLINK-30945
> URL: https://issues.apache.org/jira/browse/FLINK-30945
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> When creating two different streaming jobs that INSERT INTO the same table 
> and kafka topic, the second job is never able to make progress as the 
> transaction gets constantly aborted due to the producer getting fenced.
> FTS should set the transactionalIdPrefix to avoid transactions of different 
> jobs clashing.
> {code:java}
> 2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - 
> Writer -> Global Committer -> Sink: end (1/1)#0 
> (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) 
> switched from RUNNING to FAILED with failure cause: 
> org.apache.flink.util.FlinkRuntimeException: Committing one of transactions 
> failed, logging first encountered failure at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
>  at 
> org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>  at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
>  at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) 
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at 
> java.lang.Thread.run(Thread.java:750) Caused by: 
> org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException:
>  There is a newer producer with the same transactionalId which fences the 
> current one. {code}
> Sample queries:
>  
>  
> {code:java}
> CREATE CATALOG table_store_catalog WITH (
>     'type'='table-store',
>     'warehouse'='s3://my-bucket/table-store'
>  );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count_kafka (
>      word STRING PRIMARY KEY NOT ENFORCED,
>      cnt BIGINT
>  ) WITH (
>      'log.system' = 'kafka',
>      'kafka.bootstrap.servers' = 'broker:9092',
>      'kafka.topic' = 'word_count_log'
>  );
> CREATE TEMPORARY TABLE word_table (
>      word STRING
>  ) WITH (
>      'connector' = 'datagen',
>      'fields.word.length' = '1'
>  );
> {code}
>  
> And the two INSERT jobs:
> {code:java}
> INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY 
> word;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


lilyevsky commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110240788


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int 
numMaxActions) {
  * @param maxSizeMb the maximum size of buffered actions, in mb.
  * @return this builder
  */
+@SuppressWarnings("UnusedReturnValue")

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31120:
---
Labels: pull-request-available test-stability  (was: test-stability)

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21961: [FLINK-31120] Add concurrent access control for collectorIterators in StreamExecutionEnvironment

2023-02-17 Thread via GitHub


flinkbot commented on PR #21961:
URL: https://github.com/apache/flink/pull/21961#issuecomment-1435091671

   
   ## CI report:
   
   * d48d6ecdba52849419a0c4a53a977b2ce7562a97 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] shuiqiangchen opened a new pull request, #21961: [FLINK-29879] Add concurrent access control for collectorIterators in StreamExecutionEnvironment

2023-02-17 Thread via GitHub


shuiqiangchen opened a new pull request, #21961:
URL: https://github.com/apache/flink/pull/21961

   
   
   ## What is the purpose of the change
   
   *This change mainly to add a concurrent access control for the static list 
`collectIterators` in `StreamExecutionEnvironment`. There are still chances to 
cause a ConcurrentModifyException, although most of time we are accessing 
StreamExecutionEnvironment in a single thread.*
   
   
   ## Brief change log
   
   - Made collectorIterators a synchronizedList in StreamExecutionEnvironment.
   
   
   ## Verifying this change
   
   This change has test cases covered by ITCases which are executed in 
concurrent mode. It's a stability issue that might have a small probability to 
reproduce a failure case. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110194101


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int 
numMaxActions) {
  * @param maxSizeMb the maximum size of buffered actions, in mb.
  * @return this builder
  */
+@SuppressWarnings("UnusedReturnValue")

Review Comment:
   Please remove, thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


lilyevsky commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110179327


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int 
numMaxActions) {
  * @param maxSizeMb the maximum size of buffered actions, in mb.
  * @return this builder
  */
+@SuppressWarnings("UnusedReturnValue")

Review Comment:
   I put it because compiler gave me a warning about it. I can remove that 
suppression if you prefer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523
 ] 

Shuiqiang Chen edited comment on FLINK-31120 at 2/17/23 6:17 PM:
-

Maybe it needs concurrent control for access to the static field 
`collectIterators` in `StreamExecutionEnvironment`. There are four test cases 
executing when running StringFunctionsITCase in a concurrent execution mode, 
that has chance  for a thread to add a collectorIterator through 
`registerCollectIterator` while there is a foreach loop in executeAsync() in 
another thread.


was (Author: csq):
Maybe tt needs concurrent control for access to the static field 
`collectIterators` in `StreamExecutionEnvironment`. There are four test cases 
executing when running StringFunctionsITCase in a concurrent execution mode, 
that has chance  for a thread to add a collectorIterator through 
`registerCollectIterator` while there is a foreach loop in executeAsync() in 
another thread.

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-17 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690524#comment-17690524
 ] 

Shuiqiang Chen commented on FLINK-31124:


Hi [~zhuzh] I would like to help finish to issue, could you please assign it to 
me?

> Add it case for HiveTableSink speculative execution
> ---
>
> Key: FLINK-31124
> URL: https://issues.apache.org/jira/browse/FLINK-31124
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Biao Liu
>Priority: Minor
>
> The part of HiveTableSink has supported speculative execution in 
> https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
> integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690523#comment-17690523
 ] 

Shuiqiang Chen commented on FLINK-31120:


Maybe tt needs concurrent control for access to the static field 
`collectIterators` in `StreamExecutionEnvironment`. There are four test cases 
executing when running StringFunctionsITCase in a concurrent execution mode, 
that has chance  for a thread to add a collectorIterator through 
`registerCollectIterator` while there is a foreach loop in executeAsync() in 
another thread.

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pgaref commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-02-17 Thread via GitHub


pgaref commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1110168340


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -215,6 +215,9 @@
  */
 private final StreamTaskActionExecutor actionExecutor;
 
+/** Current state of the task, can be any of {@link TaskState}. */
+private TaskState taskState;

Review Comment:
   Yup, good catch Anton! 



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -215,6 +215,9 @@
  */
 private final StreamTaskActionExecutor actionExecutor;
 
+/** Current state of the task, can be any of {@link TaskState}. */
+private TaskState taskState;

Review Comment:
   Yup, good catch, thanks Anton! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] akalash commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-02-17 Thread via GitHub


akalash commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1110137878


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -215,6 +215,9 @@
  */
 private final StreamTaskActionExecutor actionExecutor;
 
+/** Current state of the task, can be any of {@link TaskState}. */
+private TaskState taskState;

Review Comment:
   Shouldn't it be volatile?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-13871) Consolidate isRunning, canceled, isFinished fields in StreamTask and SourceStreamTask

2023-02-17 Thread Anton Kalashnikov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Kalashnikov reassigned FLINK-13871:
-

Assignee: Panagiotis Garefalakis

> Consolidate isRunning, canceled, isFinished fields in StreamTask and 
> SourceStreamTask
> -
>
> Key: FLINK-13871
> URL: https://issues.apache.org/jira/browse/FLINK-13871
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Alex
>Assignee: Panagiotis Garefalakis
>Priority: Not a Priority
>  Labels: pull-request-available
>
> {{StreamTask}} has two {{volatile boolean}} fields ({{canceled}}, 
> {{isFinished}}) and {{SourceStreamTask}} has an additional {{isFinished}} 
> field.
> In practice, those fields are mutually exclusive and reflect different stages 
> of stream task's lifecycle. It should be possible to represent all three 
> fields as one (enumerated) state field.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pgaref commented on pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-02-17 Thread via GitHub


pgaref commented on PR #21923:
URL: https://github.com/apache/flink/pull/21923#issuecomment-1434978767

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

2023-02-17 Thread via GitHub


pgaref commented on PR #21923:
URL: https://github.com/apache/flink/pull/21923#issuecomment-1434911355

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.

2023-02-17 Thread via GitHub


liuyongvs commented on PR #21958:
URL: https://github.com/apache/flink/pull/21958#issuecomment-1434909262

   > Thanks for your contribution.
   > 
   > To be honest i didn't get the logic. What i did:
   > 
   > 1. Built it from the PR's branch
   > 2. Started standalone Flink
   > 3. Via sqlClient submitted several queries
   > 
   > ```sql
   > SELECT array_union(array[1], array[2]);
   > -- result array[1, 2]
   > -- this is OK
   > ```
   > 
   > ```sql
   > SELECT array_union(array[1], array[2, 3]);
   > -- result array[1, 2, 3]
   > --- this is OK
   > ```
   > 
   > ```sql
   > SELECT array_union(array[1], array[2, 3, null]);
   > -- result array[1, 2, 3, 0]
   > --- this is NOT OK
   > ```
   > 
   > ```sql
   > SELECT array_union(array[1], array[map['this is a key', 'this is a 
value']]);
   > -- result [1, 68]
   > -- this is NOT OK
   > ```
   > 
   > ```sql
   > SELECT array_union(array[1], array['this is a string']);
   > -- result [1, 16]
   > -- this is NOT OK
   > ```
   > 
   > ```sql
   > SELECT array_union(array[1], array[array[1, 2, 3]]);
   > -- result [1, 24]
   > -- this is NOT OK
   > ```
   
   the result is not ok like SELECT array_union(array[1], array[array[1, 2, 
3]]); SELECT array_union(array[1], array['this is a string']); i said before 
this two array type should be exception. we should do it like ARRAY_CONCAT is 
calcite. but i do not find a good way to express it in flink. so i ask you do 
you have a good implements and it is important for other array op like
array_except/array_intersect.. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs closed pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.

2023-02-17 Thread via GitHub


liuyongvs closed pull request #21958: [FLINK-31118][table] Add ARRAY_UNION 
function.
URL: https://github.com/apache/flink/pull/21958


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110068155


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java:
##
@@ -148,6 +150,7 @@ public OpensearchSinkBuilder setBulkFlushMaxActions(int 
numMaxActions) {
  * @param maxSizeMb the maximum size of buffered actions, in mb.
  * @return this builder
  */
+@SuppressWarnings("UnusedReturnValue")

Review Comment:
   I don't understand why these suppression are needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.

2023-02-17 Thread via GitHub


liuyongvs commented on PR #21958:
URL: https://github.com/apache/flink/pull/21958#issuecomment-1434905063

   > ```sql
   > SELECT array_union(array[1], array[array[1, 2, 3]]);
   > ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110066429


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java:
##
@@ -122,10 +128,11 @@
 } catch (Exception e) {
 throw new FlinkRuntimeException("Failed to open the 
OpensearchEmitter", e);
 }
+this.failureHandler = failureHandler;
 }
 
 @Override
-public void write(IN element, Context context) throws IOException, 
InterruptedException {
+public void write(IN element, Context context) throws InterruptedException 
{

Review Comment:
   Please revert these changes, they violate the `SinkWriter` interface 
   
   [1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java#L41



##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java:
##
@@ -134,7 +141,7 @@ public void write(IN element, Context context) throws 
IOException, InterruptedEx
 }
 
 @Override
-public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+public void flush(boolean endOfInput) throws InterruptedException {

Review Comment:
   Same, please revert



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.

2023-02-17 Thread via GitHub


liuyongvs commented on code in PR #21947:
URL: https://github.com/apache/flink/pull/21947#discussion_r1110053725


##
docs/data/sql_functions.yml:
##
@@ -617,6 +617,12 @@ collection:
   - sql: ARRAY_DISTINCT(haystack)
 table: haystack.arrayDistinct()
 description: Returns an array with unique elements. If the array itself is 
null, the function will return null. Keeps ordering of elements.
+  - sql: ARRAY_REMOVE(haystack, needle)
+table: haystack.arrayRemove(needle)
+description: Remove all elements that equal to element from array. If the 
array itself is null, the function will return null.
+  - sql: ARRAY_SIZE(haystack)
+table: haystack.arraySize()
+description: Returns the size of an array. If the array itself is null, 
the function will return null.

Review Comment:
   thanks for your details response, i will drop the array_size commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] lilyevsky commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


lilyevsky commented on PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1434889977

   @reta I committed the changes you suggested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.

2023-02-17 Thread via GitHub


liuyongvs commented on code in PR #21947:
URL: https://github.com/apache/flink/pull/21947#discussion_r1110050019


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -178,6 +178,35 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 .runtimeClass(
 
"org.apache.flink.table.runtime.functions.scalar.ArrayDistinctFunction")
 .build();
+
+public static final BuiltInFunctionDefinition ARRAY_SIZE =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SIZE")
+.kind(SCALAR)
+.inputTypeStrategy(
+sequence(
+Collections.singletonList("haystack"),
+
Collections.singletonList(logical(LogicalTypeRoot.ARRAY
+.outputTypeStrategy(
+nullableIfArgs(ConstantArgumentCount.of(0), 
explicit(DataTypes.INT(
+.runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.ArraySizeFunction")
+.build();
+
+public static final BuiltInFunctionDefinition ARRAY_REMOVE =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_REMOVE")
+.kind(SCALAR)
+.inputTypeStrategy(
+sequence(
+Arrays.asList("haystack", "needle"),
+Arrays.asList(
+logical(LogicalTypeRoot.ARRAY), 
ARRAY_ELEMENT_ARG)))

Review Comment:
   i test  SELECT array_contains(array[1, 2, 3], cast(null as int)) it also 
fails with exception.
   the reason is ARRAY_ELEMENT_ARG make the second arg must be same with array 
element type
   Caused by: org.apache.flink.table.api.ValidationException: Invalid function 
call:
   array_remove(ARRAY NOT NULL, INT).
   
   
   so select array_remove(array[1, 2, 3, null], cast(null as int));
   return array[1, 2, 3].
   
   do we should also fix the array_contains.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -178,6 +178,35 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 .runtimeClass(
 
"org.apache.flink.table.runtime.functions.scalar.ArrayDistinctFunction")
 .build();
+
+public static final BuiltInFunctionDefinition ARRAY_SIZE =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SIZE")
+.kind(SCALAR)
+.inputTypeStrategy(
+sequence(
+Collections.singletonList("haystack"),
+
Collections.singletonList(logical(LogicalTypeRoot.ARRAY
+.outputTypeStrategy(
+nullableIfArgs(ConstantArgumentCount.of(0), 
explicit(DataTypes.INT(
+.runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.ArraySizeFunction")
+.build();
+
+public static final BuiltInFunctionDefinition ARRAY_REMOVE =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_REMOVE")
+.kind(SCALAR)
+.inputTypeStrategy(
+sequence(
+Arrays.asList("haystack", "needle"),
+Arrays.asList(
+logical(LogicalTypeRoot.ARRAY), 
ARRAY_ELEMENT_ARG)))

Review Comment:
   i test  SELECT array_contains(array[1, 2, 3], cast(null as int)) it also 
fails with exception.
   the reason is ARRAY_ELEMENT_ARG make the second arg must be same with array 
element type
   Caused by: org.apache.flink.table.api.ValidationException: Invalid function 
call:
   array_remove(ARRAY NOT NULL, INT).
   
   
   so select array_remove(array[1, 2, 3, null], cast(null as int));
   return array[1, 2, 3].
   
   do we should also fix the array_contains.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


lilyevsky commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110040361


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java:
##
@@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception {
 new BulkProcessorConfig(flushAfterNActions, -1, -1, 
FlushBackoffType.NONE, 0, 0);
 
 try (final OpensearchWriter> writer =
-createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+createWriter(

Review Comment:
   Agreed, it will be better not to touch that line. I did it because it needed 
to pass the metricGroup.
   So I am fixing the situation by adding yet another constructor, with 
metricGroup but without failureHandler.
   Also in the main constructor put the failureHandler at the end of the 
parameters list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110047438


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java:
##
@@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception {
 new BulkProcessorConfig(flushAfterNActions, -1, -1, 
FlushBackoffType.NONE, 0, 0);
 
 try (final OpensearchWriter> writer =
-createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+createWriter(

Review Comment:
   No need for constructor, `createWriter` is a method in this class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] lilyevsky commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


lilyevsky commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1110040361


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java:
##
@@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception {
 new BulkProcessorConfig(flushAfterNActions, -1, -1, 
FlushBackoffType.NONE, 0, 0);
 
 try (final OpensearchWriter> writer =
-createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+createWriter(

Review Comment:
   Agreed, it will be better not to touch that line. I did it because it needed 
to pass the metricGroup.
   So I am fixing the situation by adding yet another constructor, with 
metricGroup but without failureHandler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


1996fanrui commented on code in PR #21960:
URL: https://github.com/apache/flink/pull/21960#discussion_r1110001094


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java:
##
@@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception {
 new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
 configuration.setString(
 CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
+configuration.set(
+CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+temporaryFolder.newFolder().toURI().toString());

Review Comment:
   FileSystemCheckpointStorage is compatible with more scenarios than 
JobManagerCheckpointStorage, for example: scenarios with medium or large state 
sizes. But its performance may be a little bit worse in small state scenarios.
   
   I can fix the bug that the test fails, and then we look at the test running 
time. If there is no significant increase, `FileSystemCheckpointStorage` can be 
used as the default, what do you think?
   
   Actually, I don't think there be much impact on CI running time after 
FileSystemCheckpointStorage is the default, because 
`JobManagerCheckpointStorage` just works when state size < 5MB, it's small 
state. Small state should not consume too much time when using 
FileSystemCheckpointStorage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


pnowojski commented on code in PR #21960:
URL: https://github.com/apache/flink/pull/21960#discussion_r1110017809


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java:
##
@@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception {
 new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
 configuration.setString(
 CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
+configuration.set(
+CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+temporaryFolder.newFolder().toURI().toString());

Review Comment:
   > If there is no significant increase, FileSystemCheckpointStorage can be 
used as the default, what do you think?
   
   That might be ok. In that case we would also need to check the benchmark 
results.
   
   @zentol had another suggestion to instead increase Akka's frame size and 
keep the memory storage?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21947:
URL: https://github.com/apache/flink/pull/21947#discussion_r1110013444


##
docs/data/sql_functions.yml:
##
@@ -617,6 +617,12 @@ collection:
   - sql: ARRAY_DISTINCT(haystack)
 table: haystack.arrayDistinct()
 description: Returns an array with unique elements. If the array itself is 
null, the function will return null. Keeps ordering of elements.
+  - sql: ARRAY_REMOVE(haystack, needle)
+table: haystack.arrayRemove(needle)
+description: Remove all elements that equal to element from array. If the 
array itself is null, the function will return null.
+  - sql: ARRAY_SIZE(haystack)
+table: haystack.arraySize()
+description: Returns the size of an array. If the array itself is null, 
the function will return null.

Review Comment:
   Well that's very arguable since different engines support different syntax
   BigQuery has `ARRAY_LENGTH`
   Hive has `SIZE`
   Vertica has `ARRAY_LENGTH`
   Postgresql `ARRAY_LENGTH`
   
   why to support only snowflake and spark?
   Support of everything seems to be an overkill...
   I guess somewhere it should be stopped



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


1996fanrui commented on code in PR #21960:
URL: https://github.com/apache/flink/pull/21960#discussion_r1110001094


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java:
##
@@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception {
 new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
 configuration.setString(
 CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
+configuration.set(
+CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+temporaryFolder.newFolder().toURI().toString());

Review Comment:
   FileSystemCheckpointStorage is compatible with more scenarios than 
JobManagerCheckpointStorage, for example: scenarios with medium or large state 
sizes. But its performance may be a little bit worse in small state scenarios.
   
   I can fix the bug that the test fails, and then we look at the test 
duration. If there is no significant increase, `FileSystemCheckpointStorage` 
can be used as the default, what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on a diff in pull request #21947: [FLINK-31098][table] Add ARRAY_SIZE function.

2023-02-17 Thread via GitHub


liuyongvs commented on code in PR #21947:
URL: https://github.com/apache/flink/pull/21947#discussion_r1109996439


##
docs/data/sql_functions.yml:
##
@@ -617,6 +617,12 @@ collection:
   - sql: ARRAY_DISTINCT(haystack)
 table: haystack.arrayDistinct()
 description: Returns an array with unique elements. If the array itself is 
null, the function will return null. Keeps ordering of elements.
+  - sql: ARRAY_REMOVE(haystack, needle)
+table: haystack.arrayRemove(needle)
+description: Remove all elements that equal to element from array. If the 
array itself is null, the function will return null.
+  - sql: ARRAY_SIZE(haystack)
+table: haystack.arraySize()
+description: Returns the size of an array. If the array itself is null, 
the function will return null.

Review Comment:
   To align the other engines like spark,snowflake, so that users can migrate 
easily
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] zentol commented on a diff in pull request #3: [FLINK-31063] Prevent duplicate reading when restoring from a checkpoint.

2023-02-17 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/3#discussion_r1109995851


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoSourceSplitState.java:
##
@@ -19,28 +19,19 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import org.bson.BsonDocument;
+
 /** MongoDB source split state. */
 @PublicEvolving
-public class MongoSourceSplitState {
+public abstract class MongoSourceSplitState {

Review Comment:
   Maybe consider introducing a `MongoSourceSplitState` interface instead. We 
have then more freedom to change the internals than when we make the 
implementation itself part of the API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


pnowojski commented on code in PR #21960:
URL: https://github.com/apache/flink/pull/21960#discussion_r1109989565


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java:
##
@@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception {
 new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
 configuration.setString(
 CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
+configuration.set(
+CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+temporaryFolder.newFolder().toURI().toString());

Review Comment:
   I think with your change I would still set the `FileSystemCheckpointStorage` 
everytime we are using unaligned checkpoints. So the question is whether we 
want to use it by default always, or only for unaligned checkpoints.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


1996fanrui commented on code in PR #21960:
URL: https://github.com/apache/flink/pull/21960#discussion_r1109958093


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java:
##
@@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception {
 new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
 configuration.setString(
 CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
+configuration.set(
+CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+temporaryFolder.newFolder().toURI().toString());

Review Comment:
   Unaligned checkpoint or change log state backend are enabled in the 
`TestStreamEnvironment#randomizeConfiguration`. And I see a large number of 
tests calling `FsStateChangelogStorageFactory.configure()` to Configure DFS 
DSTL in FLINK-23279 [1], including: 
`StateChangelogOptions.STATE_CHANGE_LOG_STORAGE` and 
`FsStateChangelogOptions.BASE_PATH`. 
   
   If we are worried about increasing the running time, we can set 
`CheckpointingOptions.CHECKPOINT_STORAGE` and 
`CheckpointingOptions.CHECKPOINTS_DIRECTORY` in a similar way, what do you 
think?
   
   [1] commit link: 
https://github.com/apache/flink/pull/16685/commits/9cc4fe09de5a540390d2d5ad1a72f551c0d0e875



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109964501


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TimeIndicatorRelDataType.scala:
##
@@ -28,11 +28,11 @@ import java.lang
  * basic SQL type.
  */
 class TimeIndicatorRelDataType(
-val typeSystem: RelDataTypeSystem,
+val typeSystemField: RelDataTypeSystem,

Review Comment:
   Since `typeSystem` in `BasicSqlType` is now protected



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


1996fanrui commented on code in PR #21960:
URL: https://github.com/apache/flink/pull/21960#discussion_r1109958093


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java:
##
@@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception {
 new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
 configuration.setString(
 CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
+configuration.set(
+CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+temporaryFolder.newFolder().toURI().toString());

Review Comment:
   Unaligned checkpoint or change log state backend are enabled in the 
`TestStreamEnvironment#randomizeConfiguration`. And I see a large number of 
tests calling `FsStateChangelogStorageFactory.configure()` to Configure DFS 
DSTL in FLINK-23279, including: 
`StateChangelogOptions.STATE_CHANGE_LOG_STORAGE` and 
`FsStateChangelogOptions.BASE_PATH`. 
   
   If we are worried about increasing the running time, we can set 
`CheckpointingOptions.CHECKPOINT_STORAGE` and 
`CheckpointingOptions.CHECKPOINTS_DIRECTORY` in a similar way, what do you 
think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure

2023-02-17 Thread Anton Kalashnikov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Kalashnikov closed FLINK-15550.
-
Fix Version/s: 1.16.2
   Resolution: Fixed

> testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
> 
>
> Key: FLINK-15550
> URL: https://issues.apache.org/jira/browse/FLINK-15550
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3, 1.16.0
>Reporter: Yun Tang
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.17.0, 1.16.2
>
>
> Instance: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug
> {code:java}
> java.lang.AssertionError: expected: but was:
>   at 
> org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525)
> {code}
> {code:java}
> expected: but was:
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure

2023-02-17 Thread Anton Kalashnikov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690444#comment-17690444
 ] 

Anton Kalashnikov commented on FLINK-15550:
---

Merged to release-1.16: 658ba8eb, a3c9eb40, cb9f183f

> testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
> 
>
> Key: FLINK-15550
> URL: https://issues.apache.org/jira/browse/FLINK-15550
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3, 1.16.0
>Reporter: Yun Tang
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.17.0
>
>
> Instance: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug
> {code:java}
> java.lang.AssertionError: expected: but was:
>   at 
> org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525)
> {code}
> {code:java}
> expected: but was:
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109948032


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+
+/** Handler to process failures. */
+@Public

Review Comment:
   `@Public` -> `@PublicEvolving` would be probably better (to leave some space 
to evolve the API)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1434782202

   @lilyevsky a few minor comments, LGTM otherwise!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109943802


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java:
##
@@ -174,7 +175,8 @@ void testIncrementByteOutMetric() throws Exception {
 new BulkProcessorConfig(flushAfterNActions, -1, -1, 
FlushBackoffType.NONE, 0, 0);
 
 try (final OpensearchWriter> writer =
-createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+createWriter(

Review Comment:
   You probably don't need this change since `createWriter` has an overloaded 
version without failure handler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109940919


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java:
##
@@ -238,19 +240,68 @@ void testCurrentSendTime() throws Exception {
 }
 }
 
+private class TestHandler implements FailureHandler {

Review Comment:
   Probably `private static` is better



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109941466


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java:
##
@@ -238,19 +240,68 @@ void testCurrentSendTime() throws Exception {
 }
 }
 
+private class TestHandler implements FailureHandler {
+private boolean failed = false;
+
+private synchronized void setFailed() {
+failed = true;
+}
+
+public boolean isFailed() {
+return failed;
+}
+
+@java.lang.Override
+public void onFailure(Throwable failure) {
+setFailed();
+}
+}
+
+@Test
+void testWriteErrorOnUpdate() throws Exception {
+final String index = "test-bulk-flush-with-error";
+final int flushAfterNActions = 1;
+final BulkProcessorConfig bulkProcessorConfig =
+new BulkProcessorConfig(flushAfterNActions, -1, -1, 
FlushBackoffType.NONE, 0, 0);
+
+final TestHandler testHandler = new TestHandler();
+try (final OpensearchWriter> writer =
+createWriter(index, true, bulkProcessorConfig, testHandler)) {
+// Trigger an error by updating non-existing document
+writer.write(Tuple2.of(1, "u" + buildMessage(1)), null);
+context.assertThatIdsAreNotWritten(index, 1);
+assertThat(testHandler.isFailed()).isEqualTo(true);

Review Comment:
   :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109940526


##
flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java:
##
@@ -238,19 +240,68 @@ void testCurrentSendTime() throws Exception {
 }
 }
 
+private class TestHandler implements FailureHandler {
+private boolean failed = false;
+
+private synchronized void setFailed() {
+failed = true;
+}
+
+public boolean isFailed() {
+return failed;
+}
+
+@java.lang.Override

Review Comment:
   Super nit, just `@Override`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


reta commented on code in PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#discussion_r1109939155


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java:
##
@@ -103,7 +107,8 @@
 BulkProcessorConfig bulkProcessorConfig,
 NetworkClientConfig networkClientConfig,
 SinkWriterMetricGroup metricGroup,
-MailboxExecutor mailboxExecutor) {
+MailboxExecutor mailboxExecutor,

Review Comment:
   Please update javadocs with `failureHandler` arg



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] akalash merged pull request #21919: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects

2023-02-17 Thread via GitHub


akalash merged PR #21919:
URL: https://github.com/apache/flink/pull/21919


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on pull request #212: [FLINK-31125] Flink ML benchmark framework should minimize the source operator overhead

2023-02-17 Thread via GitHub


lindong28 commented on PR #212:
URL: https://github.com/apache/flink-ml/pull/212#issuecomment-1434767219

   @zhipeng93 Can you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


pnowojski commented on code in PR #21960:
URL: https://github.com/apache/flink/pull/21960#discussion_r1109920354


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java:
##
@@ -199,6 +200,9 @@ private void startMiniCluster() throws Exception {
 new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
 configuration.setString(
 CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
+configuration.set(
+CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+temporaryFolder.newFolder().toURI().toString());

Review Comment:
   Question, should we have this changed for everything using mini cluster, or 
only for our tests that are using unaligned checkpoints? 樂 
   
   Maybe for example because of benchmarks, and overall running times (costs of 
our CI), we should keep using memory storage if not needed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31095) FileSink doesn't work with s3a on EKS

2023-02-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690431#comment-17690431
 ] 

Martijn Visser commented on FLINK-31095:


I did a quick check on the PR belonging to FLINK-23487 which includes this 
comment from [~airblader] in 
https://github.com/apache/flink/pull/16592/files#r679835626

| For Hadoop you need special configuration because Hadoop ships its own 
credentials provider chain mechanism.

I am by no means an S3 or a Hadoop expert, but shouldn't it then be possible to 
follow 
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
 to setup IRSA?

> FileSink doesn't work with s3a on EKS
> -
>
> Key: FLINK-31095
> URL: https://issues.apache.org/jira/browse/FLINK-31095
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: Sylvia Lin
>Priority: Major
>
> FileSink gives below exception on AWS EKS cluster:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException: This s3 file system 
> implementation does not support recoverable writers.
>   at 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
>  ~[?:?]
>   at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134)
>  ~[flink-dist-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]{code}
> [https://github.com/apache/flink/blob/278dc7b793303d228f7816585054629708983af6/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#LL136C16-L136C16]
> And this may be related to 
> https://issues.apache.org/jira/browse/FLINK-23487?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31036) StateCheckpointedITCase timed out

2023-02-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-31036:
---
Fix Version/s: 1.17.0
   1.18.0

> StateCheckpointedITCase timed out
> -
>
> Key: FLINK-31036
> URL: https://issues.apache.org/jira/browse/FLINK-31036
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.18.0
>
> Attachments: image-2023-02-16-20-29-52-050.png
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46023=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=10608
> {code}
> "Legacy Source Thread - Source: Custom Source -> Filter (6/12)#69980" 
> #13718026 prio=5 os_prio=0 tid=0x7f05f44f0800 nid=0x128157 waiting on 
> condition [0x7f059feef000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xf0a974e8> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:384)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:356)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>   at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31)
>   at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>   at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$1311/1256184070.accept(Unknown
>  Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
>   - locked <0xd55035c0> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
>   at 
> org.apache.flink.test.checkpointing.StateCheckpointedITCase$StringGeneratingSourceFunction.run(StateCheckpointedITCase.java:178)
>   - locked <0xd55035c0> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>   at 
> 

[jira] [Updated] (FLINK-31125) Flink ML benchmark framework should minimize the source operator overhead

2023-02-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31125:
---
Labels: pull-request-available  (was: )

> Flink ML benchmark framework should minimize the source operator overhead
> -
>
> Key: FLINK-31125
> URL: https://issues.apache.org/jira/browse/FLINK-31125
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> Flink ML benchmark framework estimates the throughput by having a source 
> operator generate a given number (e.g. 10^7) of input records with random 
> values, let the given AlgoOperator process these input records, and divide 
> the number of records by the total execution time. 
> The overhead of generating random values for all input records has observable 
> impact on the estimated throughput. We would like to minimize the overhead of 
> the source operator so that the benchmark result can focus on the throughput 
> of the AlgoOperator as much as possible.
> Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] 
> generates all input records in advance into memory before running the 
> benchmark. This allows Spark ML benchmark to read records from memory instead 
> of generating values for those records during the benchmark.
> We can generate value once and re-use it for all input records. This approach 
> minimizes the source operator head and allows us to compare Flink ML 
> benchmark result with Spark ML benchmark result (from spark-sql-perf) fairly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 opened a new pull request, #212: [FLINK-31125] Flink ML benchmark framework should minimize the source operator overhead

2023-02-17 Thread via GitHub


lindong28 opened a new pull request, #212:
URL: https://github.com/apache/flink-ml/pull/212

   ## What is the purpose of the change
   
   Minimize the source operator overhead in Flink ML benchmark.
   
   ## Brief change log
   
   - Updated RowGenerator to generate row value once and re-use the value for 
all emitted rows.
   - Renamed RowGenerator#nextRow to RowGenerator#getRow. This is because there 
is no order of rows emitted by `nextRow`. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31125) Flink ML benchmark framework should minimize the source operator overhead

2023-02-17 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31125:
-
Description: 
Flink ML benchmark framework estimates the throughput by having a source 
operator generate a given number (e.g. 10^7) of input records with random 
values, let the given AlgoOperator process these input records, and divide the 
number of records by the total execution time. 

The overhead of generating random values for all input records has observable 
impact on the estimated throughput. We would like to minimize the overhead of 
the source operator so that the benchmark result can focus on the throughput of 
the AlgoOperator as much as possible.

Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] 
generates all input records in advance into memory before running the 
benchmark. This allows Spark ML benchmark to read records from memory instead 
of generating values for those records during the benchmark.

We can generate value once and re-use it for all input records. This approach 
minimizes the source operator head and allows us to compare Flink ML benchmark 
result with Spark ML benchmark result (from spark-sql-perf) fairly.




  was:
Flink ML benchmark framework estimates the throughput by having a source 
operator generate a given number (e.g. 10^7) of input records with random 
values, let the given AlgoOperator process these input records, and divide the 
number of records by the total execution time. 

The overhead of generating random values for all input records has observable 
impact on the estimated throughput. We would like to minimize the overhead of 
the source operator so that the benchmark result can focus on the throughput of 
the AlgoOperator as much as possible.

Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] 
generates all input records in advance into memory before running the 
benchmark. This allows Spark ML benchmark to read records from memory instead 
of generating values for those records during the benchmark.

We can generate value once and re-use it for all input records. This approach 
minimizes the overhead of source operator and allow us to compare the Flink ML 
benchmark result with Spark ML benchmark result (using spark-sql-perf) fairly.





> Flink ML benchmark framework should minimize the source operator overhead
> -
>
> Key: FLINK-31125
> URL: https://issues.apache.org/jira/browse/FLINK-31125
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: ml-2.2.0
>
>
> Flink ML benchmark framework estimates the throughput by having a source 
> operator generate a given number (e.g. 10^7) of input records with random 
> values, let the given AlgoOperator process these input records, and divide 
> the number of records by the total execution time. 
> The overhead of generating random values for all input records has observable 
> impact on the estimated throughput. We would like to minimize the overhead of 
> the source operator so that the benchmark result can focus on the throughput 
> of the AlgoOperator as much as possible.
> Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] 
> generates all input records in advance into memory before running the 
> benchmark. This allows Spark ML benchmark to read records from memory instead 
> of generating values for those records during the benchmark.
> We can generate value once and re-use it for all input records. This approach 
> minimizes the source operator head and allows us to compare Flink ML 
> benchmark result with Spark ML benchmark result (from spark-sql-perf) fairly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] lilyevsky commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-02-17 Thread via GitHub


lilyevsky commented on PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1434732687

   @reta Added the test. I think now it is good.
   I also tested the test itself, by simulating the "bad" handler, making the 
test fail, just to make sure it checks it properly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31125) Flink ML benchmark framework should minimize the source operator overhead

2023-02-17 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31125:
-
Summary: Flink ML benchmark framework should minimize the source operator 
overhead  (was: Flink ML benchmark result should not include data generation 
overhead)

> Flink ML benchmark framework should minimize the source operator overhead
> -
>
> Key: FLINK-31125
> URL: https://issues.apache.org/jira/browse/FLINK-31125
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: ml-2.2.0
>
>
> Flink ML benchmark framework estimates the throughput by having a source 
> operator generate a given number (e.g. 10^7) of input records with random 
> values, let the given AlgoOperator process these input records, and divide 
> the number of records by the total execution time. 
> The overhead of generating random values for all input records has observable 
> impact on the estimated throughput. We would like to minimize the overhead of 
> the source operator so that the benchmark result can focus on the throughput 
> of the AlgoOperator as much as possible.
> Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] 
> generates all input records in advance into memory before running the 
> benchmark. This allows Spark ML benchmark to read records from memory instead 
> of generating values for those records during the benchmark.
> We can generate value once and re-use it for all input records. This approach 
> minimizes the overhead of source operator and allow us to compare the Flink 
> ML benchmark result with Spark ML benchmark result (using spark-sql-perf) 
> fairly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31125) Flink ML benchmark result should not include data generation overhead

2023-02-17 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31125:
-
Description: 
Flink ML benchmark framework estimates the throughput by having a source 
operator generate a given number (e.g. 10^7) of input records with random 
values, let the given AlgoOperator process these input records, and divide the 
number of records by the total execution time. 

The overhead of generating random values for all input records has observable 
impact on the estimated throughput. We would like to minimize the overhead of 
the source operator so that the benchmark result can focus on the throughput of 
the AlgoOperator as much as possible.

Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] 
generates all input records in advance into memory before running the 
benchmark. This allows Spark ML benchmark to read records from memory instead 
of generating values for those records during the benchmark.

We can generate value once and re-use it for all input records. This approach 
minimizes the overhead of source operator and allow us to compare the Flink ML 
benchmark result with Spark ML benchmark result (using spark-sql-perf) fairly.




> Flink ML benchmark result should not include data generation overhead
> -
>
> Key: FLINK-31125
> URL: https://issues.apache.org/jira/browse/FLINK-31125
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: ml-2.2.0
>
>
> Flink ML benchmark framework estimates the throughput by having a source 
> operator generate a given number (e.g. 10^7) of input records with random 
> values, let the given AlgoOperator process these input records, and divide 
> the number of records by the total execution time. 
> The overhead of generating random values for all input records has observable 
> impact on the estimated throughput. We would like to minimize the overhead of 
> the source operator so that the benchmark result can focus on the throughput 
> of the AlgoOperator as much as possible.
> Note that [spark-sql-perf|https://github.com/databricks/spark-sql-perf] 
> generates all input records in advance into memory before running the 
> benchmark. This allows Spark ML benchmark to read records from memory instead 
> of generating values for those records during the benchmark.
> We can generate value once and re-use it for all input records. This approach 
> minimizes the overhead of source operator and allow us to compare the Flink 
> ML benchmark result with Spark ML benchmark result (using spark-sql-perf) 
> fairly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31084) DataGen sequence generator requires the definition of start/end values

2023-02-17 Thread xzw0223 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690427#comment-17690427
 ] 

xzw0223 commented on FLINK-31084:
-

My idea is that we don't have to pre-load all the data, we generate the data 
when we call the next method, then we record the value of the current generated 
data and save it as state, and when ck and restore we just need to recalculate 
the new value of the location we have sent to and send it downstream.

This avoids the time consuming process of preloading all the data.

[~mapohl] I would like to hear your views,looking forward for your reply, thank 
you.

> DataGen sequence generator requires the definition of start/end values
> --
>
> Key: FLINK-31084
> URL: https://issues.apache.org/jira/browse/FLINK-31084
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: xzw0223
>Priority: Major
>  Labels: starter
>
> The [DataGen connector's 
> parameters|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/#connector-options]
>  are not precisely documented: The start/end parameters are labeled as 
> required for the sequence generator even though they are (see 
> [SequenceGeneratorVisitor:85ff|https://github.com/apache/flink/blob/ef9ce854a3169014001f39e0d5908c703453f2b8/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java#L85]).
> But instead of updating the documentation, we should just come up with 
> reasonable default values. As a user, I would expect the positive integer 
> values to be returned starting from 0 if I don't specify anything here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31084) DataGen sequence generator requires the definition of start/end values

2023-02-17 Thread xzw0223 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690426#comment-17690426
 ] 

xzw0223 commented on FLINK-31084:
-

There is another problem, because we need to write the data generation into the 
deque in advance, the processing time at this stage is very time-consuming, and 
this problem will also exist when triggering ck and restoring the state, 
because their processing methods are similar, both need for loop.

See also 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java#L91

I run into this problem a lot when testing with dataGen.

 

> DataGen sequence generator requires the definition of start/end values
> --
>
> Key: FLINK-31084
> URL: https://issues.apache.org/jira/browse/FLINK-31084
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: xzw0223
>Priority: Major
>  Labels: starter
>
> The [DataGen connector's 
> parameters|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/#connector-options]
>  are not precisely documented: The start/end parameters are labeled as 
> required for the sequence generator even though they are (see 
> [SequenceGeneratorVisitor:85ff|https://github.com/apache/flink/blob/ef9ce854a3169014001f39e0d5908c703453f2b8/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java#L85]).
> But instead of updating the documentation, we should just come up with 
> reasonable default values. As a user, I would expect the positive integer 
> values to be returned starting from 0 if I don't specify anything here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31125) Flink ML benchmark result should not include data generation overhead

2023-02-17 Thread Dong Lin (Jira)
Dong Lin created FLINK-31125:


 Summary: Flink ML benchmark result should not include data 
generation overhead
 Key: FLINK-31125
 URL: https://issues.apache.org/jira/browse/FLINK-31125
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Dong Lin
 Fix For: ml-2.2.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31125) Flink ML benchmark result should not include data generation overhead

2023-02-17 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned FLINK-31125:


Assignee: Dong Lin

> Flink ML benchmark result should not include data generation overhead
> -
>
> Key: FLINK-31125
> URL: https://issues.apache.org/jira/browse/FLINK-31125
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: ml-2.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] lilyevsky commented on pull request #12: [FLINK-30998] Add test for FailureHandler

2023-02-17 Thread via GitHub


lilyevsky commented on PR #12:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/12#issuecomment-1434706713

   Opened by mistake


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] lilyevsky closed pull request #12: [FLINK-30998] Add test for FailureHandler

2023-02-17 Thread via GitHub


lilyevsky closed pull request #12: [FLINK-30998] Add test for FailureHandler
URL: https://github.com/apache/flink-connector-opensearch/pull/12


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on pull request #10: [FLINK-31068] Document how to use Opensearch connector with OpenSearch 1.x / 2.x / 3.x (upcoming) clusters

2023-02-17 Thread via GitHub


reta commented on PR #10:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/10#issuecomment-1434684899

   @MartijnVisser could you have a second to look (only docs update), thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] akalash commented on pull request #21919: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects

2023-02-17 Thread via GitHub


akalash commented on PR #21919:
URL: https://github.com/apache/flink/pull/21919#issuecomment-1434659103

   @pnowojski, Can you take a look at it please? it is just a backport of the 
ticket which you have reviewed already. There was no any conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


flinkbot commented on PR #21960:
URL: https://github.com/apache/flink/pull/21960#issuecomment-1434642730

   
   ## CI report:
   
   * 52974fcfa9fa636359d179eb556af6f82bf1ff96 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31036) StateCheckpointedITCase timed out

2023-02-17 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-31036:

Component/s: Tests

> StateCheckpointedITCase timed out
> -
>
> Key: FLINK-31036
> URL: https://issues.apache.org/jira/browse/FLINK-31036
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Attachments: image-2023-02-16-20-29-52-050.png
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46023=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=10608
> {code}
> "Legacy Source Thread - Source: Custom Source -> Filter (6/12)#69980" 
> #13718026 prio=5 os_prio=0 tid=0x7f05f44f0800 nid=0x128157 waiting on 
> condition [0x7f059feef000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xf0a974e8> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:384)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:356)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>   at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31)
>   at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>   at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$1311/1256184070.accept(Unknown
>  Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
>   - locked <0xd55035c0> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
>   at 
> org.apache.flink.test.checkpointing.StateCheckpointedITCase$StringGeneratingSourceFunction.run(StateCheckpointedITCase.java:178)
>   - locked <0xd55035c0> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
> {code}



--
This 

[jira] [Updated] (FLINK-31036) StateCheckpointedITCase timed out

2023-02-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31036:
---
Labels: pull-request-available test-stability  (was: test-stability)

> StateCheckpointedITCase timed out
> -
>
> Key: FLINK-31036
> URL: https://issues.apache.org/jira/browse/FLINK-31036
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Attachments: image-2023-02-16-20-29-52-050.png
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46023=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=10608
> {code}
> "Legacy Source Thread - Source: Custom Source -> Filter (6/12)#69980" 
> #13718026 prio=5 os_prio=0 tid=0x7f05f44f0800 nid=0x128157 waiting on 
> condition [0x7f059feef000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xf0a974e8> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:384)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:356)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328)
>   at 
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>   at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31)
>   at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>   at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$1311/1256184070.accept(Unknown
>  Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
>   - locked <0xd55035c0> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
>   at 
> org.apache.flink.test.checkpointing.StateCheckpointedITCase$StringGeneratingSourceFunction.run(StateCheckpointedITCase.java:178)
>   - locked <0xd55035c0> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>   at 
> 

[GitHub] [flink] 1996fanrui opened a new pull request, #21960: [FLINK-31036][test][checkpoint] FileSystemCheckpointStorage as the default for test

2023-02-17 Thread via GitHub


1996fanrui opened a new pull request, #21960:
URL: https://github.com/apache/flink/pull/21960

   ## What is the purpose of the change
   
   FileSystemCheckpointStorage as the default for test, because the checkpoint 
size of JobManagerCheckpointStorage isn't enough in some tests.
   
   ## Brief change log
   
   FileSystemCheckpointStorage as the default for test.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31124) Add it case for HiveTableSink speculative execution

2023-02-17 Thread Biao Liu (Jira)
Biao Liu created FLINK-31124:


 Summary: Add it case for HiveTableSink speculative execution
 Key: FLINK-31124
 URL: https://issues.apache.org/jira/browse/FLINK-31124
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Biao Liu


The part of HiveTableSink has supported speculative execution in 
https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109781704


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala:
##
@@ -174,6 +174,7 @@ object FlinkLogicalRelFactories {
 def createCorrelate(
 left: RelNode,
 right: RelNode,
+hints: util.List[RelHint],

Review Comment:
   This change came from https://issues.apache.org/jira/browse/CALCITE-4967



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31084) DataGen sequence generator requires the definition of start/end values

2023-02-17 Thread xzw0223 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690380#comment-17690380
 ] 

xzw0223 commented on FLINK-31084:
-

[~mapohl]  Hi, I have encountered a problem, and I would like to ask your 
opinion. If kind is set as' sequence ', end is set as Integer.MAX_VALUE and 
parallelism is set as 
1,https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java#L117
 will have a problem here.

This is an existing problem. Do I need to raise an issue again and record this 
problem? Or can I solve it all together?

 

> DataGen sequence generator requires the definition of start/end values
> --
>
> Key: FLINK-31084
> URL: https://issues.apache.org/jira/browse/FLINK-31084
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: xzw0223
>Priority: Major
>  Labels: starter
>
> The [DataGen connector's 
> parameters|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/#connector-options]
>  are not precisely documented: The start/end parameters are labeled as 
> required for the sequence generator even though they are (see 
> [SequenceGeneratorVisitor:85ff|https://github.com/apache/flink/blob/ef9ce854a3169014001f39e0d5908c703453f2b8/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java#L85]).
> But instead of updating the documentation, we should just come up with 
> reasonable default values. As a user, I would expect the positive integer 
> values to be returned starting from 0 if I don't specify anything here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31123) Add it case for FileSink speculative execution

2023-02-17 Thread Biao Liu (Jira)
Biao Liu created FLINK-31123:


 Summary: Add it case for FileSink speculative execution
 Key: FLINK-31123
 URL: https://issues.apache.org/jira/browse/FLINK-31123
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Biao Liu


The FileSink has supported speculative execution in 
https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some 
integration test cases for this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop

2023-02-17 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690379#comment-17690379
 ] 

Danny Cranmer commented on FLINK-31041:
---

Hey [~zhuzh] / [~huwh] . Thank-you for the deep dive and explanation. Yes [this 
change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5]
 solve the problem for me. Thanks [~huwh] for offering to pick up the fix, that 
is most appreciated. 

> Race condition in DefaultScheduler results in memory leak and busy loop
> ---
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> h4. !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}} 
> trying to load a non-existent cert from the file system and throwing FNFE. 
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will 
> lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new 
> RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.of(10, 
> TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource source = KafkaSource.builder()
> .setProperty("security.protocol", "SASL_SSL")
> // SSL configurations
> // Configure the path of truststore (CA) provided by the server
> .setProperty("ssl.truststore.location", 
> "/path/to/kafka.client.truststore.jks")
> .setProperty("ssl.truststore.password", "test1234")
> // Configure the path of keystore (private key) if client 
> authentication is required
> .setProperty("ssl.keystore.location", 
> "/path/to/kafka.client.keystore.jks")
> .setProperty("ssl.keystore.password", "test1234")
> // SASL configurations
> // Set SASL mechanism as SCRAM-SHA-256
> .setProperty("sasl.mechanism", "SCRAM-SHA-256")
> // Set JAAS configurations
> .setProperty("sasl.jaas.config", 
> "org.apache.kafka.common.security.scram.ScramLoginModule required 
> username=\"username\" password=\"password\";")
> .setBootstrapServers("http://localhost:3456;)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
> List> sources = IntStream.range(0, 32)
> .mapToObj(i -> env
> .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source " + i).uid("source-" + i)
> .keyBy(s -> s.charAt(0))
> .map(s -> s))
> .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source").uid("source")
> .keyBy(s -> s.charAt(0))
> .union(sources.toArray(new SingleOutputStreamOperator[] {}))
> .print();
> env.execute("test job"); {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}} since it handles many 
> {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from 
> people who understand this code better than me!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop

2023-02-17 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690379#comment-17690379
 ] 

Danny Cranmer edited comment on FLINK-31041 at 2/17/23 12:58 PM:
-

Hey [~zhuzh] / [~huwh] . Thank-you for the deep dive and explanation. Yes [this 
change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5]
 solved the problem for me. Thanks [~huwh] for offering to pick up the fix, 
that is most appreciated. 


was (Author: dannycranmer):
Hey [~zhuzh] / [~huwh] . Thank-you for the deep dive and explanation. Yes [this 
change|https://github.com/zhuzhurk/flink/commit/a522edeb4e23ab1ce3f5a34eb3269116723e77a5]
 solve the problem for me. Thanks [~huwh] for offering to pick up the fix, that 
is most appreciated. 

> Race condition in DefaultScheduler results in memory leak and busy loop
> ---
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.3, 1.16.1
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: failovers.log, flink-31041-heap-dump.png, 
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}} 
> (FLIP-27), there is a failure race condition that results in:
>  * Memory leak of {{ExecutionVertexVersion}}
>  * Busy loop constantly trying to restart job
>  * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> h4. !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in 
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}} 
> trying to load a non-existent cert from the file system and throwing FNFE. 
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will 
> lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new 
> RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.of(10, 
> TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource source = KafkaSource.builder()
> .setProperty("security.protocol", "SASL_SSL")
> // SSL configurations
> // Configure the path of truststore (CA) provided by the server
> .setProperty("ssl.truststore.location", 
> "/path/to/kafka.client.truststore.jks")
> .setProperty("ssl.truststore.password", "test1234")
> // Configure the path of keystore (private key) if client 
> authentication is required
> .setProperty("ssl.keystore.location", 
> "/path/to/kafka.client.keystore.jks")
> .setProperty("ssl.keystore.password", "test1234")
> // SASL configurations
> // Set SASL mechanism as SCRAM-SHA-256
> .setProperty("sasl.mechanism", "SCRAM-SHA-256")
> // Set JAAS configurations
> .setProperty("sasl.jaas.config", 
> "org.apache.kafka.common.security.scram.ScramLoginModule required 
> username=\"username\" password=\"password\";")
> .setBootstrapServers("http://localhost:3456;)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
> List> sources = IntStream.range(0, 32)
> .mapToObj(i -> env
> .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source " + i).uid("source-" + i)
> .keyBy(s -> s.charAt(0))
> .map(s -> s))
> .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka 
> Source").uid("source")
> .keyBy(s -> s.charAt(0))
> .union(sources.toArray(new SingleOutputStreamOperator[] {}))
> .print();
> env.execute("test job"); {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce 
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
>  however the {{DefaultScheduler}} does not. We need a debounce mechanism in 
> the {{DefaultScheduler}} since it handles many 
> {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from 
> people who understand this code better than me!
>  
>  



--
This message was sent by Atlassian Jira

[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109761449


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlRowConstructor.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.fun.SqlRowOperator;
+import org.apache.calcite.util.Pair;
+
+import java.util.AbstractList;
+import java.util.Map;
+
+/** {@link SqlOperator} for ROW, which behaves same way as in 
Calcite 1.29.0. */
+public class SqlRowConstructor extends SqlRowOperator {
+public SqlRowConstructor() {
+super("ROW");
+}
+
+@Override
+public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+// The type of a ROW(e1,e2) expression is a record with the types
+// {e1type,e2type}.  According to the standard, field names are
+// implementation-defined.
+return opBinding
+.getTypeFactory()
+.createStructType(
+new AbstractList>() {
+@Override
+public Map.Entry get(int 
index) {
+return Pair.of(
+SqlUtil.deriveAliasFromOrdinal(index),
+opBinding.getOperandType(index));
+}
+
+@Override
+public int size() {
+return opBinding.getOperandCount();
+}
+});
+}
+}

Review Comment:
   In https://issues.apache.org/jira/browse/CALCITE-3627 there has been changed 
a NullPolicy for ROW. 
   It leads to failure of 
`org.apache.flink.table.planner.functions.RowFunctionITCase` since table api 
and sql are becoming inconsistent in context of NullPolicy for ROW type 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31121) KafkaSink should be able to catch and ignore exp via config on/off

2023-02-17 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690378#comment-17690378
 ] 

Jing Ge commented on FLINK-31121:
-

[~zjureel] it's your, thanks!

> KafkaSink should be able to catch and ignore exp via config on/off
> --
>
> Key: FLINK-31121
> URL: https://issues.apache.org/jira/browse/FLINK-31121
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
>Reporter: Jing Ge
>Assignee: Shammon
>Priority: Major
> Fix For: 1.18.0
>
>
> It is a common requirement for users to catch and ignore exp while sinking 
> the event to to downstream system like Kafka. It will be convenient for some 
> use cases, if Flink Sink can provide built-in functionality and config to 
> turn it on and off, especially for cases that data consistency is not very 
> important or the stream contains dirty events. [1][2]
> First of all, consider doing it for KafkaSink. Long term, a common solution 
> that can be used by any connector would be even better.
>  
> [1][https://lists.apache.org/thread/wy31s8wb9qnskq29wn03kp608z4vrwv8]
> [2]https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31121) KafkaSink should be able to catch and ignore exp via config on/off

2023-02-17 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-31121:
---

Assignee: Shammon

> KafkaSink should be able to catch and ignore exp via config on/off
> --
>
> Key: FLINK-31121
> URL: https://issues.apache.org/jira/browse/FLINK-31121
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
>Reporter: Jing Ge
>Assignee: Shammon
>Priority: Major
> Fix For: 1.18.0
>
>
> It is a common requirement for users to catch and ignore exp while sinking 
> the event to to downstream system like Kafka. It will be convenient for some 
> use cases, if Flink Sink can provide built-in functionality and config to 
> turn it on and off, especially for cases that data consistency is not very 
> important or the stream contains dirty events. [1][2]
> First of all, consider doing it for KafkaSink. Long term, a common solution 
> that can be used by any connector would be even better.
>  
> [1][https://lists.apache.org/thread/wy31s8wb9qnskq29wn03kp608z4vrwv8]
> [2]https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31119) JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing successfully

2023-02-17 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690355#comment-17690355
 ] 

Matthias Pohl edited comment on FLINK-31119 at 2/17/23 12:30 PM:
-

Running the test locally 8500 times didn't reveal anything.

[~chesnay] I'm curious about your opinion. The tests ran at around the same 
time (2:24:14 and 1:07:36) on two different Alibaba machines. I don't like to 
blame it on a hick-up. Browsing through git logs doesn't reveal anything either.

One other thing I could think of is that an external process send packages to 
the port that is used for the communication.


was (Author: mapohl):
Running the test locally 8500 times didn't reveal anything.

[~chesnay] I'm curious about your opinion. The tests ran at around the same 
time (2:24:14 and 1:07:36) on two different Alibaba machines. I don't like to 
blame it on a hick-up. Browsing through git logs doesn't reveal anything either.

> JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing 
> successfully
> --
>
> Key: FLINK-31119
> URL: https://issues.apache.org/jira/browse/FLINK-31119
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
> Attachments: FLINK-31119.20230217.1.log, FLINK-31119.20230217.4.log
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46247=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8523
> {code}
> Feb 17 02:24:35 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 24.074 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobRecoveryITCase
> Feb 17 02:24:35 [ERROR] 
> org.apache.flink.runtime.jobmaster.JobRecoveryITCase.testTaskFailureRecovery  
> Time elapsed: 20.981 s  <<< FAILURE!
> Feb 17 02:24:35 java.lang.AssertionError: 
> Feb 17 02:24:35 
> Feb 17 02:24:35 Expected: is 
> Feb 17 02:24:35  but: was 
> Feb 17 02:24:35   at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> Feb 17 02:24:35   at org.junit.Assert.assertThat(Assert.java:964)
> Feb 17 02:24:35   at org.junit.Assert.assertThat(Assert.java:930)
> Feb 17 02:24:35   at 
> org.apache.flink.runtime.jobmaster.JobRecoveryITCase.runTaskFailureRecoveryTest(JobRecoveryITCase.java:79)
> Feb 17 02:24:35   at 
> org.apache.flink.runtime.jobmaster.JobRecoveryITCase.testTaskFailureRecovery(JobRecoveryITCase.java:63)
> Feb 17 02:24:35   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> [...]
> {code}
> The actual cause is that unexpected data was received:
> {code}
> 02:24:35,301 [Receiver (5/5)#1] WARN  
> org.apache.flink.runtime.taskmanager.Task[] - Receiver 
> (5/5)#1 
> (d88e16a5e3c6f2c08cf3924d93ea18e2_28065fbb1d26fe99e018d3b846860dd3_4_1) 
> switched from RUNNING to FAILED with failure cause:
> java.lang.Exception: Wrong data received.
> at 
> org.apache.flink.runtime.jobmaster.TestingAbstractInvokables$Receiver.invoke(TestingAbstractInvokables.java:83)
>  ~[test-classes/:?]
> at 
> org.apache.flink.runtime.jobmaster.JobRecoveryITCase$FailingOnceReceiver.invoke(JobRecoveryITCase.java:126)
>  ~[test-classes/:?]
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  ~[classes/:?]
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
> [classes/:?]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
> [classes/:?]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [classes/:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690372#comment-17690372
 ] 

Weijie Guo commented on FLINK-31120:


Sorry, I accidentally touched it.

> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
> Feb 17 04:51:25 at 
> org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
> Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-31120:
---
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334

{code}
Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 10.725 s <<< FAILURE! - in 
org.apache.flink.table.planner.functions.StringFunctionsITCase
Feb 17 04:51:25 [ERROR] 
org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
 Time elapsed: 4.367 s <<< ERROR!
Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[...]
{code}

  was:
# 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334

{code}
Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 10.725 s <<< FAILURE! - in 
org.apache.flink.table.planner.functions.StringFunctionsITCase
Feb 17 04:51:25 [ERROR] 
org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
  Time elapsed: 4.367 s  <<< ERROR!
Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
[...]
{code}


> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>  Time elapsed: 4.367 s <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25 at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25 at 
> 

[jira] [Updated] (FLINK-31120) ConcurrentModificationException occurred in StringFunctionsITCase.test

2023-02-17 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-31120:
---
Description: 
# 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334

{code}
Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 10.725 s <<< FAILURE! - in 
org.apache.flink.table.planner.functions.StringFunctionsITCase
Feb 17 04:51:25 [ERROR] 
org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
  Time elapsed: 4.367 s  <<< ERROR!
Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
[...]
{code}

  was:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334

{code}
Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 10.725 s <<< FAILURE! - in 
org.apache.flink.table.planner.functions.StringFunctionsITCase
Feb 17 04:51:25 [ERROR] 
org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
  Time elapsed: 4.367 s  <<< ERROR!
Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute sql
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
Feb 17 04:51:25 at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:354)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:320)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:113)
Feb 17 04:51:25 at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:93)
Feb 17 04:51:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
[...]
{code}


> ConcurrentModificationException occurred in StringFunctionsITCase.test
> --
>
> Key: FLINK-31120
> URL: https://issues.apache.org/jira/browse/FLINK-31120
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> # 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46255=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12334
> {code}
> Feb 17 04:51:25 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 10.725 s <<< FAILURE! - in 
> org.apache.flink.table.planner.functions.StringFunctionsITCase
> Feb 17 04:51:25 [ERROR] 
> org.apache.flink.table.planner.functions.StringFunctionsITCase.test(TestCase)[4]
>   Time elapsed: 4.367 s  <<< ERROR!
> Feb 17 04:51:25 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Feb 17 04:51:25   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> Feb 17 04:51:25   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> Feb 17 04:51:25   at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476)
> Feb 17 04:51:25   at 
> 

[jira] [Comment Edited] (FLINK-31119) JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing successfully

2023-02-17 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690314#comment-17690314
 ] 

Matthias Pohl edited comment on FLINK-31119 at 2/17/23 12:24 PM:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46250=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8521

{code}
01:07:57,099 [Receiver (1/6)#1] WARN  
org.apache.flink.runtime.taskmanager.Task[] - Receiver 
(1/6)#1 (e701d0caf3247ea7554acfb5dd8df541_cb0a5d4bcd60528ae7c4e8c99900a321_0_1) 
switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: null
at 
org.apache.flink.runtime.jobmaster.TestingAbstractInvokables$Receiver.invoke(TestingAbstractInvokables.java:82)
 ~[test-classes/:?]
at 
org.apache.flink.runtime.jobmaster.JobRecoveryITCase$FailingOnceReceiver.invoke(JobRecoveryITCase.java:126)
 ~[test-classes/:?]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[classes/:?]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[classes/:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
{code}

This one fails with a {{NullPointerException}} in the same method 
[TestingAbstractInvokables.Receiver#invoke:71ff|https://github.com/apache/flink/blob/026675a5cb8a3704c51802fb549d6b0bc4759835/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java#L71].
 Essentially, the data that has been received seems to be corrupted

Update:
There was a Wrong data exception also thrown in this case. It appeared while 
cancelling the tasks which was caused by the expected 
{{FlinkRuntimeException}}. It didn't have an impact because the job was already 
transitioning into CANCELLING, I guess.


was (Author: mapohl):
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46250=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8521

{code}
01:07:57,099 [Receiver (1/6)#1] WARN  
org.apache.flink.runtime.taskmanager.Task[] - Receiver 
(1/6)#1 (e701d0caf3247ea7554acfb5dd8df541_cb0a5d4bcd60528ae7c4e8c99900a321_0_1) 
switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: null
at 
org.apache.flink.runtime.jobmaster.TestingAbstractInvokables$Receiver.invoke(TestingAbstractInvokables.java:82)
 ~[test-classes/:?]
at 
org.apache.flink.runtime.jobmaster.JobRecoveryITCase$FailingOnceReceiver.invoke(JobRecoveryITCase.java:126)
 ~[test-classes/:?]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[classes/:?]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[classes/:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
{code}

This one fails with a {{NullPointerException}} in the same method 
[TestingAbstractInvokables.Receiver#invoke:71ff|https://github.com/apache/flink/blob/026675a5cb8a3704c51802fb549d6b0bc4759835/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java#L71].
 Essentially, the data that has been received seems to be corrupted

> JobRecoveryITCase.testTaskFailureRecovery failed due to the job not finishing 
> successfully
> --
>
> Key: FLINK-31119
> URL: https://issues.apache.org/jira/browse/FLINK-31119
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
> Attachments: FLINK-31119.20230217.1.log, FLINK-31119.20230217.4.log
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46247=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8523
> {code}
> Feb 17 02:24:35 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 24.074 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobRecoveryITCase
> Feb 17 02:24:35 [ERROR] 
> org.apache.flink.runtime.jobmaster.JobRecoveryITCase.testTaskFailureRecovery  
> Time elapsed: 20.981 s  <<< FAILURE!
> Feb 17 02:24:35 java.lang.AssertionError: 
> Feb 17 02:24:35 
> Feb 17 02:24:35 Expected: is 
> Feb 17 02:24:35  but: was 
> Feb 17 02:24:35   at 
> 

[jira] [Created] (FLINK-31122) Expose MetricGroup to Committer of new Unified Sink v2

2023-02-17 Thread Theo Diefenthal (Jira)
Theo Diefenthal created FLINK-31122:
---

 Summary: Expose MetricGroup to Committer of new Unified Sink v2
 Key: FLINK-31122
 URL: https://issues.apache.org/jira/browse/FLINK-31122
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Theo Diefenthal


When writing my own committer, I want to include metrics on the operations 
performed as for any other custom function in the pipeline.

The goal of this story is to provide MetricGroup to Committer in some way so 
that in sink2.Committer, within the commit method, I somehow have access to the 
MetricGroup.

The usecase for me with regards to this story: I want to notify Impala about 
newly added partitions written via FileSink and track via metrics the number of 
performed Impala queries and their duration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21958:
URL: https://github.com/apache/flink/pull/21958#discussion_r1109698942


##
flink-python/pyflink/table/expression.py:
##
@@ -1487,6 +1487,13 @@ def array_distinct(self) -> 'Expression':
 """
 return _binary_op("arrayDistinct")(self)
 
+def array_union(self, array) -> 'Expression':
+"""
+Returns an array of the elements in the union of array1 and array2, 
without duplicates.
+If both of the array are null, the function will return null.

Review Comment:
   same here



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1359,6 +1360,16 @@ public OutType arrayDistinct() {
 return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, 
toExpr()));
 }
 
+/**
+ * Returns an array of the elements in the union of array1 and array2, 
without duplicates.

Review Comment:
   What about ordering?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.

2023-02-17 Thread via GitHub


snuyanzin commented on PR #21958:
URL: https://github.com/apache/flink/pull/21958#issuecomment-1434551541

   Thanks for your contribution.
   
   To be honest i didn't get the logic.
   What i did:
   1. Built it from the PR's branch
   2. Started standalone Flink
   3. Via sqlClient submitted several queries
   
   ```sql
   SELECT array_union(array[1], array[2]);
   -- result array[1, 2]
   -- this is OK
   ```
   
   ```sql
   SELECT array_union(array[1], array[2, 3]);
   -- result array[1, 2, 3]
   --- this is OK
   ```
   
   ```sql
   SELECT array_union(array[1], array[2, 3, null]);
   -- result array[1, 2, 3, 0]
   --- this is NOT OK
   ```
   
   ```sql
   SELECT array_union(array[1], array['this is a string']);
   -- result [1, 16]
   -- this is NOT OK
   ```
   
   ```sql
   SELECT array_union(array[1], array['this is a string']);
   -- result [1, 16]
   -- this is NOT OK
   ```
   
   ```sql
   SELECT array_union(array[1], array[array[1, 2, 3]]);
   -- result [1, 24]
   -- this is NOT OK
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas closed pull request #209: [FLINK-27103] Don't store redundant primary key fields

2023-02-17 Thread via GitHub


SteNicholas closed pull request #209: [FLINK-27103] Don't store redundant 
primary key fields
URL: https://github.com/apache/flink-table-store/pull/209


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21958: [FLINK-31118][table] Add ARRAY_UNION function.

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21958:
URL: https://github.com/apache/flink/pull/21958#discussion_r1109698642


##
docs/data/sql_functions.yml:
##
@@ -617,6 +617,9 @@ collection:
   - sql: ARRAY_DISTINCT(haystack)
 table: haystack.arrayDistinct()
 description: Returns an array with unique elements. If the array itself is 
null, the function will return null. Keeps ordering of elements.
+  - sql: ARRAY_UNION(array1, array2)
+table: haystack.arrayUnion(array)
+description: Returns an array of the elements in the union of array1 and 
array2, without duplicates. If both of the array are null, the function will 
return null.

Review Comment:
   From the description it is not clear what happens if only one array is `null`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109660825


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala:
##
@@ -206,7 +206,7 @@ class FlinkRexUtilTest {
 assertEquals(RexUtil.toCnf(rexBuilder, predicate).toString, 
newPredicate3.toString)
 
 val newPredicate4 = FlinkRexUtil.toCnf(rexBuilder, Int.MaxValue, predicate)
-assertFalse(RexUtil.eq(predicate, newPredicate4))

Review Comment:
   Deprecated in https://issues.apache.org/jira/browse/CALCITE-2632
   in favor of `e1.equals(e2)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109660535


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala:
##
@@ -304,13 +304,13 @@ class FlinkSubQueryRemoveRule(
   replacement: RexNode): RexNode = {
 condition.accept(new RexShuttle() {
   override def visitSubQuery(subQuery: RexSubQuery): RexNode = {
-if (RexUtil.eq(subQuery, oldSubQueryCall)) replacement else subQuery

Review Comment:
   Deprecated in https://issues.apache.org/jira/browse/CALCITE-2632
   in favor of `e1.equals(e2)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109652438


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala:
##
@@ -119,7 +119,7 @@ class FlinkRelDistribution private (
   try {
 val i = mapping.getTargetOpt(fieldCollation.getFieldIndex)
 if (i >= 0) {
-  newFieldCollations.add(fieldCollation.copy(i))

Review Comment:
   deprecated at 
https://github.com/apache/calcite/commit/4fdf241dfb8cc555ab28c6cba7a152e4cc4ec169#diff-c324ef65e6a4e0a729b606cf5f4998aa6769bed7ce52f94995b85dcee93b9d96R219-R227
   in favor of `withXXX` methods



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala:
##
@@ -49,7 +49,7 @@ object TraitUtil {
 fieldCollation =>
   try {
 val i = mapping.getTargetOpt(fieldCollation.getFieldIndex)
-if (i >= 0) newFieldCollations.add(fieldCollation.copy(i))

Review Comment:
   deprecated at 
https://github.com/apache/calcite/commit/4fdf241dfb8cc555ab28c6cba7a152e4cc4ec169#diff-c324ef65e6a4e0a729b606cf5f4998aa6769bed7ce52f94995b85dcee93b9d96R219-R227
   in favor of `withXXX` methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109652438


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala:
##
@@ -119,7 +119,7 @@ class FlinkRelDistribution private (
   try {
 val i = mapping.getTargetOpt(fieldCollation.getFieldIndex)
 if (i >= 0) {
-  newFieldCollations.add(fieldCollation.copy(i))

Review Comment:
   deprecated at 
https://github.com/apache/calcite/commit/513f4d2510a47e8d0b55f35dac56785c16fa7d5f
   in favor of `withXXX` methods



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala:
##
@@ -49,7 +49,7 @@ object TraitUtil {
 fieldCollation =>
   try {
 val i = mapping.getTargetOpt(fieldCollation.getFieldIndex)
-if (i >= 0) newFieldCollations.add(fieldCollation.copy(i))

Review Comment:
   deprecated at 
https://github.com/apache/calcite/commit/513f4d2510a47e8d0b55f35dac56785c16fa7d5f
   in favor of `withXXX` methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21934: [FLINK-27998][table] Upgrade Calcite to 1.30.0

2023-02-17 Thread via GitHub


snuyanzin commented on code in PR #21934:
URL: https://github.com/apache/flink/pull/21934#discussion_r1109651676


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala:
##
@@ -328,7 +328,7 @@ class DecomposeGroupingSetsRule
 val res: Long = call.getArgList.foldLeft(0L)(
   (res, arg) => (res << 1L) + (if (groups.contains(arg)) 0L else 1L))
 builder.makeLiteral(res, call.getType, false)
-  case _ => builder.constantNull()

Review Comment:
   Deprecated in 
https://github.com/apache/calcite/commit/b432756e2be9ad0557a56254550eb4438dd0efcf
   in favor of `#makeNullLiteral(RelDataType)`
   because 
   
   > because it produces untyped NULL literals that make planning difficult



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >