Re: [PR] [BP-1.19][FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25057:
URL: https://github.com/apache/flink/pull/25057#issuecomment-2216726410

   
   ## CI report:
   
   * 98f6bcbe0e3adfc2d849fb67e6a471d994442aae UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-1.19][FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


rkhachatryan opened a new pull request, #25057:
URL: https://github.com/apache/flink/pull/25057

   backport of #25050 to 1.19


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-08 Thread via GitHub


fhueske commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1669010333


##
docs/data/sql_functions.yml:
##
@@ -377,6 +377,12 @@ string:
   - sql: SUBSTR(string, integer1[, integer2])
 table: STRING.substr(INTEGER1[, INTEGER2])
 description: Returns a substring of string starting from position integer1 
with length integer2 (to the end by default).
+  - sql: JSON_QUOTE(string)
+table: STRING.JsonQuote()
+description: Quotes a string as a JSON value by wrapping it with double 
quote characters, escaping interior quote and special characters ('"', '\', 
'/', 'b', 'f', 'n', 'r', 't'), and returning the result as a string. If the 
argument is NULL, the function returns NULL.
+  - sql: JSON_UNQUOTE(string)
+table: STRING.JsonUnquote()
+description: Unquotes JSON value, escapes special characters ('"', '\', 
'/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex), and returns the result as a 
string. If the argument is NULL, returns NULL. If the value starts and ends 
with double quotes but is not a valid JSON string literal, the value is passed 
through unmodified.

Review Comment:
   ```suggestion
   description: Unquotes JSON value, unescapes escaped special characters 
('"', '\', '/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex), and returns the 
result as a string. If the argument is NULL, returns NULL. If the value starts 
and ends with double quotes but is not a valid JSON string literal, the value 
is passed through unmodified.
   ```
   
   we are unescaping (previously) escaped characters, right?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java:
##
@@ -794,6 +795,196 @@ private static List jsonObjectSpec() {
 STRING().notNull()));
 }
 
+private static List jsonQuoteSpec() {
+
+return Arrays.asList(
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUOTE)
+.onFieldsWithData(0)
+.testResult(
+nullOf(STRING()).jsonQuote(),
+"JSON_QUOTE(CAST(NULL AS STRING))",
+null,
+STRING().nullable()),
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUOTE)
+.onFieldsWithData(
+"V",
+"\"null\"",
+"[1, 2, 3]",
+"This is a \t test \n with special characters: 
\" \\ \b \f \r \u0041",
+"\"kv_pair_test\": \"\\b\\f\\r\"",
+"\ttab and fwd slash /",
+"this will not be quoted \\u006z",
+"this will be quoted ≠",
+null)
+.andDataTypes(
+STRING().notNull(),
+STRING().notNull(),
+STRING().notNull(),
+STRING().notNull(),
+STRING().notNull(),
+STRING().notNull(),
+STRING().notNull(),
+STRING().notNull(),
+STRING().nullable())
+.testResult(
+$("f0").jsonQuote(), "JSON_QUOTE(f0)", 
"\"V\"", STRING().notNull())
+.testResult(
+$("f1").jsonQuote(),
+"JSON_QUOTE(f1)",
+"\"\\\"null\\\"\"",
+STRING().notNull())
+.testResult(
+$("f2").jsonQuote(),
+"JSON_QUOTE(f2)",
+"\"[1, 2, 3]\"",
+STRING().notNull())
+.testResult(
+$("f3").jsonQuote(),
+"JSON_QUOTE(f3)",
+"\"This is a \\t test \\n with special 
characters: \\\"  \\b \\f \\r A\"",
+STRING().notNull())
+.testResult(
+$("f4").jsonQuote(),
+"JSON_QUOTE(f4)",
+"\"\\\"kv_pair_test\\\": 
\\\"bfr\\\"\"",
+STRING().notNull())
+.testResult(
+$("f5").jsonQuote(),
+"JSON_QUOTE(f5)",
+"\"\\ttab and fwd slash \\/\"",
+STRING().notNull())
+  

Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


rkhachatryan merged PR #25050:
URL: https://github.com/apache/flink/pull/25050


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34657] extract lineage info for stream API [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25056:
URL: https://github.com/apache/flink/pull/25056#issuecomment-2216704983

   
   ## CI report:
   
   * 75d6e981e0387369db13c4a3646b5af764c84e9c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34657] add lineage info for stream API [flink]

2024-07-08 Thread via GitHub


HuangZhenQiu opened a new pull request, #25056:
URL: https://github.com/apache/flink/pull/25056

   ## What is the purpose of the change
   
   Support Lineage info extraction for Stream API use cases.
   
   ## Brief change log
   
 - Add LineageVertexInfo into transformation when user use DataStream API 
to add source and sink
 - Add Test cases in LineageGraphUtilsTest for source and sink.
   
   ## Verifying this change
   
 - Add test cases in LineageGraphUtils
 - Add test cases in JobStatusChangedListenerITCase
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35749) Kafka sink component will lose data when kafka cluster is unavailable for a while

2024-07-08 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-35749:
---
Affects Version/s: kafka-3.2.0

> Kafka sink component will lose data when kafka cluster is unavailable for a 
> while
> -
>
> Key: FLINK-35749
> URL: https://issues.apache.org/jira/browse/FLINK-35749
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.2, 1.17.1, kafka-3.2.0
>Reporter: Jimmy Zhao
>Assignee: Jimmy Zhao
>Priority: Blocker
>  Labels: pull-request-available
>
> As the title described, here is the procedure to reproduce the problem:
> 1. develop a simple flink stream job to consume from one kafka topic and sink 
> to anthoer kafka sever and topic
> 2. make amount of kafka message and produce to the source kafka topic, record 
> the message number
> 3. start the flink stream job, and config to cosume from earliest source 
> topic offset
> 4. during the job cosuming the source topic, restart the kafka cluster(we use 
> aws MSK)
> 5. the flink job will not throw any Exception like nothing happened, but only 
> print error log like : [kafka-producer-network-thread | producer-2] INFO  
> org.apache.kafka.clients.NetworkClient  - [Producer clientId=producer-2] Node 
> 2 disconnected.
> 6. wait for the kafka cluster finished restarting and all the source kafka 
> message consumed
> 7. count the target kafka topic message number, compare to the source, there 
> is a high probability of data loss(more than 50%)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35749) Kafka sink component will lose data when kafka cluster is unavailable for a while

2024-07-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864016#comment-17864016
 ] 

Weijie Guo commented on FLINK-35749:


I don't think this should be a blocker of 1.20, because flink-connector-kafka 
is now an external separate repository. I remove 1.20 from affect version/s, 
and could some one help updating this to kafka-xxx version?

> Kafka sink component will lose data when kafka cluster is unavailable for a 
> while
> -
>
> Key: FLINK-35749
> URL: https://issues.apache.org/jira/browse/FLINK-35749
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Jimmy Zhao
>Assignee: Jimmy Zhao
>Priority: Blocker
>  Labels: pull-request-available
>
> As the title described, here is the procedure to reproduce the problem:
> 1. develop a simple flink stream job to consume from one kafka topic and sink 
> to anthoer kafka sever and topic
> 2. make amount of kafka message and produce to the source kafka topic, record 
> the message number
> 3. start the flink stream job, and config to cosume from earliest source 
> topic offset
> 4. during the job cosuming the source topic, restart the kafka cluster(we use 
> aws MSK)
> 5. the flink job will not throw any Exception like nothing happened, but only 
> print error log like : [kafka-producer-network-thread | producer-2] INFO  
> org.apache.kafka.clients.NetworkClient  - [Producer clientId=producer-2] Node 
> 2 disconnected.
> 6. wait for the kafka cluster finished restarting and all the source kafka 
> message consumed
> 7. count the target kafka topic message number, compare to the source, there 
> is a high probability of data loss(more than 50%)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35749) Kafka sink component will lose data when kafka cluster is unavailable for a while

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35749:
---
Affects Version/s: (was: 1.20.0)

> Kafka sink component will lose data when kafka cluster is unavailable for a 
> while
> -
>
> Key: FLINK-35749
> URL: https://issues.apache.org/jira/browse/FLINK-35749
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Jimmy Zhao
>Assignee: Jimmy Zhao
>Priority: Blocker
>  Labels: pull-request-available
>
> As the title described, here is the procedure to reproduce the problem:
> 1. develop a simple flink stream job to consume from one kafka topic and sink 
> to anthoer kafka sever and topic
> 2. make amount of kafka message and produce to the source kafka topic, record 
> the message number
> 3. start the flink stream job, and config to cosume from earliest source 
> topic offset
> 4. during the job cosuming the source topic, restart the kafka cluster(we use 
> aws MSK)
> 5. the flink job will not throw any Exception like nothing happened, but only 
> print error log like : [kafka-producer-network-thread | producer-2] INFO  
> org.apache.kafka.clients.NetworkClient  - [Producer clientId=producer-2] Node 
> 2 disconnected.
> 6. wait for the kafka cluster finished restarting and all the source kafka 
> message consumed
> 7. count the target kafka topic message number, compare to the source, there 
> is a high probability of data loss(more than 50%)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35354] Support host mapping in Flink tikv cdc [flink-cdc]

2024-07-08 Thread via GitHub


GOODBOY008 commented on PR #3336:
URL: https://github.com/apache/flink-cdc/pull/3336#issuecomment-2216531933

   @Mrart Can you rebase the branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35783) Flink CDC Could not start the yaml Job

2024-07-08 Thread layhuts (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

layhuts closed FLINK-35783.
---
Resolution: Done

> Flink CDC Could not start the yaml Job
> --
>
> Key: FLINK-35783
> URL: https://issues.apache.org/jira/browse/FLINK-35783
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: layhuts
>Priority: Major
>
> * flink版本1.19.1
>  * flink CDC版本3.1.1
>  * 在${FLINK_HOME}/lib下增加了 mysql-connector-java-8.0.27.jar 和 
> flink-sql-connector-mysql-cdc-3.1.1.jar
>  * 在flink-cdc/lib下增加了flink-cdc-pipeline-connector-mysql-3.1.1.jar 和 
> flink-cdc-pipeline-connector-doris-3.1.1.jar
> 第一次使用
> {code:java}
> bin/flink-cdc.sh ***.yaml {code}
>  
> 提交作业提示java.lang.NoClassDefFoundError:org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> {code:java}
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/cdc/runtime/typeutils/EventTypeInfo   at 
> java.lang.Class.getDeclaredFields0(Native Method)   at 
> java.lang.Class.privateGetDeclaredFields(Class.java:2583)   at 
> java.lang.Class.getDeclaredField(Class.java:2068)   at 
> java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1872)   at 
> java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)   at 
> java.security.AccessController.doPrivileged(Native Method)   at 
> java.io.ObjectStreamClass.(ObjectStreamClass.java:494)   at 
> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)   at 
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2209)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>    at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>    at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
>    at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:496)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:294)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:173)
>    ... 19 more Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.cdc.runtime.typeutils.EventTypeInfo   at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:387)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:418)   at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:351)   ... 52 more {code}
> 按照提示在${FLINK_HOME}/lib下增加了 flink-cdc-runtime-3.1.1.jar 后再次运行出现如下问题:
> {code:java}
> Exception in thread "main" org.apache.flink.util.FlinkException: Failed to 
> execute job 'Sync mid_cloud Database to Doris'.
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
>     at 
> org.apache.flink.cdc.composer.flink.FlinkPipelineExecution.execute(FlinkPipelineExecution.java:43)
>     at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:74)
>     at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:71)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start

Re: [PR] [FLINK-35299] Respect initial position for new streams [flink-connector-aws]

2024-07-08 Thread via GitHub


code-hard-play-harder commented on code in PR #140:
URL: 
https://github.com/apache/flink-connector-aws/pull/140#discussion_r1669689616


##
docs/content/docs/connectors/datastream/kinesis.md:
##
@@ -217,6 +217,38 @@ properties by providing a value for 
`ConsumerConfigConstants.STREAM_INITIAL_TIME
 If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined 
then the default pattern will be `-MM-dd'T'HH:mm:ss.SSSXXX`
 (for example, timestamp value is `2016-04-04` and pattern is `-MM-dd` 
given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without 
given a pattern).
 
+### Configuring starting position for new streams
+
+By default, the Flink Kinesis Consumer handles new streams the same way it 
handles a new shard for an existing stream, and it starts consuming from the 
earliest record (same behaviour as TRIM_HORIZON).
+
+This behaviour is fine if you're consuming from a stream that you don't want 
to lose any data from, but if you're consuming from a stream with a large 
retention and where it is fine to start consuming from "now",
+or more generally started from that is defined in 
`ConsumerConfigConstants.STREAM_INITIAL_POSITION`, this was not possible 
before. 
+
+This behaviour can now be enabled by setting the 
`ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to 
true, which will make ALL new streams "reset" to consume from the initial 
position
+instead of starting from the beginning. 
+
+If you just want to force a particular new stream to start consuming from the 
defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the 
`ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property 
(described below) instead.
+
+### Resetting specific streams to the starting position
+
+One of the features of the Flink Kinesis Consumer is that it keeps track of 
the offset that the application is at for each shard, so that if the 
application is restarted we can start consuming from that offset
+when restoring from snapshot. 
+
+This is the ideal behaviour most of the time, but what if you want to jump to 
`LATEST` or go back to `TRIM_HORIZON` for a stream that is already being 
tracked by the Flink Kinesis Consumer? 
+
+You can now do this via the 
`ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, 
which expects a comma separated list of strings referring to the names of the 
Kinesis Streams to reset.
+
+For example, if you configure your application with
+```
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

Review Comment:
   I was wondering if we are able to set different streams with different 
INITIAL POSITION. Let's say we would add `streamA`, `streamB` and `streamC` as 
new streams, I want to have `streamA` and `streamB` to consume from `LATEST` 
and `streamC` from `AT_TIMESTAMP`. Is this possible?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35792) Sorting by proctime does not work in rank

2024-07-08 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35792:

Affects Version/s: 1.19.1
   (was: 1.19.0)

> Sorting by proctime does not work in rank
> -
>
> Key: FLINK-35792
> URL: https://issues.apache.org/jira/browse/FLINK-35792
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: xuyang
>Priority: Major
>
> Take the following sql as an example:
> {code:java}
> @Test
> def test(): Unit = {
>   val sql =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT a, b, c,
>   |  ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, proctime DESC) as 
> rank_num
>   |  FROM MyTable)
>   |WHERE rank_num = 1
> """.stripMargin
>   // This rank can't be converted into Deduplicated because it also uses `b`  
>  
>   // as order key.    
>   util.verifyExecPlan(sql)
> } {code}
> The rank node will not materialize the `proctime` in 
> `RelTimeIndicatorConverter`, thus the order key `proctime` is always null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35792) Sorting by proctime does not work in rank

2024-07-08 Thread xuyang (Jira)
xuyang created FLINK-35792:
--

 Summary: Sorting by proctime does not work in rank
 Key: FLINK-35792
 URL: https://issues.apache.org/jira/browse/FLINK-35792
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0, 1.20.0
Reporter: xuyang


Take the following sql as an example:
{code:java}
@Test
def test(): Unit = {
  val sql =
"""
  |SELECT *
  |FROM (
  |  SELECT a, b, c,
  |  ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, proctime DESC) as 
rank_num
  |  FROM MyTable)
  |WHERE rank_num = 1
""".stripMargin

  // This rank can't be converted into Deduplicated because it also uses `b`   
  // as order key.    
  util.verifyExecPlan(sql)
} {code}
The rank node will not materialize the `proctime` in 
`RelTimeIndicatorConverter`, thus the order key `proctime` is always null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35784][checkpoint] Fix the missing shared state registration of file-merging directories [flink]

2024-07-08 Thread via GitHub


Zakelly commented on PR #25051:
URL: https://github.com/apache/flink/pull/25051#issuecomment-2216370232

   Thanks. I change some names of vars and functions, will merge after CI green.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLIP-466: Introduce ProcessFunction Attribute in DataStream API V2 [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25055:
URL: https://github.com/apache/flink/pull/25055#issuecomment-2216364944

   
   ## CI report:
   
   * 1f14a6435260beb038deae2b136249d66e36599c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-08 Thread via GitHub


SML0127 commented on PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#issuecomment-2216351412

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs][minor] Correct Typos in Documentation/Code [flink-cdc]

2024-07-08 Thread via GitHub


yuxiqian commented on PR #3451:
URL: https://github.com/apache/flink-cdc/pull/3451#issuecomment-2216340738

   Seems `OracleE2eITCase.testOracleCDC` missed a change inside:
   ```
   arrays first differed at element [7];
   expected:<...,jacket,water resist[a]nt black wind breake...>
but was:<...,jacket,water resist[e]nt black wind breake...>
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] FLIP-466: Introduce ProcessFunction Attribute in DataStream API V2 [flink]

2024-07-08 Thread via GitHub


WencongLiu opened a new pull request, #25055:
URL: https://github.com/apache/flink/pull/25055

   ## What is the purpose of the change
   
   *Introduce ProcessFunction Attribute in DataStream API V2.*
   
   
   ## Brief change log
 - *Introduce the framework of ProcessFunction Attribute.*
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no 
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35791) Add database and table infos to Kafka json output.

2024-07-08 Thread LvYanquan (Jira)
LvYanquan created FLINK-35791:
-

 Summary: Add database and table infos to Kafka json output.
 Key: FLINK-35791
 URL: https://issues.apache.org/jira/browse/FLINK-35791
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: LvYanquan
 Fix For: cdc-3.2.0


Currently, database and table were not passed to canal/debezium json output 
format of Kafka sink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34572] Support OceanBase Jdbc Catalog [flink-connector-jdbc]

2024-07-08 Thread via GitHub


RocMarshal commented on PR #109:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/109#issuecomment-2216074991

   @whhe nice work!
   Could you help squish the commits into a single commit ? thx


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35783) Flink CDC Could not start the yaml Job

2024-07-08 Thread layhuts (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863976#comment-17863976
 ] 

layhuts commented on FLINK-35783:
-

[~xiqian_yu] 谢谢 (y)

> Flink CDC Could not start the yaml Job
> --
>
> Key: FLINK-35783
> URL: https://issues.apache.org/jira/browse/FLINK-35783
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: layhuts
>Priority: Major
>
> * flink版本1.19.1
>  * flink CDC版本3.1.1
>  * 在${FLINK_HOME}/lib下增加了 mysql-connector-java-8.0.27.jar 和 
> flink-sql-connector-mysql-cdc-3.1.1.jar
>  * 在flink-cdc/lib下增加了flink-cdc-pipeline-connector-mysql-3.1.1.jar 和 
> flink-cdc-pipeline-connector-doris-3.1.1.jar
> 第一次使用
> {code:java}
> bin/flink-cdc.sh ***.yaml {code}
>  
> 提交作业提示java.lang.NoClassDefFoundError:org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> {code:java}
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/cdc/runtime/typeutils/EventTypeInfo   at 
> java.lang.Class.getDeclaredFields0(Native Method)   at 
> java.lang.Class.privateGetDeclaredFields(Class.java:2583)   at 
> java.lang.Class.getDeclaredField(Class.java:2068)   at 
> java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1872)   at 
> java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)   at 
> java.security.AccessController.doPrivileged(Native Method)   at 
> java.io.ObjectStreamClass.(ObjectStreamClass.java:494)   at 
> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)   at 
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2209)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>    at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>    at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
>    at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:496)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:294)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:173)
>    ... 19 more Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.cdc.runtime.typeutils.EventTypeInfo   at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:387)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:418)   at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:351)   ... 52 more {code}
> 按照提示在${FLINK_HOME}/lib下增加了 flink-cdc-runtime-3.1.1.jar 后再次运行出现如下问题:
> {code:java}
> Exception in thread "main" org.apache.flink.util.FlinkException: Failed to 
> execute job 'Sync mid_cloud Database to Doris'.
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
>     at 
> org.apache.flink.cdc.composer.flink.FlinkPipelineExecution.execute(FlinkPipelineExecution.java:43)
>     at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:74)
>     at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:71)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669440001


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,111 @@
+
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+
+flink-connector-aws-e2e-tests-parent
+org.apache.flink
+4.3-SNAPSHOT
+
+
+4.0.0
+
+flink-connector-aws-sqs-e2e-tests
+Flink : Connectors : AWS : E2E Tests : Amazon SQS
+jar
+
+
+
+org.apache.flink
+flink-streaming-java
+${flink.version}
+test
+
+
+
+org.apache.flink
+flink-connector-sqs
+${project.version}
+test
+
+
+
+org.apache.flink
+flink-connector-aws-base
+${project.version}
+test
+test-jar
+
+
+
+org.apache.flink
+flink-connector-sqs
+${project.version}
+test
+test-jar
+
+
+
+
+com.google.guava
+guava
+test
+
+
+
+com.fasterxml.jackson.core
+jackson-databind
+test
+
+
+
+com.fasterxml.jackson.datatype
+jackson-datatype-jsr310
+test
+
+
+
+com.fasterxml.jackson.datatype
+jackson-datatype-jdk8
+test
+

Review Comment:
   Not needed. removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669436100


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK 
v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to 
transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter
+implements ElementConverter {
+
+/** A serialization schema to specify how the input element should be 
serialized. */
+private final SerializationSchema serializationSchema;
+
+private SqsSinkElementConverter(SerializationSchema 
serializationSchema) {
+this.serializationSchema = serializationSchema;
+}
+
+@Override
+public SendMessageBatchRequestEntry apply(InputT element, 
SinkWriter.Context context) {
+final byte[] messageBody = serializationSchema.serialize(element);
+return SendMessageBatchRequestEntry.builder()
+.id(UUID.randomUUID().toString())
+.messageBody(new String(messageBody, StandardCharsets.UTF_8))
+.build();
+}
+
+@Override
+public void open(Sink.InitContext context) {
+try {
+serializationSchema.open(

Review Comment:
   Thanks for the detail, Updated with  
serializationSchema.open(context.asSerializationSchemaInitializationContext());



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669426704


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the 
operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on 
the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard 
way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} 
and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private final SdkClientProvider clientProvider;
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+getSdkClientMisconfiguredExceptionClassifier()));
+
+private final Counter numRecordsOutErrorsCounter;
+
+/* Url of SQS */
+private final String sqsUrl;
+
+/* The sink writer metric group */
+private final SinkWriterMetricGroup metrics;
+
+/* Flag to whether fatally fail any time we encounter an exception when 
persisting records */
+private final boolean failOnError;
+
+SqsSinkWriter(
+ElementConverter 
elementConverter,
+Sink.InitContext context,
+int maxBatchSize,
+int maxInFlightRequests,
+int maxBufferedRequ

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669408631


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/** SQS implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class SqsStateSerializer
+extends AsyncSinkWriterStateSerializer {
+@Override
+protected void serializeRequestToStream(
+final SendMessageBatchRequestEntry request, final DataOutputStream 
out)
+throws IOException {
+out.write(request.messageBody().getBytes(StandardCharsets.UTF_8));
+}
+
+@Override
+protected SendMessageBatchRequestEntry deserializeRequestFromStream(
+final long requestSize, final DataInputStream in) throws 
IOException {
+final byte[] requestData = new byte[(int) requestSize];
+in.read(requestData);
+return SendMessageBatchRequestEntry.builder()
+.id(UUID.randomUUID().toString())

Review Comment:
   Make sense, updated the code accordingly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-08 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863951#comment-17863951
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi [~arvid] , [~masc] , [~thomasWeise] 

I agree that on the surface it appeared there's no way record can be missed. 
And this issue is not a frequent occurrence.

I was able to narrow down the requirement where this has a high chance to take 
place (not 100%)
 # Kafka cluster not fully stable either due to network issue / cluster upgrade
 # batching is enabled by setting "linger.ms" to anything other than 0   << if 
we disable batching, this issue goes away altogether >>
 # massive data load taking place at kafka (millions of record being processed 
with our testing)
 # ack is set to all

Due to the condition required for this to take place, I'm not able to carry out 
test at-will to generate needed behavior, we simply observed same problem 
happening multiple times throughout the years we left Flink running.

This was observed in Flink 1.16, 1.17 with Kafka Connector 3.0.

We have never observed this while using FlinkKafkaProducer in the past, and 
only started seeing this after switching to KafkaSink.

If we compared the implementation between FlinkKafkaProducer & KafkaSink, 
there's inherent assumption that all records are sent within KafkaSink, whereas 
FlinkKafkaProducer utilized callback with counter to ensure everything is done.

Therefore, I thought the best course of action without having a complete 
understanding why this happened would be the implementation that was also 
suggested by [~masc]: "The best thing we can do here is to fail and replay from 
last checkpoint, to maintain at least/exactly once semantics. This is also 
inline with how FlinkKafkaProducer has implemented this."

But this change would only make sense if there's any possibility that some 
record could be sent after flush has been triggered, or flush returned without 
fully committed all messages. I am inclined to believe all records have been 
produced correctly and blocking are done in the correct thread, but evidence so 
far said otherwise.

I have not conducted further test on this since last committed, and have only 
applied our own workaround & fix locally to our instances.

The issue: https://issues.apache.org/jira/browse/FLINK-35749 may also have been 
the culprit of this problem

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VAL

Re: [PR] [WIP][FLINK-32218][Connector/Kinesis] Add support for parent-child shard ordering to Kinesis streams source [flink-connector-aws]

2024-07-08 Thread via GitHub


hlteoh37 commented on code in PR #145:
URL: 
https://github.com/apache/flink-connector-aws/pull/145#discussion_r1669358672


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java:
##
@@ -413,10 +414,15 @@ private List getTestShards(final int startShardId, 
final int endShardId)
 private Consumer getListShardRequestValidation(
 final String streamArn, final String startShardId, final String 
nextToken) {
 return req -> {
+ShardFilter startingPosition = null;
+if (startShardId != null) {
+startingPosition =
+
ListShardsStartingPosition.fromShardId(startShardId).getShardFilter();
+}
 ListShardsRequest expectedReq =
 ListShardsRequest.builder()
 .streamARN(streamArn)
-.exclusiveStartShardId(startShardId)
+.shardFilter(startingPosition)

Review Comment:
   Do we need to add a test for startingPosition of AT_TIMESTAMP?



##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTrackerTest.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;
+
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+class SplitTrackerTest {
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testSplitWithoutParentAreAvailableToAssign(boolean 
preserveShardOrdering) {
+SplitTracker splitTracker = new SplitTracker(preserveShardOrdering);
+
+KinesisShardSplit split = getTestSplit(generateShardId(1), 
Collections.emptySet());
+
+splitTracker.addSplits(Collections.singletonList(split));
+
+List pendingSplits = 
splitTracker.splitsAvailableForAssignment();
+
+assertThat(pendingSplits).containsExactly(split);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testStateSnapshot(boolean preserveShardOrdering) {
+List assignedSplits =
+Arrays.asList(
+// Shards without parents
+getTestSplit(generateShardId(1), 
Collections.emptySet()),
+getTestSplit(generateShardId(2), 
Collections.emptySet()));
+List unassignedSplits =
+Arrays.asList(
+// Shards without parents
+getTestSplit(generateShardId(3), 
Collections.emptySet()),
+// Shards produced by splitting parent shard
+getTestSplit(generateShardId(4), 
Collections.singleton(generateShardId(1))),
+getTestSplit(generateShardId(5), 
Collections.singleton(generateShardId(1))),
+// Shard produced by merging 2 parent shards
+getTestSplit(
+generateShardId(6),
+new HashSet<>(
+Arrays.asList(generateShardId(2), 
generateShardId(3);
+
+List assignedSplitsWithStatus =
+assignedSplits.stream()
+ 

Re: [PR] [FLINK-35789][table] Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS) [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25054:
URL: https://github.com/apache/flink/pull/25054#issuecomment-2215396765

   
   ## CI report:
   
   * d7c585e34cab930de4c29f068a64e889d8a9bbfb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35790) Update docs for new schema definition in CTAS and RTAS

2024-07-08 Thread Jira
Sergio Peña created FLINK-35790:
---

 Summary: Update docs for new schema definition in CTAS and RTAS
 Key: FLINK-35790
 URL: https://issues.apache.org/jira/browse/FLINK-35790
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergio Peña






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35789][table] Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS) [flink]

2024-07-08 Thread via GitHub


spena commented on code in PR #25054:
URL: https://github.com/apache/flink/pull/25054#discussion_r1669360054


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java:
##
@@ -312,113 +302,36 @@ private void appendDerivedPrimaryKey(@Nullable 
SqlTableConstraint derivedPrimary
 "The base table already has a primary key. You might "
 + "want to specify EXCLUDING CONSTRAINTS.");
 } else if (derivedPrimaryKey != null) {
-List primaryKeyColumns = new ArrayList<>();
-for (SqlNode primaryKeyNode : derivedPrimaryKey.getColumns()) {
-String primaryKey = ((SqlIdentifier) 
primaryKeyNode).getSimple();
-if (!columns.containsKey(primaryKey)) {
-throw new ValidationException(
-String.format(
-"Primary key column '%s' is not 
defined in the schema at %s",
-primaryKey, 
primaryKeyNode.getParserPosition()));
-}
-if (!(columns.get(primaryKey) instanceof 
UnresolvedPhysicalColumn)) {
-throw new ValidationException(
-String.format(
-"Could not create a PRIMARY KEY with 
column '%s' at %s.\n"
-+ "A PRIMARY KEY constraint 
must be declared on physical columns.",
-primaryKey, 
primaryKeyNode.getParserPosition()));
-}
-primaryKeyColumns.add(primaryKey);
-}
-primaryKey =
-new UnresolvedPrimaryKey(
-derivedPrimaryKey
-.getConstraintName()
-.orElseGet(
-() ->
-
primaryKeyColumns.stream()
-.collect(
-
Collectors.joining(
-   
 "_", "PK_", ""))),
-primaryKeyColumns);
+setPrimaryKey(derivedPrimaryKey);
 }
 }
 
 private void appendDerivedWatermarks(
 Map mergingStrategies,
 List derivedWatermarkSpecs) {
-for (SqlWatermark derivedWatermarkSpec : derivedWatermarkSpecs) {
-SqlIdentifier eventTimeColumnName = 
derivedWatermarkSpec.getEventTimeColumnName();
-
-HashMap nameToTypeMap = new 
LinkedHashMap<>();
-nameToTypeMap.putAll(physicalFieldNamesToTypes);
-nameToTypeMap.putAll(metadataFieldNamesToTypes);
-nameToTypeMap.putAll(computedFieldNamesToTypes);
-
-verifyRowtimeAttribute(mergingStrategies, eventTimeColumnName, 
nameToTypeMap);
-String rowtimeAttribute = eventTimeColumnName.toString();
-
-SqlNode expression = 
derivedWatermarkSpec.getWatermarkStrategy();
-
-// this will validate and expand function identifiers.
-SqlNode validated =
-
sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
-
-watermarkSpecs.put(
-rowtimeAttribute,
-new UnresolvedWatermarkSpec(
-rowtimeAttribute,
-new 
SqlCallExpression(escapeExpressions.apply(validated;
-}
-}
-
-private void verifyRowtimeAttribute(
-Map mergingStrategies,
-SqlIdentifier eventTimeColumnName,
-Map allFieldsTypes) {
-String fullRowtimeExpression = eventTimeColumnName.toString();
-boolean specAlreadyExists = 
watermarkSpecs.containsKey(fullRowtimeExpression);
-
-if (specAlreadyExists
-&& mergingStrategies.get(FeatureOption.WATERMARKS)
-!= MergingStrategy.OVERWRITING) {
-throw new ValidationException(
-String.format(
-"There already exists a watermark spec for 
column '%s' in the base table. You "
-+ "might want to specify EXCLUDING 
WATERMARKS or OVERWRITING WATERMARKS.",
-fullRowtimeExpression));
-}
-
-List components = eventTimeColumnName.names;
-if (!allFieldsTypes.containsKey(components.get(0))) {
-throw new Vali

[jira] [Updated] (FLINK-35789) Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS)

2024-07-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35789:
---
Labels: pull-request-available  (was: )

> Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS)
> -
>
> Key: FLINK-35789
> URL: https://issues.apache.org/jira/browse/FLINK-35789
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergio Peña
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35789][table] Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS) [flink]

2024-07-08 Thread via GitHub


spena opened a new pull request, #25054:
URL: https://github.com/apache/flink/pull/25054

   ## What is the purpose of the change
   
   Allows defining the WATERMARK and PRIMARY KEY in the CTAS statement. 
   
   PRIMARY KEY example:
   ```
   CREATE TABLE table_name (PRIMARY KEY (person) NOT ENFORCED)
   AS SELECT person, age FROM people;
   ```
   
   WATERMARK example example:
   ```
   CREATE TABLE table_name (WATERMARK FOR ts AS ts - INTERVAL '3' SECOND)
   AS SELECT f0, f1, ts FROM people;
   ```
   
   ## Brief change log
   
   - Added support for WATERMARK and PRIMARY KEY syntax in CTAS
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added unit tests to validation and converter classes
   - Added integration tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs (will follow-up with 
another PR to update docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35789) Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS)

2024-07-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-35789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Peña updated FLINK-35789:

Summary: Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS)  
(was: Allow WATERMARK & PRIMARY KEY in CREATE TABLE AS (CTAS))

> Allow defining watermarks & PRIMARY KEY in CREATE TABLE AS (CTAS)
> -
>
> Key: FLINK-35789
> URL: https://issues.apache.org/jira/browse/FLINK-35789
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergio Peña
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35789) Allow WATERMARK & PRIMARY KEY in CREATE TABLE AS (CTAS)

2024-07-08 Thread Jira
Sergio Peña created FLINK-35789:
---

 Summary: Allow WATERMARK & PRIMARY KEY in CREATE TABLE AS (CTAS)
 Key: FLINK-35789
 URL: https://issues.apache.org/jira/browse/FLINK-35789
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergio Peña






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34111][table] Add support for json_quote, json_unquote, address PR feedback #24156 [flink]

2024-07-08 Thread via GitHub


snuyanzin commented on code in PR #24967:
URL: https://github.com/apache/flink/pull/24967#discussion_r1669276320


##
docs/data/sql_functions.yml:
##
@@ -377,6 +377,12 @@ string:
   - sql: SUBSTR(string, integer1[, integer2])
 table: STRING.substr(INTEGER1[, INTEGER2])
 description: Returns a substring of string starting from position integer1 
with length integer2 (to the end by default).
+  - sql: JSON_QUOTE(string)

Review Comment:
   One thing I forgot to mention: also need to update chinese doc.
   It's ok to put there english version of description however it's better to 
keep them synced 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35749] Kafka sink component will lose data when kafka cluster is unavailable for a while [flink-connector-kafka]

2024-07-08 Thread via GitHub


mas-chen commented on code in PR #107:
URL: 
https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1669266607


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##
@@ -449,12 +460,17 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 }
 
 // Checking for exceptions from previous writes
-mailboxExecutor.submit(
+// Notice: throwing exception in mailboxExecutor thread is not 
safe enough for
+// triggering global
+// fail over, which has been fixed in [FLINK-31305]. And using
+// mailboxExecutor.execute() is better than
+// mailboxExecutor.submit() since submit will swallow 
exceptions inside.
+mailboxExecutor.execute(

Review Comment:
   Good catch, thanks for fixing this!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-07-08 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863925#comment-17863925
 ] 

Mason Chen commented on FLINK-34127:


Hi [~arvid], yes feel free to takeover. I left it at this PR: 
[https://github.com/apache/flink-connector-kafka/pull/98]

I underestimated the task and eventually got stuck at the Java 21 CI errors, 
which requires to upgrade spotless and few more dependencies from the connector 
utils.

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> I found out this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of the CI since they require Kafka, 
> Zookeeper, and Docker containers. We can further stablize CI by not 
> redundantly running these set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:11.2933375Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is runnin

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-08 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863924#comment-17863924
 ] 

Mason Chen commented on FLINK-33545:


[~arvid] [~thw] I believe this is solved by 
https://issues.apache.org/jira/browse/FLINK-35749!

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush

Re: [PR] [WIP][FLINK-32218][Connector/Kinesis] Add support for parent-child shard ordering to Kinesis streams source [flink-connector-aws]

2024-07-08 Thread via GitHub


hlteoh37 commented on code in PR #145:
URL: 
https://github.com/apache/flink-connector-aws/pull/145#discussion_r1669243126


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTrackerTest.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;
+
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+class SplitTrackerTest {
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testSplitWithoutParentAreAvailableToAssign(boolean 
preserveShardOrdering) {
+SplitTracker splitTracker = new SplitTracker(preserveShardOrdering);
+
+KinesisShardSplit split = getTestSplit(generateShardId(1), 
Collections.emptySet());
+
+splitTracker.addSplits(Collections.singletonList(split));
+
+List pendingSplits = 
splitTracker.splitsAvailableForAssignment();
+
+assertThat(pendingSplits).containsExactly(split);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testStateSnapshot(boolean preserveShardOrdering) {
+List assignedSplits =
+Arrays.asList(
+// Shards without parents
+getTestSplit(generateShardId(1), 
Collections.emptySet()),
+getTestSplit(generateShardId(2), 
Collections.emptySet()));
+List unassignedSplits =
+Arrays.asList(
+// Shards without parents
+getTestSplit(generateShardId(3), 
Collections.emptySet()),
+// Shards produced by splitting parent shard
+getTestSplit(generateShardId(4), 
Collections.singleton(generateShardId(1))),
+getTestSplit(generateShardId(5), 
Collections.singleton(generateShardId(1))),
+// Shard produced by merging 2 parent shards
+getTestSplit(
+generateShardId(6),
+new HashSet<>(
+Arrays.asList(generateShardId(2), 
generateShardId(3);
+
+List assignedSplitsWithStatus =
+assignedSplits.stream()
+.map(
+split ->
+new 
KinesisShardSplitWithAssignmentStatus(
+split, 
SplitAssignmentStatus.ASSIGNED))
+.collect(Collectors.toList());
+List unassignedSplitsWithStatus 
=
+unassignedSplits.stream()
+.map(
+split ->
+new 
KinesisShardSplitWithAssignmentStatus(
+split, 
SplitAssignmentStatus.UNASSIGNED))
+.collect(Collectors.toList());
+List expectedState =
+new ArrayList<>(assignedSplitsWithStatus);
+expectedState.addAll(unassignedSplitsWithStatus);
+
+SplitTracker splitTracker = new SplitTracker(preserveShardOrdering);
+
+splitTracker.addSplits(assignedSplits);
+splitTracker.addSplits(unassignedSplits);
+spl

Re: [PR] [WIP][FLINK-32218][Connector/Kinesis] Add support for parent-child shard ordering to Kinesis streams source [flink-connector-aws]

2024-07-08 Thread via GitHub


hlteoh37 commented on code in PR #145:
URL: 
https://github.com/apache/flink-connector-aws/pull/145#discussion_r1669238262


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTrackerTest.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;
+
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+class SplitTrackerTest {
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testSplitWithoutParentAreAvailableToAssign(boolean 
preserveShardOrdering) {
+SplitTracker splitTracker = new SplitTracker(preserveShardOrdering);
+
+KinesisShardSplit split = getTestSplit(generateShardId(1), 
Collections.emptySet());
+
+splitTracker.addSplits(Collections.singletonList(split));
+
+List pendingSplits = 
splitTracker.splitsAvailableForAssignment();
+
+assertThat(pendingSplits).containsExactly(split);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testStateSnapshot(boolean preserveShardOrdering) {

Review Comment:
   nit: Might help readability to add some comments around the test structure
   
   `// Given mixture of assigned and unassigned splits`
   
   `// When registered with split tracker`
   
   `// Then snapshot state accurately reflects assigned and unassigned splits`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35749] Kafka sink component will lose data when kafka cluster is unavailable for a while [flink-connector-kafka]

2024-07-08 Thread via GitHub


JimmyZZZ commented on PR #107:
URL: 
https://github.com/apache/flink-connector-kafka/pull/107#issuecomment-2214785529

   @AHeise  added new test case KafkaWriterFaultToleranceITCase and do some 
refactor to extract some public things for KafkaWriterFaultToleranceITCase and 
KafkaWriterITCase, and also provided more details in this pr as Github PR 
template.
   
   Pls help to review again, thanks very much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] [FLINK-12450] Add leftshift, rightshift functions supported in Table API and SQL [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25053:
URL: https://github.com/apache/flink/pull/25053#issuecomment-2214758102

   
   ## CI report:
   
   * 16ea0a2e327ff94139aa069f65b893514540eb1d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35785) Executing query in SQL client results in "java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode"

2024-07-08 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-35785.
--
Resolution: Cannot Reproduce

[~Weijie Guo] [~ferenc-csaky] I'm not sure why, but after manually killing the 
Flink java processes and trying it again, it does work. I guess there was some 
left-over process that interfered with RC0. Apologies. 

> Executing query in SQL client results in "java.lang.ClassNotFoundException: 
> org.apache.flink.core.execution.RestoreMode"
> 
>
> Key: FLINK-35785
> URL: https://issues.apache.org/jira/browse/FLINK-35785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Table SQL / Client
>Reporter: Martijn Visser
>Priority: Blocker
>
> Tested with Flink 1.20 RC0
> Reproducer:
> {code:sql}
> CREATE TABLE `product` (
> id INT,
> brandId INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'random',
> 'fields.brandId.min' = '1',
> 'fields.brandId.max' = '100'
> );
> {code}
> Followed by:
> {code:sql}
> SELECT * FROM product
> {code}
> Results in:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2024-07-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12450:
---
Labels: auto-unassigned pull-request-available stale-assigned  (was: 
auto-unassigned stale-assigned)

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Kartikey Pant
>Priority: Major
>  Labels: auto-unassigned, pull-request-available, stale-assigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-12450] [WIP] Add leftshift, rightshift functions supported in Table API and SQL [flink]

2024-07-08 Thread via GitHub


kartikeypant opened a new pull request, #25053:
URL: https://github.com/apache/flink/pull/25053

   ## What is the purpose of the change
   * The purpose of this change is to enhance the Flink Table API and SQL by 
introducing new bitwise functions: `shiftleft` and `shiftright`. These will 
also improve compatibility with Hive since Hive also provides the functions 
with the same names. 
   * These functions will allow users to perform bitwise left shift and right 
shift operations directly within their SQL queries and Table API expressions.  
   * This change is associated to the following Jira Feature: [FLINK-12450
   ](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-12450)
   
   
   ## Brief change log
   * Adds classes 
`org.apache.flink.table.runtime.functions.scalar.BitRightShiftFunction` and 
`org.apache.flink.table.runtime.functions.scalar.BitRightShiftFunction`.
   * Modifies `org.apache.flink.table.functions.BuiltInFunctionDefinitions` to 
add shiftleft and shiftright functions.
   * Modifies `org.apache.flink.table.api.internal.BaseExpressions` to add 
shiftleft and shiftright functions.
   
   ## Verifying this change
   * Added relevant tests for the functions to test the functional correctness 
in class `MathFunctionsITCase`.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? will complete documentation 
before raising for review
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r166929


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -396,7 +399,9 @@ public void close() throws IOException {
 if (LOG.isInfoEnabled()) {
 LOG.info(
 "Stopped BLOB server at {}:{}",
-serverSocket.getInetAddress().getHostAddress(),
+serverSocket == null
+? null
+: 
serverSocket.getInetAddress().getHostAddress(),
 getPort());

Review Comment:
   but then you might want to add a log message in the else branch as well. 
Otherwise, we would miss the log message in certain scenarios.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP][FLINK-32218][Connector/Kinesis] Add support for parent-child shard ordering to Kinesis streams source [flink-connector-aws]

2024-07-08 Thread via GitHub


hlteoh37 commented on code in PR #145:
URL: 
https://github.com/apache/flink-connector-aws/pull/145#discussion_r1668997135


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest.java:
##
@@ -145,6 +129,27 @@ void testSerializeWithTrailingBytes() throws Exception {
 .withMessageContaining("Unexpected trailing bytes when 
deserializing.");
 }
 
+private List getSplits(
+IntStream assignedShardIdRange, IntStream unassignedShardIdRange) {

Review Comment:
   Nice 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.20][FLINK-35784][checkpoint] Fix the missing shared state registration of file-merging directories [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25052:
URL: https://github.com/apache/flink/pull/25052#issuecomment-2214715238

   
   ## CI report:
   
   * cadb2364f238bf047e938b742079f7f56a2b10ea UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35784][checkpoint] Fix the missing shared state registration of file-merging directories [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25051:
URL: https://github.com/apache/flink/pull/25051#issuecomment-2214714463

   
   ## CI report:
   
   * e555779cb606a2ecb5bcd7f5f6e7c12eddd85a65 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-1.20][FLINK-35784][checkpoint] Fix the missing shared state registration of file-merging directories [flink]

2024-07-08 Thread via GitHub


Zakelly opened a new pull request, #25052:
URL: https://github.com/apache/flink/pull/25052

   ## What is the purpose of the change
   
   The `OperatorSubtaskState` only make keyed state register with 
`SharedStateRegistry`. However, the file-merging directories's handle are 
wrapped in `FileMergingOperatorStreamStateHandle}` which is an 
`OperatorStreamStateHandle`. That means the `#registerSharedStates` is never 
called, so the registry(JM) will never know and delete the directories.
   
   
   ## Brief change log
   
- In `OperatorSubtaskState`, we register 
`FileMergingOperatorStreamStateHandle`.
   
   
   ## Verifying this change
   
   Modified tests in `SnapshotFileMergingCompatibilityITCase`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35784) The cp file-merging directory not properly registered in SharedStateRegistry

2024-07-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35784:
---
Labels: pull-request-available  (was: )

> The cp file-merging directory not properly registered in SharedStateRegistry
> 
>
> Key: FLINK-35784
> URL: https://issues.apache.org/jira/browse/FLINK-35784
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.20.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Blocker
>  Labels: pull-request-available
>
> The {{OperatorSubtaskState}} only make keyed state register with 
> {{SharedStateRegistry}}. However, the file-merging directories's handle are 
> wrapped in {{FileMergingOperatorStreamStateHandle}}, which is an 
> {{OperatorStreamStateHandle}}. That means the {{#registerSharedStates}} is 
> never called, so the registry will never delete the directories.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35784][checkpoint] Fix the missing shared state registration of file-merging directories [flink]

2024-07-08 Thread via GitHub


Zakelly opened a new pull request, #25051:
URL: https://github.com/apache/flink/pull/25051

   ## What is the purpose of the change
   
   The `OperatorSubtaskState` only make keyed state register with 
`SharedStateRegistry`. However, the file-merging directories's handle are 
wrapped in `FileMergingOperatorStreamStateHandle}` which is an 
`OperatorStreamStateHandle`. That means the `#registerSharedStates` is never 
called, so the registry(JM) will never know and delete the directories.
   
   
   ## Brief change log
   
- In `OperatorSubtaskState`, we register 
`FileMergingOperatorStreamStateHandle`.
   
   
   ## Verifying this change
   
   Modified tests in `SnapshotFileMergingCompatibilityITCase`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


rkhachatryan commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668975142


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -396,7 +399,9 @@ public void close() throws IOException {
 if (LOG.isInfoEnabled()) {
 LOG.info(
 "Stopped BLOB server at {}:{}",
-serverSocket.getInetAddress().getHostAddress(),
+serverSocket == null
+? null
+: 
serverSocket.getInetAddress().getHostAddress(),
 getPort());

Review Comment:
   I prefer to use one string interpolation method, but I noticed that 
`getPort` might also cause NPE,
   so I added a check in in the `if` condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35784) The cp file-merging directory not properly registered in SharedStateRegistry

2024-07-08 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-35784:

Description: The {{OperatorSubtaskState}} only make keyed state register 
with {{SharedStateRegistry}}. However, the file-merging directories's handle 
are wrapped in {{FileMergingOperatorStreamStateHandle}}, which is an 
{{OperatorStreamStateHandle}}. That means the {{#registerSharedStates}} is 
never called, so the registry will never delete the directories.  (was: The 
{{OperatorSubtaskState}} only make keyed state to register with 
{{SharedStateRegistry}}. However, the file-merging directories's handle are 
wrapped in {{FileMergingOperatorStreamStateHandle}}, which is an 
{{OperatorStreamStateHandle}}. That means the {{#registerSharedStates}} is 
never called, so the registry will never delete the directories.)

> The cp file-merging directory not properly registered in SharedStateRegistry
> 
>
> Key: FLINK-35784
> URL: https://issues.apache.org/jira/browse/FLINK-35784
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.20.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Blocker
>
> The {{OperatorSubtaskState}} only make keyed state register with 
> {{SharedStateRegistry}}. However, the file-merging directories's handle are 
> wrapped in {{FileMergingOperatorStreamStateHandle}}, which is an 
> {{OperatorStreamStateHandle}}. That means the {{#registerSharedStates}} is 
> never called, so the registry will never delete the directories.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668927920


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -396,7 +399,9 @@ public void close() throws IOException {
 if (LOG.isInfoEnabled()) {
 LOG.info(
 "Stopped BLOB server at {}:{}",
-serverSocket.getInetAddress().getHostAddress(),
+serverSocket == null
+? null
+: 
serverSocket.getInetAddress().getHostAddress(),
 getPort());

Review Comment:
   ```java
   "Stopped BLOB server{}.",
   serverSocket == null
   ? ""
   : String.format(
   " at %s:%d",
   
serverSocket.getInetAddress().getHostAddress(), getPort()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35738) Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35738:
---
Labels: release-testing  (was: )

> Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state 
> backend when using time window created in ascending order
> ---
>
> Key: FLINK-35738
> URL: https://issues.apache.org/jira/browse/FLINK-35738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Samrat Deb
>Priority: Major
>  Labels: release-testing
> Fix For: 1.20.0
>
>
> The problem occurs when using RocksDB and specific queries/jobs (please see 
> the ticket for the detailed description).
> To test the solution, run the following query with RocksDB as a state backend:
>  
> {code:java}
> INSERT INTO top_5_highest_view_time
> SELECT *
> FROM   (
>                 SELECT   *,
>                          ROW_NUMBER() OVER (PARTITION BY window_start, 
> window_end ORDER BY view_time DESC) AS rownum
>                 FROM     (
>                                   SELECT   window_start,
>                                            window_end,
>                                            product_id,
>                                            SUM(view_time) AS view_time,
>                                            COUNT(*)       AS cnt
>                                   FROM     TABLE(TUMBLE(TABLE 
> `shoe_clickstream`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
>                                   GROUP BY window_start,
>                                            window_end,
>                                            product_id))
> WHERE  rownum <= 5;{code}
>  
> With the feature disabled (default), the number of files in rocksdb working 
> directory (as well as in the checkpoint) should grow indefinitely.
>  
> With feature enabled, the number of files should stays constant (as they 
> should get merged with each other).
> To enable the feature, set 
> {code:java}
> state.backend.rocksdb.manual-compaction.min-interval{code}
>  set to 1 minute for example.
>  
> Please consult 
> [https://github.com/apache/flink/blob/e7d7db3b6f87e53d9bace2a16cf95e5f7a79087a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java#L29]
>  for other options if necessary.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35738) Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35738:
---
Fix Version/s: 1.20.0

> Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state 
> backend when using time window created in ascending order
> ---
>
> Key: FLINK-35738
> URL: https://issues.apache.org/jira/browse/FLINK-35738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Samrat Deb
>Priority: Major
> Fix For: 1.20.0
>
>
> The problem occurs when using RocksDB and specific queries/jobs (please see 
> the ticket for the detailed description).
> To test the solution, run the following query with RocksDB as a state backend:
>  
> {code:java}
> INSERT INTO top_5_highest_view_time
> SELECT *
> FROM   (
>                 SELECT   *,
>                          ROW_NUMBER() OVER (PARTITION BY window_start, 
> window_end ORDER BY view_time DESC) AS rownum
>                 FROM     (
>                                   SELECT   window_start,
>                                            window_end,
>                                            product_id,
>                                            SUM(view_time) AS view_time,
>                                            COUNT(*)       AS cnt
>                                   FROM     TABLE(TUMBLE(TABLE 
> `shoe_clickstream`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
>                                   GROUP BY window_start,
>                                            window_end,
>                                            product_id))
> WHERE  rownum <= 5;{code}
>  
> With the feature disabled (default), the number of files in rocksdb working 
> directory (as well as in the checkpoint) should grow indefinitely.
>  
> With feature enabled, the number of files should stays constant (as they 
> should get merged with each other).
> To enable the feature, set 
> {code:java}
> state.backend.rocksdb.manual-compaction.min-interval{code}
>  set to 1 minute for example.
>  
> Please consult 
> [https://github.com/apache/flink/blob/e7d7db3b6f87e53d9bace2a16cf95e5f7a79087a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java#L29]
>  for other options if necessary.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35738) Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35738:
---
Affects Version/s: 1.20.0

> Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state 
> backend when using time window created in ascending order
> ---
>
> Key: FLINK-35738
> URL: https://issues.apache.org/jira/browse/FLINK-35738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Samrat Deb
>Priority: Major
>
> The problem occurs when using RocksDB and specific queries/jobs (please see 
> the ticket for the detailed description).
> To test the solution, run the following query with RocksDB as a state backend:
>  
> {code:java}
> INSERT INTO top_5_highest_view_time
> SELECT *
> FROM   (
>                 SELECT   *,
>                          ROW_NUMBER() OVER (PARTITION BY window_start, 
> window_end ORDER BY view_time DESC) AS rownum
>                 FROM     (
>                                   SELECT   window_start,
>                                            window_end,
>                                            product_id,
>                                            SUM(view_time) AS view_time,
>                                            COUNT(*)       AS cnt
>                                   FROM     TABLE(TUMBLE(TABLE 
> `shoe_clickstream`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
>                                   GROUP BY window_start,
>                                            window_end,
>                                            product_id))
> WHERE  rownum <= 5;{code}
>  
> With the feature disabled (default), the number of files in rocksdb working 
> directory (as well as in the checkpoint) should grow indefinitely.
>  
> With feature enabled, the number of files should stays constant (as they 
> should get merged with each other).
> To enable the feature, set 
> {code:java}
> state.backend.rocksdb.manual-compaction.min-interval{code}
>  set to 1 minute for example.
>  
> Please consult 
> [https://github.com/apache/flink/blob/e7d7db3b6f87e53d9bace2a16cf95e5f7a79087a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java#L29]
>  for other options if necessary.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35712) [Release-1.20] Prepare RC0 release

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo resolved FLINK-35712.

Resolution: Done

Already announced in mail list.

> [Release-1.20] Prepare RC0 release
> --
>
> Key: FLINK-35712
> URL: https://issues.apache.org/jira/browse/FLINK-35712
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.20.0
>
>
> * The preview source release and binary convenience releases
>  * All artifacts that would normally be deployed to the Maven Central 
> Repository.
>  * Source code tag "release-1.20.0-rc0"
>  * Announce in the mail list



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35712) [Release-1.20] Prepare RC0 release

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35712:
---
Description: 
* The preview source release and binary convenience releases
 * All artifacts that would normally be deployed to the Maven Central 
Repository.
 * Source code tag "release-1.20.0-rc0"
 * Announce in the mail list

  was:
* The preview source release and binary convenience releases
* All artifacts that would normally be deployed to the Maven Central Repository.
* Source code tag "release-1.18.0-rc0"
* Announce in the mail list


> [Release-1.20] Prepare RC0 release
> --
>
> Key: FLINK-35712
> URL: https://issues.apache.org/jira/browse/FLINK-35712
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.20.0
>
>
> * The preview source release and binary convenience releases
>  * All artifacts that would normally be deployed to the Maven Central 
> Repository.
>  * Source code tag "release-1.20.0-rc0"
>  * Announce in the mail list



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35602) [Umbrella] Test Flink Release 1.20

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35602:
---
Fix Version/s: 1.20.0

> [Umbrella] Test Flink Release 1.20
> --
>
> Key: FLINK-35602
> URL: https://issues.apache.org/jira/browse/FLINK-35602
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.20.0
>
> Attachments: image-2024-06-14-16-32-03-627.png
>
>
> This is an umbrella ticket for the Flink 1.20 testing efforts.
> There're two kinds of ticket: 'Release Testing Instructions' and 'Release 
> Testing' ones.
> h1. Release Testing Instructions
> For the Release Testing Instructions ticket (title starts with 'Release 
> Testing Instructions:',
> please follow the steps:
> !image-2024-06-14-16-32-03-627.png|width=894,height=511!
> 1. Whether the feature needs a crossteam testing, if no, authors just close 
> the ticket
> 2. If testing is required, the author should prepare user document[must have] 
> and additional instructions (if exists, which are thought necessary for 
> testers, e.g., some limitations that are outside the scope of the design)
> 3. After No.2 is done, the author should close the jira and clone/create a 
> new jira for tracking the testing result(keep unassigned or assign to a 
> volunteer)
> *Other features need cross-team testing*
> Also contributors are encouraged to create tickets if there are other ones 
> that need to be cross-team tested (Just create new ticket for testing using 
> title 'Release Testing: Verify ...' without 'Instructions' keyword).
> h1. Release Testing
> Note: All the testing sub-tasks should be opened with:
> Priority: Blocker
> Fix Version: 1.20.0
> Label: release-testing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35785) Executing query in SQL client results in "java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode"

2024-07-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863848#comment-17863848
 ] 

Weijie Guo commented on FLINK-35785:


I download flink-1.20.0 binary from 
[https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/flink-1.20.0-bin-scala_2.12.tgz]

Then I start a local cluster in standalone mode and submit the SQL you 
mentioned, but I can't get the ClassNotFoundException.

What deployment model are you using, yarn/k8s session/application? 

 

 

> Executing query in SQL client results in "java.lang.ClassNotFoundException: 
> org.apache.flink.core.execution.RestoreMode"
> 
>
> Key: FLINK-35785
> URL: https://issues.apache.org/jira/browse/FLINK-35785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Table SQL / Client
>Reporter: Martijn Visser
>Priority: Blocker
>
> Tested with Flink 1.20 RC0
> Reproducer:
> {code:sql}
> CREATE TABLE `product` (
> id INT,
> brandId INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'random',
> 'fields.brandId.min' = '1',
> 'fields.brandId.max' = '100'
> );
> {code}
> Followed by:
> {code:sql}
> SELECT * FROM product
> {code}
> Results in:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35785) Executing query in SQL client results in "java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode"

2024-07-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863848#comment-17863848
 ] 

Weijie Guo edited comment on FLINK-35785 at 7/8/24 4:04 PM:


Hi [~martijnvisser] 

I download flink-1.20.0 binary from 
[https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/flink-1.20.0-bin-scala_2.12.tgz]

Then I start a local cluster in standalone mode and submit the SQL you 
mentioned, but I can't get the ClassNotFoundException.

What deployment mode are you using, yarn/k8s session/application?


was (Author: weijie guo):
Hi [~martijnvisser] 

I download flink-1.20.0 binary from 
[https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/flink-1.20.0-bin-scala_2.12.tgz]

Then I start a local cluster in standalone mode and submit the SQL you 
mentioned, but I can't get the ClassNotFoundException.

What deployment mode are you using, yarn/k8s session/application?

 

> Executing query in SQL client results in "java.lang.ClassNotFoundException: 
> org.apache.flink.core.execution.RestoreMode"
> 
>
> Key: FLINK-35785
> URL: https://issues.apache.org/jira/browse/FLINK-35785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Table SQL / Client
>Reporter: Martijn Visser
>Priority: Blocker
>
> Tested with Flink 1.20 RC0
> Reproducer:
> {code:sql}
> CREATE TABLE `product` (
> id INT,
> brandId INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'random',
> 'fields.brandId.min' = '1',
> 'fields.brandId.max' = '100'
> );
> {code}
> Followed by:
> {code:sql}
> SELECT * FROM product
> {code}
> Results in:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35785) Executing query in SQL client results in "java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode"

2024-07-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863848#comment-17863848
 ] 

Weijie Guo edited comment on FLINK-35785 at 7/8/24 4:04 PM:


Hi [~martijnvisser] 

I download flink-1.20.0 binary from 
[https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/flink-1.20.0-bin-scala_2.12.tgz]

Then I start a local cluster in standalone mode and submit the SQL you 
mentioned, but I can't get the ClassNotFoundException.

What deployment mode are you using, yarn/k8s session/application?

 


was (Author: weijie guo):
I download flink-1.20.0 binary from 
[https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/flink-1.20.0-bin-scala_2.12.tgz]

Then I start a local cluster in standalone mode and submit the SQL you 
mentioned, but I can't get the ClassNotFoundException.

What deployment mode are you using, yarn/k8s session/application?

 

> Executing query in SQL client results in "java.lang.ClassNotFoundException: 
> org.apache.flink.core.execution.RestoreMode"
> 
>
> Key: FLINK-35785
> URL: https://issues.apache.org/jira/browse/FLINK-35785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Table SQL / Client
>Reporter: Martijn Visser
>Priority: Blocker
>
> Tested with Flink 1.20 RC0
> Reproducer:
> {code:sql}
> CREATE TABLE `product` (
> id INT,
> brandId INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'random',
> 'fields.brandId.min' = '1',
> 'fields.brandId.max' = '100'
> );
> {code}
> Followed by:
> {code:sql}
> SELECT * FROM product
> {code}
> Results in:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35785) Executing query in SQL client results in "java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode"

2024-07-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863848#comment-17863848
 ] 

Weijie Guo edited comment on FLINK-35785 at 7/8/24 4:04 PM:


I download flink-1.20.0 binary from 
[https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/flink-1.20.0-bin-scala_2.12.tgz]

Then I start a local cluster in standalone mode and submit the SQL you 
mentioned, but I can't get the ClassNotFoundException.

What deployment mode are you using, yarn/k8s session/application?

 


was (Author: weijie guo):
I download flink-1.20.0 binary from 
[https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/flink-1.20.0-bin-scala_2.12.tgz]

Then I start a local cluster in standalone mode and submit the SQL you 
mentioned, but I can't get the ClassNotFoundException.

What deployment model are you using, yarn/k8s session/application? 

 

 

> Executing query in SQL client results in "java.lang.ClassNotFoundException: 
> org.apache.flink.core.execution.RestoreMode"
> 
>
> Key: FLINK-35785
> URL: https://issues.apache.org/jira/browse/FLINK-35785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Table SQL / Client
>Reporter: Martijn Visser
>Priority: Blocker
>
> Tested with Flink 1.20 RC0
> Reproducer:
> {code:sql}
> CREATE TABLE `product` (
> id INT,
> brandId INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'random',
> 'fields.brandId.min' = '1',
> 'fields.brandId.max' = '100'
> );
> {code}
> Followed by:
> {code:sql}
> SELECT * FROM product
> {code}
> Results in:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


rkhachatryan commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668897088


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   Good point, added annotation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


rkhachatryan commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668896504


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   Right, added check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33494) FLIP-376: Add DISTRIBUTED BY clause

2024-07-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863839#comment-17863839
 ] 

Weijie Guo edited comment on FLINK-33494 at 7/8/24 3:48 PM:


Hi [~jhughes]. It's possible that the person assigned doesn't have permission 
to edit it, I don't know, but that's probably why you didn't find it :D

1. click the Edit button on the top of this ticket.

!image-2024-07-08-23-45-43-850.png|width=546,height=153!

2. Find the `Release Note` area and fill it.

!image-2024-07-08-23-46-11-355.png|width=707,height=416!


was (Author: weijie guo):
Hi [~jhughes] 

1. click the Edit button on the top of this ticket.

!image-2024-07-08-23-45-43-850.png|width=546,height=153!

2. Find the `Release Note` area and fill it.

!image-2024-07-08-23-46-11-355.png|width=707,height=416!

> FLIP-376: Add DISTRIBUTED BY clause
> ---
>
> Key: FLINK-33494
> URL: https://issues.apache.org/jira/browse/FLINK-33494
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Jim Hughes
>Priority: Major
> Fix For: 1.20.0
>
> Attachments: image-2024-07-08-23-45-43-850.png, 
> image-2024-07-08-23-46-11-355.png
>
>
> Many SQL vendors expose the concepts of Partitioning, Bucketing, and 
> Clustering.
> [FLIP-376|https://cwiki.apache.org/confluence/x/loxEE] proposes to introduce 
> the concept of Bucketing to Flink.
> It focuses solely on the syntax and necessary API changes to offer a native 
> way of declaring bucketing. Whether this is supported or not during runtime 
> should then be a connector characteristic - similar to partitioning. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668882384


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   hm, I guess, you're right. The socket creation should live in the 
`BlobServer#start` method rather than the constructor. ...from a conceptual 
standpoint. 🤔 
   
   Shall we make the `serverSocket` field `@Nullable` as part of this change to 
underline this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668880762


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   we would need another `null` check in further down in the `closeAsync` 
method (see 
[BlobServer:401](https://github.com/apache/flink/blob/2d524cf16224834ef4b8c3fd7f4c63caabdf43ba/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L401))
 to prevent the `NullPointerPointerException` to be thrown there, instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668880762


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   we would need another `null` check in further down in the `closeAsync` 
method (where the log message is created).



##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   hm, I guess, you're right. The socket creation should live in the 
`BlobServer#start` method. ...from a conceptual standpoint.
   
   Shall we make the `serverSocket` field `@Nullable` as part of this change to 
underline this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33494) FLIP-376: Add DISTRIBUTED BY clause

2024-07-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863839#comment-17863839
 ] 

Weijie Guo commented on FLINK-33494:


Hi [~jhughes] 

1. click the Edit button on the top of this ticket.

!image-2024-07-08-23-45-43-850.png|width=546,height=153!

2. Find the `Release Note` area and fill it.

!image-2024-07-08-23-46-11-355.png|width=707,height=416!

> FLIP-376: Add DISTRIBUTED BY clause
> ---
>
> Key: FLINK-33494
> URL: https://issues.apache.org/jira/browse/FLINK-33494
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Jim Hughes
>Priority: Major
> Fix For: 1.20.0
>
> Attachments: image-2024-07-08-23-45-43-850.png, 
> image-2024-07-08-23-46-11-355.png
>
>
> Many SQL vendors expose the concepts of Partitioning, Bucketing, and 
> Clustering.
> [FLIP-376|https://cwiki.apache.org/confluence/x/loxEE] proposes to introduce 
> the concept of Bucketing to Flink.
> It focuses solely on the syntax and necessary API changes to offer a native 
> way of declaring bucketing. Whether this is supported or not during runtime 
> should then be a connector characteristic - similar to partitioning. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33494) FLIP-376: Add DISTRIBUTED BY clause

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33494:
---
Attachment: image-2024-07-08-23-46-11-355.png

> FLIP-376: Add DISTRIBUTED BY clause
> ---
>
> Key: FLINK-33494
> URL: https://issues.apache.org/jira/browse/FLINK-33494
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Jim Hughes
>Priority: Major
> Fix For: 1.20.0
>
> Attachments: image-2024-07-08-23-45-43-850.png, 
> image-2024-07-08-23-46-11-355.png
>
>
> Many SQL vendors expose the concepts of Partitioning, Bucketing, and 
> Clustering.
> [FLIP-376|https://cwiki.apache.org/confluence/x/loxEE] proposes to introduce 
> the concept of Bucketing to Flink.
> It focuses solely on the syntax and necessary API changes to offer a native 
> way of declaring bucketing. Whether this is supported or not during runtime 
> should then be a connector characteristic - similar to partitioning. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33494) FLIP-376: Add DISTRIBUTED BY clause

2024-07-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33494:
---
Attachment: image-2024-07-08-23-45-43-850.png

> FLIP-376: Add DISTRIBUTED BY clause
> ---
>
> Key: FLINK-33494
> URL: https://issues.apache.org/jira/browse/FLINK-33494
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Jim Hughes
>Priority: Major
> Fix For: 1.20.0
>
> Attachments: image-2024-07-08-23-45-43-850.png
>
>
> Many SQL vendors expose the concepts of Partitioning, Bucketing, and 
> Clustering.
> [FLIP-376|https://cwiki.apache.org/confluence/x/loxEE] proposes to introduce 
> the concept of Bucketing to Flink.
> It focuses solely on the syntax and necessary API changes to offer a native 
> way of declaring bucketing. Whether this is supported or not during runtime 
> should then be a connector characteristic - similar to partitioning. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


rkhachatryan commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668850192


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   The following comment in the constructor made me think that this is 
intentional - 
   the hook is registered before starting anything that might potentially fail, 
so the resource cleanup is guaranteed.
   
   ```
   ...
   this.cleanupTimer.schedule( ...
   
   this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, 
getClass().getSimpleName(), LOG);

   
   //  --- start the server ---
   ...
   this.serverSocket =
   NetUtils.createSocketFromPorts(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35788) Deprecate old InputFormat and SinkFunction

2024-07-08 Thread Jira
João Boto created FLINK-35788:
-

 Summary: Deprecate old InputFormat and SinkFunction
 Key: FLINK-35788
 URL: https://issues.apache.org/jira/browse/FLINK-35788
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / JDBC
Reporter: João Boto






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35707) Allow column definition in CREATE TABLE AS (CTAS)

2024-07-08 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-35707.

Fix Version/s: 2.0.0
   Resolution: Fixed

Fixed in master: 93d7f45595435da10ac1d685feae8f7e8a9f6016

> Allow column definition in CREATE TABLE AS (CTAS)
> -
>
> Key: FLINK-35707
> URL: https://issues.apache.org/jira/browse/FLINK-35707
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergio Peña
>Assignee: Sergio Peña
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Allow defining new columns or overriding source column types on CTAS 
> statements.
> For following syntax should be allowed:
> {noformat}
> CREATE TABLE table_name [( [, ...n] )]
> [WITH (table_properties)]
> AS SELECT query_expression;
> {noformat}
> If new columns are defined in the create clause, then the columns should 
> appear at the beginning of the resulted sink schema. Columns of the query 
> source schema should appear at the end.
> If source columns are re-defined in the create clause, then the columns 
> should appear in the same order as the query schema. Column types may be 
> overridden, but implicit casting rules should be validated and applied 
> (similar to iNSERT/SELECT statements) to make sure the SELECT query will be 
> able to run and write the data to the sink table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35707][Table SQL / API] Allow column definition in CREATE TABLE AS (CTAS) [flink]

2024-07-08 Thread via GitHub


twalthr merged PR #24987:
URL: https://github.com/apache/flink/pull/24987


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668797608


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   hm, wouldn't it be more consistent to move the shutdown hook initialization 
(along with the cleanup scheduling) to the end of the `BlobServer` constructor?
   
   It feels wrong to register the shutdown hook for an instance that's not 
fully created, yet. 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25050:
URL: https://github.com/apache/flink/pull/25050#discussion_r1668797608


##
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java:
##
@@ -354,10 +354,12 @@ public void close() throws IOException {
 if (shutdownRequested.compareAndSet(false, true)) {
 Exception exception = null;
 
-try {
-this.serverSocket.close();
-} catch (IOException ioe) {
-exception = ioe;
+if (serverSocket != null) { // can be null if shut down before 
constructor completion

Review Comment:
   hm, wouldn't it be more consistent to move the shutdown hook initialization 
(along with the cleanup scheduling) at the end of the `BlobServer` 
initialization?
   
   It feels wrong to register the shutdown hook for an instance that's not 
fully created, yet. 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33494) FLIP-376: Add DISTRIBUTED BY clause

2024-07-08 Thread Jim Hughes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863819#comment-17863819
 ] 

Jim Hughes commented on FLINK-33494:


Hi [~Weijie Guo] sorry to be slow to respond.  

Where did you add to the release notes?  (I tried to find them, and I did not 
know where to look.)  Happy to take a look.

> FLIP-376: Add DISTRIBUTED BY clause
> ---
>
> Key: FLINK-33494
> URL: https://issues.apache.org/jira/browse/FLINK-33494
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Jim Hughes
>Priority: Major
> Fix For: 1.20.0
>
>
> Many SQL vendors expose the concepts of Partitioning, Bucketing, and 
> Clustering.
> [FLIP-376|https://cwiki.apache.org/confluence/x/loxEE] proposes to introduce 
> the concept of Bucketing to Flink.
> It focuses solely on the syntax and necessary API changes to offer a native 
> way of declaring bucketing. Whether this is supported or not during runtime 
> should then be a connector characteristic - similar to partitioning. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35785) Executing query in SQL client results in "java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode"

2024-07-08 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863817#comment-17863817
 ] 

Ferenc Csaky commented on FLINK-35785:
--

With a fresh local build today from the {{release-1.20}} branch, I was not able 
to reproduce this. Started a cluster via 
{{./build-target/bin/start-cluster.sh}}, then executed the mentioned commands 
in the desc. Job was deployed successfully, results showed up as expected.

> Executing query in SQL client results in "java.lang.ClassNotFoundException: 
> org.apache.flink.core.execution.RestoreMode"
> 
>
> Key: FLINK-35785
> URL: https://issues.apache.org/jira/browse/FLINK-35785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Table SQL / Client
>Reporter: Martijn Visser
>Priority: Blocker
>
> Tested with Flink 1.20 RC0
> Reproducer:
> {code:sql}
> CREATE TABLE `product` (
> id INT,
> brandId INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'random',
> 'fields.brandId.min' = '1',
> 'fields.brandId.max' = '100'
> );
> {code}
> Followed by:
> {code:sql}
> SELECT * FROM product
> {code}
> Results in:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35785) Executing query in SQL client results in "java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode"

2024-07-08 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863817#comment-17863817
 ] 

Ferenc Csaky edited comment on FLINK-35785 at 7/8/24 2:41 PM:
--

With a fresh local build today from the {{release-1.20}} branch ({{mvn clean 
install -DskipTests -Dfast}}), I was not able to reproduce this. Started a 
cluster via {{./build-target/bin/start-cluster.sh}}, then executed the 
mentioned commands in the desc. Job was deployed successfully, results showed 
up as expected.


was (Author: ferenc-csaky):
With a fresh local build today from the {{release-1.20}} branch, I was not able 
to reproduce this. Started a cluster via 
{{./build-target/bin/start-cluster.sh}}, then executed the mentioned commands 
in the desc. Job was deployed successfully, results showed up as expected.

> Executing query in SQL client results in "java.lang.ClassNotFoundException: 
> org.apache.flink.core.execution.RestoreMode"
> 
>
> Key: FLINK-35785
> URL: https://issues.apache.org/jira/browse/FLINK-35785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Table SQL / Client
>Reporter: Martijn Visser
>Priority: Blocker
>
> Tested with Flink 1.20 RC0
> Reproducer:
> {code:sql}
> CREATE TABLE `product` (
> id INT,
> brandId INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.id.kind' = 'random',
> 'fields.brandId.min' = '1',
> 'fields.brandId.max' = '100'
> );
> {code}
> Followed by:
> {code:sql}
> SELECT * FROM product
> {code}
> Results in:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.core.execution.RestoreMode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)

2024-07-08 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-35787:
--
Description: 
In our internal CI, I've encountered the following error:
{code:java}
* 12:02:47,205 [   pool-126-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 
'pool-126-thread-1' produced an uncaught exception. Stopping the process...
  java.util.concurrent.CompletionException: 
java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.>
          at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282) 
~[?:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603)
 ~[classes/:?]
          at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
          at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
          at java.lang.Thread.run(Thread.java:829) [?:?]
  Caused by: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.CompletableFuture$UniHandle@f3d>
          at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
 ~[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) 
~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
 ~[?:?]
          at 
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949)
 ~[?:?]
          ... 11 more{code}
[From the 
code|https://github.com/apache/flink/blob/fa96ed209a7753a3fe46f93288857e9526c4a7ca/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java#L137],
 it looks like RM main thread executor was shut down, and that triggered JVM 
exit:
{code:java}
CompletableFuture requestFuture =
          gateway.requestSlot(
                  SlotID.getDynamicSlotID(resourceId),
                  jobId,
                  allocationId,
                  resourceProfile,
                  targetAddress,
                  resourceManagerId,
                  taskManagerRequestTimeout);        
CompletableFuture returnedFuture = new CompletableFuture<>();        
FutureUtils.assertNoException(
          requestFuture.handleAsync(
                (Acknowledge acknowledge, Throwable throwable) -> { ... },
                mainThreadExecutor));{code}
 

  was:
In our internal CI, I've encountered the following error:
{code:java}
* 12:02:47,205 [   pool-126-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 
'pool-126-thread-1' produced an uncaught exception. Stopping the process...
  java.util.concurrent.CompletionException: 
java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.>
          at 
java.util.concurrent.CompletableFuture.encodeThrowable(Completabl

[jira] [Closed] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)

2024-07-08 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl closed FLINK-35787.
-
Resolution: Duplicate

This is a duplicate of FLINK-34427.

> DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a 
> proper shutdown)
> -
>
> Key: FLINK-35787
> URL: https://issues.apache.org/jira/browse/FLINK-35787
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.1
>Reporter: Roman Khachatryan
>Priority: Major
>
> In our internal CI, I've encountered the following error:
> {code:java}
> * 12:02:47,205 [   pool-126-thread-1] ERROR 
> org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: 
> Thread 'pool-126-thread-1' produced an uncaught exception. Stopping the 
> process...
>   java.util.concurrent.CompletionException: 
> java.util.concurrent.RejectedExecutionException: Task 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
>  completed, task = 
> java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
> java.util.concurrent.>
>           at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282)
>  ~[?:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603)
>  ~[classes/:?]
>           at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>           at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  [?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>           at java.lang.Thread.run(Thread.java:829) [?:?]
>   Caused by: java.util.concurrent.RejectedExecutionException: Task 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
>  completed, task = 
> java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
> java.util.concurrent.CompletableFuture$UniHandle@f3d>
>           at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
>  ~[?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) 
> ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
>  ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
>  ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
>  ~[?:?]
>           at 
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949)
>  ~[?:?]
>           ... 11 more{code}
> [From the 
> code|https://github.com/apache/flink/blob/fa96ed209a7753a3fe46f93288857e9526c4a7ca/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java#L137],
>  it looks like RM main thread executor was shut down, and that triggered JVM 
> exit:
> {code:java}
> CompletableFuture requestFuture =
>           gateway.requestSlot(
>                   SlotID.getDynamicSlotID(resourceId),
>                   jobId,
>                   allocationId,
>                   resourceProfile,
>                   targetAddress,
>                   resourceManagerId,
>                   taskManagerRequestTimeout);        
> CompletableFuture returnedFuture = new CompletableFuture<>();        
> FutureUtils.assertNoException(
>           requestFuture.handleAsync(
>                 (Acknowledge acknowledge, Throwable throw

[jira] [Updated] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)

2024-07-08 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-35787:
--
Description: 
In our internal CI, I've encountered the following error:
{code:java}
* 12:02:47,205 [   pool-126-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 
'pool-126-thread-1' produced an uncaught exception. Stopping the process...
  java.util.concurrent.CompletionException: 
java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.>
          at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282) 
~[?:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603)
 ~[classes/:?]
          at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
          at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
          at java.lang.Thread.run(Thread.java:829) [?:?]
  Caused by: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.CompletableFuture$UniHandle@f3d>
          at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
 ~[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) 
~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
 ~[?:?]
          at 
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949)
 ~[?:?]
          ... 11 more{code}
[From the 
code|https://github.com/apache/flink/blob/fa96ed209a7753a3fe46f93288857e9526c4a7ca/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java#L137],
 it looks like RM main thread executor was shut down, and that triggered JVM 
exit:
{code:java}
CompletableFuture requestFuture =
          gateway.requestSlot(
                  SlotID.getDynamicSlotID(resourceId),
                  jobId,
                  allocationId,
                  resourceProfile,
                  targetAddress,
                  resourceManagerId,
                  taskManagerRequestTimeout);        CompletableFuture 
returnedFuture = new CompletableFuture<>();        
FutureUtils.assertNoException(
          requestFuture.handleAsync(
                (Acknowledge acknowledge, Throwable throwable) -> { ... },
                mainThreadExecutor));{code}
 

  was:
In our internal CI, I've encountered the following error:
{code:java}
* 12:02:47,205 [   pool-126-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 
'pool-126-thread-1' produced an uncaught exception. Stopping the process...
  java.util.concurrent.CompletionException: 
java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.>
          at 
java.util.concurrent.CompletableFuture.encodeThrowable(Completabl

[jira] [Updated] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)

2024-07-08 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-35787:
--
Affects Version/s: 1.19.1

> DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a 
> proper shutdown)
> -
>
> Key: FLINK-35787
> URL: https://issues.apache.org/jira/browse/FLINK-35787
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.1
>Reporter: Roman Khachatryan
>Priority: Major
>
> In our internal CI, I've encountered the following error:
> {code:java}
> * 12:02:47,205 [   pool-126-thread-1] ERROR 
> org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: 
> Thread 'pool-126-thread-1' produced an uncaught exception. Stopping the 
> process...
>   java.util.concurrent.CompletionException: 
> java.util.concurrent.RejectedExecutionException: Task 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
>  completed, task = 
> java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
> java.util.concurrent.>
>           at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282)
>  ~[?:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603)
>  ~[classes/:?]
>           at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>           at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  [?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>           at java.lang.Thread.run(Thread.java:829) [?:?]
>   Caused by: java.util.concurrent.RejectedExecutionException: Task 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
>  completed, task = 
> java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
> java.util.concurrent.CompletableFuture$UniHandle@f3d>
>           at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
>  ~[?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) 
> ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
>  ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
>  ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
>  ~[?:?]
>           at 
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949)
>  ~[?:?]
>           ... 11 more{code}
> From the code, it looks like RM main thread executor was shut down, and that 
> triggered JVM exit:
> {code:java}
>         CompletableFuture requestFuture =
>                 gateway.requestSlot(
>                         SlotID.getDynamicSlotID(resourceId),
>                         jobId,
>                         allocationId,
>                         resourceProfile,
>                         targetAddress,
>                         resourceManagerId,
>                         taskManagerRequestTimeout);        
> CompletableFuture returnedFuture = new CompletableFuture<>();        
> FutureUtils.assertNoException(
>                 requestFuture.handleAsync(
>                         (Acknowledge acknowledge, Throwable throwable) -> { 
> ... },
>                         mainThreadExecutor));
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)

2024-07-08 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-35787:
--
Component/s: Runtime / Coordination

> DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a 
> proper shutdown)
> -
>
> Key: FLINK-35787
> URL: https://issues.apache.org/jira/browse/FLINK-35787
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Roman Khachatryan
>Priority: Major
>
> In our internal CI, I've encountered the following error:
> {code:java}
> * 12:02:47,205 [   pool-126-thread-1] ERROR 
> org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: 
> Thread 'pool-126-thread-1' produced an uncaught exception. Stopping the 
> process...
>   java.util.concurrent.CompletionException: 
> java.util.concurrent.RejectedExecutionException: Task 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
>  completed, task = 
> java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
> java.util.concurrent.>
>           at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282)
>  ~[?:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645)
>  ~[classes/:?]
>           at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603)
>  ~[classes/:?]
>           at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>           at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  [?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>           at java.lang.Thread.run(Thread.java:829) [?:?]
>   Caused by: java.util.concurrent.RejectedExecutionException: Task 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
>  completed, task = 
> java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
> java.util.concurrent.CompletableFuture$UniHandle@f3d>
>           at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
>  ~[?:?]
>           at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) 
> ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
>  ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
>  ~[?:?]
>           at 
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
>  ~[?:?]
>           at 
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
>  ~[?:?]
>           at 
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949)
>  ~[?:?]
>           ... 11 more{code}
> From the code, it looks like RM main thread executor was shut down, and that 
> triggered JVM exit:
> {code:java}
>         CompletableFuture requestFuture =
>                 gateway.requestSlot(
>                         SlotID.getDynamicSlotID(resourceId),
>                         jobId,
>                         allocationId,
>                         resourceProfile,
>                         targetAddress,
>                         resourceManagerId,
>                         taskManagerRequestTimeout);        
> CompletableFuture returnedFuture = new CompletableFuture<>();        
> FutureUtils.assertNoException(
>                 requestFuture.handleAsync(
>                         (Acknowledge acknowledge, Throwable throwable) -> { 
> ... },
>                         mainThreadExecutor));
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)

2024-07-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35787:
-

 Summary: DefaultSlotStatusSyncer might bring down JVM (exit code 
239 instead of a proper shutdown)
 Key: FLINK-35787
 URL: https://issues.apache.org/jira/browse/FLINK-35787
 Project: Flink
  Issue Type: Bug
Reporter: Roman Khachatryan


In our internal CI, I've encountered the following error:
{code:java}
* 12:02:47,205 [   pool-126-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 
'pool-126-thread-1' produced an uncaught exception. Stopping the process...
  java.util.concurrent.CompletionException: 
java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.>
          at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282) 
~[?:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603)
 ~[classes/:?]
          at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
          at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
          at java.lang.Thread.run(Thread.java:829) [?:?]
  Caused by: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.CompletableFuture$UniHandle@f3d>
          at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
 ~[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) 
~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
 ~[?:?]
          at 
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949)
 ~[?:?]
          ... 11 more{code}
>From the code, it looks like RM main thread executor was shut down, and that 
>triggered JVM exit:
{code:java}
        CompletableFuture requestFuture =
                gateway.requestSlot(
                        SlotID.getDynamicSlotID(resourceId),
                        jobId,
                        allocationId,
                        resourceProfile,
                        targetAddress,
                        resourceManagerId,
                        taskManagerRequestTimeout);        
CompletableFuture returnedFuture = new CompletableFuture<>();        
FutureUtils.assertNoException(
                requestFuture.handleAsync(
                        (Acknowledge acknowledge, Throwable throwable) -> { ... 
},
                        mainThreadExecutor));
 {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


flinkbot commented on PR #25050:
URL: https://github.com/apache/flink/pull/25050#issuecomment-2214162634

   
   ## CI report:
   
   * 2d524cf16224834ef4b8c3fd7f4c63caabdf43ba UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35786) NPE in BlobServer / shutdownHook

2024-07-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35786:
---
Labels: pull-request-available  (was: )

> NPE in BlobServer / shutdownHook
> 
>
> Key: FLINK-35786
> URL: https://issues.apache.org/jira/browse/FLINK-35786
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.1
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.2
>
>
> In constructor, BlobServer registers a shutdown hook to close the socket.
> Later in constructor, BlobServer creates this socket (and makes sure it's not 
> null).
>  
> But if the shutdown hook gets invoked before opening the socket, NPE will be 
> thrown:
> {code:java}
>   12:02:49,983 [PermanentBlobCache shutdown hook] INFO  
> org.apache.flink.runtime.blob.PermanentBlobCache             [] - Shutting 
> down BLOB cache
>   12:02:49,985 [BlobServer shutdown hook] ERROR 
> org.apache.flink.runtime.blob.BlobServer                     [] - Error 
> during shutdown of BlobServer via JVM shutdown hook.
>   java.lang.NullPointerException: null
>           at 
> org.apache.flink.runtime.blob.BlobServer.close(BlobServer.java:358) 
> ~[classes/:?]
>           at 
> org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
>  ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
>           at java.lang.Thread.run(Thread.java:829) [?:?]
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35786] Fix NPE BlobServer / shutdownHook [flink]

2024-07-08 Thread via GitHub


rkhachatryan opened a new pull request, #25050:
URL: https://github.com/apache/flink/pull/25050

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP][FLINK-32218][Connector/Kinesis] Add support for parent-child shard ordering to Kinesis streams source [flink-connector-aws]

2024-07-08 Thread via GitHub


hlteoh37 commented on code in PR #145:
URL: 
https://github.com/apache/flink-connector-aws/pull/145#discussion_r1668649676


##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/event/SplitsFinishedEvent.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.event;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import java.util.Set;
+
+/** Source event used by source reader to communicate that splits are finished 
to enumerator. */
+@Internal
+public class SplitsFinishedEvent implements SourceEvent {

Review Comment:
   Q: Do we meed to implement `hashcode` and `equals` here?



##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java:
##
@@ -64,6 +66,8 @@ public KinesisStreamsSourceReader(
 @Override
 protected void onSplitFinished(Map 
finishedSplitIds) {
 finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
+context.sendSourceEventToCoordinator(
+new SplitsFinishedEvent(new 
HashSet<>(finishedSplitIds.keySet(;
 }

Review Comment:
   nit: can we swap the order? It would be better to send 
`SplitsFinishedEvent`, but miss unregister metric group, compared to the 
alternative. 



##
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/ListShardsStartingPosition.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.kinesis.model.ShardFilter;
+import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
+
+import java.time.Instant;
+
+/** Starting position to perform list shard request. */
+@Internal
+public class ListShardsStartingPosition {

Review Comment:
   Nice! 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35786) NPE in BlobServer / shutdownHook

2024-07-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35786:
-

 Summary: NPE in BlobServer / shutdownHook
 Key: FLINK-35786
 URL: https://issues.apache.org/jira/browse/FLINK-35786
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.19.1
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0, 1.19.2


In constructor, BlobServer registers a shutdown hook to close the socket.

Later in constructor, BlobServer creates this socket (and makes sure it's not 
null).

 

But if the shutdown hook gets invoked before opening the socket, NPE will be 
thrown:
{code:java}
  12:02:49,983 [PermanentBlobCache shutdown hook] INFO  
org.apache.flink.runtime.blob.PermanentBlobCache             [] - Shutting down 
BLOB cache
  12:02:49,985 [BlobServer shutdown hook] ERROR 
org.apache.flink.runtime.blob.BlobServer                     [] - Error during 
shutdown of BlobServer via JVM shutdown hook.
  java.lang.NullPointerException: null
          at 
org.apache.flink.runtime.blob.BlobServer.close(BlobServer.java:358) 
~[classes/:?]
          at 
org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
 ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
          at java.lang.Thread.run(Thread.java:829) [?:?]
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1668650443


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -85,7 +85,16 @@ public Collection getJobManagerRunners() {
 @Override
 public CompletableFuture localCleanupAsync(JobID jobId, Executor 
unusedExecutor) {
 if (isRegistered(jobId)) {
-return unregister(jobId).closeAsync();
+CompletableFuture resultFuture = 
this.jobManagerRunners.get(jobId).closeAsync();
+
+resultFuture.whenComplete(
+(result, throwable) -> {
+if (throwable == null) {
+unregister(jobId);

Review Comment:
   you're correct - that's what happens in the general scenario (there's an 
edge case where the CompletableFuture is already completed when chaining the 
callback to the future instance). 
   
   Executing the `unregister(JobID)` in the thread that executed `closeAsync` 
is something we don't want to do (for the reasons around main thread execution 
which I outlined in my comment above), though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-08 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1668650953


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -266,19 +304,31 @@ protected void restoreJob(
 Optional savepointOpt = Optional.empty();
 
 if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
-savepointOpt =
-Optional.ofNullable(
-ctx.getResource()
-.getStatus()
-.getJobStatus()
-.getSavepointInfo()
-.getLastSavepoint())
-.flatMap(s -> 
Optional.ofNullable(s.getLocation()));
+if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+ctx.getOperatorConfig(), deployConfig)) {
+savepointOpt = 
getLatestSavepointPathFromFlinkStateSnapshots(ctx);
+} else {
+savepointOpt =
+Optional.ofNullable(
+ctx.getResource()
+.getStatus()
+.getJobStatus()
+.getSavepointInfo()
+.getLastSavepoint())
+.flatMap(s -> 
Optional.ofNullable(s.getLocation()));

Review Comment:
   I have added a commit that should cover these points, I have also resolved 
related conversations. The commit also adds support for `savepointTriggerNonce` 
and `checkpointTriggerNonce` with the new snapshot resources. I will also add 
unit tests for these later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-08 Thread via GitHub


XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1668650443


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -85,7 +85,16 @@ public Collection getJobManagerRunners() {
 @Override
 public CompletableFuture localCleanupAsync(JobID jobId, Executor 
unusedExecutor) {
 if (isRegistered(jobId)) {
-return unregister(jobId).closeAsync();
+CompletableFuture resultFuture = 
this.jobManagerRunners.get(jobId).closeAsync();
+
+resultFuture.whenComplete(
+(result, throwable) -> {
+if (throwable == null) {
+unregister(jobId);

Review Comment:
   you're correct - that's what happens in the general scenario (there's an 
edge case where the `CompletableFuture` is already completed when chaining the 
callback to the future instance). 
   
   Executing the `unregister(JobID)` in the thread that executed `closeAsync` 
is something we don't want to do (for the reasons around main thread execution 
which I outlined in my comment above), though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35739][FLIP-444] Native file copy support [flink]

2024-07-08 Thread via GitHub


rkhachatryan commented on code in PR #25028:
URL: https://github.com/apache/flink/pull/25028#discussion_r1668622658


##
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java:
##
@@ -41,6 +42,41 @@
 /** Base class for file system factories that create S3 file systems. */
 public abstract class AbstractS3FileSystemFactory implements FileSystemFactory 
{
 
+public static final ConfigOption ACCESS_KEY =
+ConfigOptions.key("s3.access-key")

Review Comment:
   I couldn't find generated docs for these options (all s3 options).
   Should we start generating in this hotfix commit?



##
flink-core/src/main/java/org/apache/flink/util/FileUtils.java:
##
@@ -138,17 +140,27 @@ public static String readFile(File file, String 
charsetName) throws IOException
 return new String(bytes, charsetName);
 }
 
+public static String readFile(File file, Charset charset) throws 
IOException {
+byte[] bytes = readAllBytes(file.toPath());
+return new String(bytes, charset);
+}
+
 public static String readFileUtf8(File file) throws IOException {
-return readFile(file, "UTF-8");
+return readFile(file, StandardCharsets.UTF_8);
 }
 
 public static void writeFile(File file, String contents, String encoding) 
throws IOException {
 byte[] bytes = contents.getBytes(encoding);
 Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
 }

Review Comment:
   This method seems to be unused now.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java:
##
@@ -38,6 +38,14 @@ public interface StreamStateHandle extends StateObject {
 /** @return Content of this handle as bytes array if it is already in 
memory. */
 Optional asBytesIfInMemory();
 
+/**
+ * @return Path to an underlying file represented by this {@link 
StreamStateHandle} or {@link
+ * Optional#empty()} if there is no such file.
+ */
+default Optional maybeGetPath() {
+return Optional.empty();
+}

Review Comment:
   Should this also be implemented by `DirectoryStreamStateHandle` and 
`SegmentFileStateHandle`?
   



##
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##
@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
 this.s3AccessHelper = s3UploadHelper;
 this.uploadThreadPool = Executors.newCachedThreadPool();
 
-Preconditions.checkArgument(s3uploadPartSize >= 
S3_MULTIPART_MIN_PART_SIZE);
+checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
 this.s3uploadPartSize = s3uploadPartSize;
 this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", 
s5CmdConfiguration);
 }
 
 // 
 
+@Override
+public boolean canCopyPaths(Path source, Path destination) {
+return canCopyPaths();
+}
+
+private boolean canCopyPaths() {
+return s5CmdConfiguration != null;
+}

Review Comment:
   1. Should we also check that one is remote and one is local? (IIRC, this is 
s5cmd requirement)
   2. Use public method everywhere and inline the private one?



##
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java:
##
@@ -113,16 +114,16 @@ private String getHttpEndpoint() {
  * relevant parameter to access the {@code Minio} instance.
  */
 public void setS3ConfigOptions(Configuration config) {
-config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint());

Review Comment:
   nit: re-order commits so that the use of options goes after their 
introduction?
   
   Currently, I see it as
   ```
   [hotfix] Use newly defined ConfigOptions in MinioTestContainer 
   [hotfix] Move CompressionUtils to flink-core
   [hotfix] Create ConfigOptions for s3 access/secret keys and endpoint 
   ```



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##
@@ -94,46 +116,103 @@ public void transferAllStateDataToDirectory(
 }
 }
 
-/** Asynchronously runs the specified download requests on 
executorService. */
-private Stream> 
transferAllStateDataToDirectoryAsync(
-Collection handleWithPaths,
+private Collection createDownloadRunnables(
+Collection downloadRequests,
+CloseableRegistry closeableRegistry)
+throws IOException {
+// We need to support recovery from multiple FileSystems. At least one 
scenario that it can
+// happen is when:
+// 1. A checkpoint/savepoint is created on File

[jira] [Resolved] (FLINK-35697) Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink

2024-07-08 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy resolved FLINK-35697.
-
Resolution: Resolved

> Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
> -
>
> Key: FLINK-35697
> URL: https://issues.apache.org/jira/browse/FLINK-35697
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Affects Versions: 1.20.0
>Reporter: Ahmed Hamdy
>Assignee: Muhammet Orazov
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.20.0
>
>
> h2. Description
> In FLIP-451 we added Timeout configuration to {{AsyncSinkWriter}}, with 
> default value of 10 minutes and default failOnTimeout to false. 
> We need to test the new feature on different levels
> - Functional Testing
> - Performance Testing
> - Regression Testing
> h2. Common Utils
> The feature introduced affects an abstract {{AsyncSinkWriter}} class. we need 
> to use an implementation sink for our tests, Any implementation where we can 
> track delivery of elements is accepted in our tests, an example is:
> {code}
> class DiscardingElementWriter extends AsyncSinkWriter {
> SeparateThreadExecutor executor =
> new SeparateThreadExecutor(r -> new Thread(r, 
> "DiscardingElementWriter"));
> public DiscardingElementWriter(
> Sink.InitContext context,
> AsyncSinkWriterConfiguration configuration,
> Collection> 
> bufferedRequestStates) {
> super(
> (element, context1) -> element.toString(),
> context,
> configuration,
> bufferedRequestStates);
> }
> @Override
> protected long getSizeInBytes(String requestEntry) {
> return requestEntry.length();
> }
> @Override
> protected void submitRequestEntries(
> List requestEntries, ResultHandler 
> resultHandler) {
> executor.execute(
> () -> {
> long delayMillis = new Random().nextInt(5000);
> try {
> Thread.sleep(delayMillis);
> } catch (InterruptedException ignored) {
> }
> for (String entry : requestEntries) {
> LOG.info("Discarding {} after {} ms", entry, 
> delayMillis);
> }
> resultHandler.complete();
> });
> }
> }
> {code}
> We will also need a simple Flink Job that writes data using the sink
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
> .getExecutionEnvironment();
> env.setParallelism(1);
> env.fromSequence(0, 100)
> .map(Object::toString)
> .sinkTo(new DiscardingTestAsyncSink<>());
> {code}
> We can use least values for batch size and inflight requests to increase 
> number of requests that are subject to timeout
> {code}
> public class DiscardingTestAsyncSink extends AsyncSinkBase {
> private static final Logger LOG = 
> LoggerFactory.getLogger(DiscardingTestAsyncSink.class);
> public DiscardingTestAsyncSink(long requestTimeoutMS, boolean 
> failOnTimeout) {
> super(
> (element, context) -> element.toString(),
> 1, // maxBatchSize
> 1, // maxInflightRequests
> 10, // maxBufferedRequests
> 1000L, // maxBatchsize
> 100, // MaxTimeInBuffer
> 500L, // maxRecordSize
> requestTimeoutMS,
> failOnTimeout);
> }
> @Override
> public SinkWriter createWriter(WriterInitContext context) throws 
> IOException {
> return new DiscardingElementWriter(
> new InitContextWrapper(context),
> AsyncSinkWriterConfiguration.builder()
> .setMaxBatchSize(this.getMaxBatchSize())
> .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
> .setMaxInFlightRequests(this.getMaxInFlightRequests())
> .setMaxBufferedRequests(this.getMaxBufferedRequests())
> .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
> 
> .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
> .setFailOnTimeout(this.getFailOnTimeout())
> .setRequestTimeoutMS(this.getRequestTimeoutMS())
> .build(),
> Collections.emptyList());
> }
> @Override
>

  1   2   >