Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]

2024-04-18 Thread via GitHub


afedulov commented on PR #23777:
URL: https://github.com/apache/flink/pull/23777#issuecomment-2064139948

   @flinkbot run azure





[jira] [Updated] (FLINK-34916) Support `ALTER CATALOG SET` syntax

2024-04-18 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-34916:
-
Summary: Support `ALTER CATALOG SET` syntax  (was: Support `ALTER CATALOG` 
syntax)

> Support `ALTER CATALOG SET` syntax
> --
>
> Key: FLINK-34916
> URL: https://issues.apache.org/jira/browse/FLINK-34916
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
> Attachments: image-2024-03-22-18-30-33-182.png
>
>
> Set one or more properties in the specified catalog. If a particular property 
> is already set in the catalog, override the old value with the new one.
> !image-2024-03-22-18-30-33-182.png|width=736,height=583!
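For illustration only, assuming the syntax this ticket proposes, such a statement could be issued through the Table API roughly as follows (the catalog and property names are made up):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AlterCatalogSetExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical catalog and property: if 'default-database' is already set
        // on the catalog, the new value overrides the old one.
        tEnv.executeSql("ALTER CATALOG my_catalog SET ('default-database' = 'new_db')");
    }
}
{code}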





Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]

2024-04-18 Thread via GitHub


zentol commented on code in PR #24680:
URL: https://github.com/apache/flink/pull/24680#discussion_r1571216390


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -1783,56 +1778,6 @@ void testStopWithSavepointFailsInIllegalState() throws 
Exception {
 .withCauseInstanceOf(CheckpointException.class);
 }
 
-@Test
-void testSavepointFailsWhenBlockingEdgeExists() throws Exception {

Review Comment:
   This was added in FLINK-34371 and is covered by tests in the 
DefaultScheduler:
   
   
https://github.com/apache/flink/commit/d4e0084649c019c536ee1e44bab15c8eca01bf13#diff-b4bc1cd606feb86850a18371520b5dd63d02090b8567fdf80730d1d7dd6e693d






Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]

2024-04-18 Thread via GitHub


afedulov commented on PR #23777:
URL: https://github.com/apache/flink/pull/23777#issuecomment-2064985125

   @flinkbot run azure





[jira] [Created] (FLINK-35165) AdaptiveBatch Scheduler should not restrict the default source parallelism to the max parallelism set

2024-04-18 Thread Venkata krishnan Sowrirajan (Jira)
Venkata krishnan Sowrirajan created FLINK-35165:
---

 Summary: AdaptiveBatch Scheduler should not restrict the default 
source parallelism to the max parallelism set
 Key: FLINK-35165
 URL: https://issues.apache.org/jira/browse/FLINK-35165
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Venkata krishnan Sowrirajan


Copy-pasting the reasoning mentioned on this [discussion 
thread|https://lists.apache.org/thread/o887xhvvmn2rg5tyymw348yl2mqt23o7].

Let me state why I think 
"{_}jobmanager.adaptive-batch-scheduler.default-source-parallelism{_}" should 
not be bound by the "{_}jobmanager.adaptive-batch-scheduler.max-parallelism{_}" 
(see the illustrative config sketch after this list):
 * The source vertex is unique and does not have any upstream vertices; 
downstream vertices read shuffled data partitioned by key, which is not the 
case for the source vertex.
 * Limiting the source parallelism by the downstream vertices' max parallelism 
is incorrect.
 * If we say that, for "semantic consistency", the source vertex parallelism 
has to be bound by the overall job's max parallelism, it can lead to the 
following issues:
 ** High filter selectivity with huge amounts of data to read.
 ** Setting a high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" so 
that the source parallelism can be set higher can lead to small blocks and 
sub-optimal performance.
 ** Setting a high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" 
requires careful tuning of network buffer configurations, which is unnecessary 
when only the source parallelism needs to be set high.
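A minimal sketch of the combination being argued for, using the two option keys quoted above; the values are made up for illustration:

{code:java}
import org.apache.flink.configuration.Configuration;

public class AdaptiveBatchConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A moderate max parallelism for the downstream (shuffled) vertices ...
        conf.setString("jobmanager.adaptive-batch-scheduler.max-parallelism", "128");
        // ... while the source, which reads input splits rather than shuffled data,
        // may want a much higher parallelism.
        conf.setString("jobmanager.adaptive-batch-scheduler.default-source-parallelism", "2048");
        // With the current behaviour described above, the source parallelism would be capped at 128.
    }
}
{code}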





Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-18 Thread via GitHub


liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2064501044

   @flinkbot run azure





Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING in Executing state [flink]

2024-04-18 Thread via GitHub


zentol commented on PR #24680:
URL: https://github.com/apache/flink/pull/24680#issuecomment-2064570934

   meh, stop-with-savepoint failures need to be able to transition back into 
executing without recreating the EG :/





Re: [PR] Adding JDBC Connector v3.2.0 [flink-web]

2024-04-18 Thread via GitHub


Samrat002 commented on code in PR #734:
URL: https://github.com/apache/flink-web/pull/734#discussion_r1571145570


##
docs/data/flink_connectors.yml:
##
@@ -51,11 +51,11 @@ hbase:
   compatibility: ["1.16.x", "1.17.x"]
 
 jdbc:
-  name: "Apache Flink JDBC Connector 3.1.2"
-  source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz"
-  source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz.asc"
-  source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz.sha512"
-  compatibility: ["1.16.x", "1.17.x", "1.18.x"]
+  name: "Apache Flink JDBC Connector 3.2.0"
+  source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz"
+  source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz.asc"
+  source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz.sha512"
+  compatibility: ["1.18.x", "1.19.x"]

Review Comment:
   [NIT]
   Earlier the JDBC connector supported 3 versions (1.16.x, 1.17.x and 1.18.x);
   now it supports only 2 (1.18.x and 1.19.x). Any specific reason?






Re: [PR] [FLINK-35115][Connectors/Kinesis] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]

2024-04-18 Thread via GitHub


z3d1k commented on code in PR #138:
URL: 
https://github.com/apache/flink-connector-aws/pull/138#discussion_r1571230798


##
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java:
##
@@ -149,6 +149,7 @@ public class FlinkKinesisConsumer extends 
RichParallelSourceFunction
 sequenceNumsToRestore;
 
 private volatile boolean running = true;
+private volatile boolean closed = false;

Review Comment:
   Added comments for these flags
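A rough sketch of the intent behind keeping two separate flags; apart from the two field declarations shown in the diff, everything below is illustrative and not the actual connector code:

```java
// Illustrative sketch only; not the actual FlinkKinesisConsumer implementation.
class TwoFlagSourceSketch {

    // Flipped when the source is asked to stop fetching (cancel / stop-with-savepoint);
    // the fetch loop exits, but state may still need to be snapshotted afterwards.
    private volatile boolean running = true;

    // Flipped only once the operator has been fully closed; after this point
    // snapshotState must not touch the already-released resources.
    private volatile boolean closed = false;

    void cancel() {
        running = false;
    }

    void close() {
        closed = true;
    }

    void snapshotState() {
        if (closed) {
            return; // operator already closed, nothing to snapshot
        }
        // ... collect sequence numbers, even if 'running' is already false ...
    }
}
```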






Re: [PR] [FLINK-35155] Introduce TableRuntimeException [flink]

2024-04-18 Thread via GitHub


twalthr commented on code in PR #24679:
URL: https://github.com/apache/flink/pull/24679#discussion_r1570922395


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.DataTypes.*;
+import static org.apache.flink.types.RowKind.*;
+
+/** Tests for built-in ARRAY_AGG aggregation functions. */
+class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase {
+
+@Override
+Stream getTestCaseSpecs() {
+return Stream.of(
+TestSpec.forExpression("SINGLE_VALUE")
+.withSource(
+ROW(STRING(), INT()),
+Arrays.asList(
+Row.ofKind(INSERT, "A", 1),
+Row.ofKind(INSERT, "A", 2),
+Row.ofKind(INSERT, "B", 2)))
+.testSqlRuntimeError(
+source ->
+"SELECT f0, SINGLE_VALUE(f1) FROM "
++ source
++ " GROUP BY f0",
+ROW(STRING(), INT()),
+ex ->
+
ex.hasRootCauseInstanceOf(TableRuntimeException.class)

Review Comment:
   Yes, let's strive for this. It should be our end goal.






Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]

2024-04-18 Thread via GitHub


zentol commented on code in PR #24680:
URL: https://github.com/apache/flink/pull/24680#discussion_r1571218899


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java:
##
@@ -166,18 +170,19 @@ static class MockCreatingExecutionGraphContext extends 
MockStateWithoutExecution
 new StateValidator<>("Executing");
 
 private Function<
-
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism,
-CreatingExecutionGraph.AssignmentResult>
+CreatingExecutionGraph.ExecutionGraphWithVertexParallelism,
+CreatingExecutionGraph.AssignmentResult>
 tryToAssignSlotsFunction =
-e -> 
CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph());
+e -> 
CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph());
 
 private GlobalFailureHandler globalFailureHandler =
 t -> {
 // No-op.
 };
 
 public void setExpectWaitingForResources() {
-waitingForResourcesStateValidator.expectInput((none) -> {});
+waitingForResourcesStateValidator.expectInput((none) -> {

Review Comment:
   No, I screwed up :) My IntelliJ is using a Spotless config that isn't fully 
compatible with Flink...






Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]

2024-04-18 Thread via GitHub


dmvk commented on code in PR #24680:
URL: https://github.com/apache/flink/pull/24680#discussion_r1571177099


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java:
##
@@ -166,18 +170,19 @@ static class MockCreatingExecutionGraphContext extends 
MockStateWithoutExecution
 new StateValidator<>("Executing");
 
 private Function<
-
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism,
-CreatingExecutionGraph.AssignmentResult>
+CreatingExecutionGraph.ExecutionGraphWithVertexParallelism,
+CreatingExecutionGraph.AssignmentResult>
 tryToAssignSlotsFunction =
-e -> 
CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph());
+e -> 
CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph());
 
 private GlobalFailureHandler globalFailureHandler =
 t -> {
 // No-op.
 };
 
 public void setExpectWaitingForResources() {
-waitingForResourcesStateValidator.expectInput((none) -> {});
+waitingForResourcesStateValidator.expectInput((none) -> {

Review Comment:
   uuh, what's happening here? did we update the formatter?



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -1783,56 +1778,6 @@ void testStopWithSavepointFailsInIllegalState() throws 
Exception {
 .withCauseInstanceOf(CheckpointException.class);
 }
 
-@Test
-void testSavepointFailsWhenBlockingEdgeExists() throws Exception {

Review Comment:
   Do you know why / when this was introduced? It indeed doesn't seem to be 
related to the AS; is this tested somewhere else?



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java:
##
@@ -93,8 +93,12 @@ void 
testNotPossibleSlotAssignmentTransitionsToWaitingForResources() {
 ignored -> 
CreatingExecutionGraph.AssignmentResult.notPossible());
 context.setExpectWaitingForResources();
 
+final StateTrackingMockExecutionGraph executionGraph = new 
StateTrackingMockExecutionGraph();
+
 executionGraphWithVertexParallelismFuture.complete(
-getGraph(new StateTrackingMockExecutionGraph()));
+getGraph(executionGraph));
+
+
assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);

Review Comment:
   iiuc, the graph would be running before the change; makes sense






Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]

2024-04-18 Thread via GitHub


zentol commented on code in PR #24680:
URL: https://github.com/apache/flink/pull/24680#discussion_r1571216654


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java:
##
@@ -93,8 +93,12 @@ void 
testNotPossibleSlotAssignmentTransitionsToWaitingForResources() {
 ignored -> 
CreatingExecutionGraph.AssignmentResult.notPossible());
 context.setExpectWaitingForResources();
 
+final StateTrackingMockExecutionGraph executionGraph = new 
StateTrackingMockExecutionGraph();
+
 executionGraphWithVertexParallelismFuture.complete(
-getGraph(new StateTrackingMockExecutionGraph()));
+getGraph(executionGraph));
+
+
assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);

Review Comment:
   > the graph would be running before the change
   
   yes






Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-04-18 Thread via GitHub


pvary commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2065143602

   Good catch @loserwang1024!
   
   Could you please add a test case to prevent later code changes from reverting 
this fix?





Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-18 Thread via GitHub


liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2065504494

   @flinkbot run azure





Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-04-18 Thread via GitHub


afedulov commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1571313741


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##
@@ -184,6 +184,21 @@ public synchronized ComponentMetricStore 
getJobManagerMetricStore() {
 return ComponentMetricStore.unmodifiable(jobManager);
 }
 
+public synchronized ComponentMetricStore getJobManagerOperatorMetricStore(
+String jobID, String taskID) {
+JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+if (job == null || taskID == null) {
+return null;
+}
+
+TaskMetricStore task = job.getTaskMetricStore(taskID);
+if (task == null) {
+return null;
+}
+
+return 
ComponentMetricStore.unmodifiable(task.getJobManagerOperatorMetricStore());
+}

Review Comment:
   Feels like this could be a bit more readable
   ```suggestion
    public synchronized ComponentMetricStore getJobManagerOperatorMetricStore(
            String jobID, String taskID) {
        if (jobID == null || taskID == null) {
            return null;
        }
        JobMetricStore job = jobs.get(jobID);
        if (job == null) {
            return null;
        }
        TaskMetricStore task = job.getTaskMetricStore(taskID);
        if (task == null) {
            return null;
        }
        return ComponentMetricStore.unmodifiable(task.getJobManagerOperatorMetricStore());
    }
   ```






Re: [PR] Support skip verify ssl certificate and hostname [flink-connector-elasticsearch]

2024-04-18 Thread via GitHub


liuml07 commented on PR #96:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/96#issuecomment-2065368913

   Thank you @tosone. Could you help review that PR? I can also update it to port 
your Table API changes. If you would like to co-author it by pushing to that 
branch, I'd appreciate it. We may need to ping folks with write permission to 
merge it.





Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-18 Thread via GitHub


flinkbot commented on PR #24682:
URL: https://github.com/apache/flink/pull/24682#issuecomment-2065635116

   
   ## CI report:
   
   * a5be0a188101bcaf6fb76c8d5144ad002a79a192 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-35166] Make the SortBufferAccumulator use more buffers when the parallelism is small [flink]

2024-04-18 Thread via GitHub


flinkbot commented on PR #24683:
URL: https://github.com/apache/flink/pull/24683#issuecomment-2065635252

   
   ## CI report:
   
   * 0d0a7bcca23de55518958f32eb36a242bd4b615d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35115 ]


Vadim Vararu deleted comment on FLINK-35115:
--

was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.





[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854
 ] 

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.





[jira] [Created] (FLINK-35166) Improve the performance of Hybrid Shuffle when enable memory decoupling

2024-04-18 Thread Jiang Xin (Jira)
Jiang Xin created FLINK-35166:
-

 Summary: Improve the performance of Hybrid Shuffle when enable 
memory decoupling
 Key: FLINK-35166
 URL: https://issues.apache.org/jira/browse/FLINK-35166
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Jiang Xin
 Fix For: 1.20.0


Currently, the tiered result partition creates the SortBufferAccumulator with 
the number of expected buffers set to min(numSubpartitions+1, 512), so the 
SortBufferAccumulator may obtain very few buffers when the parallelism is 
small. We can simply make the number of expected buffers 512 by default to 
achieve better performance when buffers are sufficient.
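In code terms the proposal amounts to roughly the following; a sketch with made-up names, not the actual Flink source:

{code:java}
public class ExpectedBuffersSketch {
    public static void main(String[] args) {
        int numSubpartitions = 4; // a job with small parallelism

        // Behaviour described above: only numSubpartitions + 1 buffers are expected.
        int before = Math.min(numSubpartitions + 1, 512); // = 5

        // Proposed behaviour: default to 512 so the accumulator gets enough buffers
        // whenever they are available.
        int after = 512;

        System.out.println(before + " -> " + after);
    }
}
{code}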





[jira] [Created] (FLINK-35167) [CDC] Introduce MaxCompute pipeline DataSink

2024-04-18 Thread zhangdingxin (Jira)
zhangdingxin created FLINK-35167:


 Summary: [CDC] Introduce MaxCompute pipeline DataSink
 Key: FLINK-35167
 URL: https://issues.apache.org/jira/browse/FLINK-35167
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: zhangdingxin


By integrating the MaxCompute DataSink, we enable the precise and efficient 
synchronization of data from Flink's Change Data Capture (CDC) into MaxCompute.





[jira] [Updated] (FLINK-35162) Support grouping state get and put access

2024-04-18 Thread Jinzhong Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinzhong Li updated FLINK-35162:

Component/s: Runtime / State Backends

> Support grouping state get and put access
> -
>
> Key: FLINK-35162
> URL: https://issues.apache.org/jira/browse/FLINK-35162
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Updated] (FLINK-35163) Utilize ForSt's native MultiGet API to optimize remote state access

2024-04-18 Thread Jinzhong Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinzhong Li updated FLINK-35163:

Component/s: Runtime / State Backends

> Utilize ForSt's native MultiGet API to optimize remote state access
> ---
>
> Key: FLINK-35163
> URL: https://issues.apache.org/jira/browse/FLINK-35163
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Fix For: 2.0.0
>
>






Re: [PR] [FLINK-35127] remove HybridSource to avoid CI failure. [flink-cdc]

2024-04-18 Thread via GitHub


lvyanquan commented on PR #3237:
URL: https://github.com/apache/flink-cdc/pull/3237#issuecomment-2065637354

   The failure in the current CI contains the following error message:
   ```
   (4185ef2c5087300d32f25ea842d3ec59_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from RUNNING to FAILED with failure cause:
   java.lang.IllegalStateException: The "before" field of UPDATE/DELETE message 
is null, please check the Postgres table has been set REPLICA IDENTITY to FULL 
level. You can update the setting by running the command in Postgres 'ALTER 
TABLE inventory.products REPLICA IDENTITY FULL'. Please see more in Debezium 
documentation: 
https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-replica-identity
at 
org.apache.flink.cdc.connectors.postgres.table.PostgresValueValidator.validate(PostgresValueValidator.java:44)
 ~[classes/:?]
   ```
   This failure is unrelated to this PR.
   
   And CC @loserwang1024.
   
   





[jira] [Created] (FLINK-35168) Basic State Iterator for async processing

2024-04-18 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35168:
---

 Summary: Basic State Iterator for async processing
 Key: FLINK-35168
 URL: https://issues.apache.org/jira/browse/FLINK-35168
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan








[jira] [Updated] (FLINK-35168) Basic State Iterator for async processing

2024-04-18 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-35168:

Component/s: Runtime / State Backends

> Basic State Iterator for async processing
> -
>
> Key: FLINK-35168
> URL: https://issues.apache.org/jira/browse/FLINK-35168
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>






Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-18 Thread via GitHub


Zakelly commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1571754617


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * An implementation of {@link InternalTimerService} that is used by {@link
+ * 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
+ * The timer service will set {@link RecordContext} for the timers before 
invoking action to
+ * preserve the execution order between timer firing and records processing.
+ *
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425
+ * timers section.
+ * @param  Type of timer's key.
+ * @param  Type of the namespace to which timers are scoped.
+ */
+@Internal
+public class InternalTimerServiceAsyncImpl extends 
InternalTimerServiceImpl {
+
+private AsyncExecutionController asyncExecutionController;
+
+InternalTimerServiceAsyncImpl(
+TaskIOMetricGroup taskIOMetricGroup,
+KeyGroupRange localKeyGroupRange,
+KeyContext keyContext,
+ProcessingTimeService processingTimeService,
+KeyGroupedInternalPriorityQueue> 
processingTimeTimersQueue,
+KeyGroupedInternalPriorityQueue> 
eventTimeTimersQueue,
+StreamTaskCancellationContext cancellationContext,
+AsyncExecutionController asyncExecutionController) {
+super(
+taskIOMetricGroup,
+localKeyGroupRange,
+keyContext,
+processingTimeService,
+processingTimeTimersQueue,
+eventTimeTimersQueue,
+cancellationContext);
+this.asyncExecutionController = asyncExecutionController;
+}
+
+void onProcessingTime(long time) throws Exception {

Review Comment:
   add `@Override` here?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java:
##
@@ -179,6 +180,55 @@  InternalTimerServiceImpl 
registerOrGetTimerService(
 return timerService;
 }
 
+@Override
+public  InternalTimerService getAsyncInternalTimerService(
+String name,
+TypeSerializer keySerializer,
+TypeSerializer namespaceSerializer,
+Triggerable triggerable,
+AsyncExecutionController asyncExecutionController) {
+checkNotNull(keySerializer, "Timers can only be used on keyed 
operators.");
+
+// the following casting is to overcome type restrictions.
+TimerSerializer timerSerializer =
+new TimerSerializer<>(keySerializer, namespaceSerializer);
+
+InternalTimerServiceAsyncImpl timerService =
+registerOrGetAsyncTimerService(name, timerSerializer, 
asyncExecutionController);
+
+timerService.startTimerService(
+timerSerializer.getKeySerializer(),
+timerSerializer.getNamespaceSerializer(),
+triggerable);
+
+return timerService;
+}
+
+ InternalTimerServiceAsyncImpl registerOrGetAsyncTimerService(
+String name,
+TimerSerializer timerSerializer,
+AsyncExecutionController 

[PR] [FLINK-35154] Javadoc generating fix [flink]

2024-04-18 Thread via GitHub


ldadima opened a new pull request, #24684:
URL: https://github.com/apache/flink/pull/24684

   ## What is the purpose of the change
   
   To fix [FLINK-35154](https://issues.apache.org/jira/browse/FLINK-35154)
   
   ## Verifying this change
   
   Run mvn javadoc:aggregate -Prelease
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable)
   





[jira] [Created] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator

2024-04-18 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-35169:
-

 Summary: Recycle buffers to freeSegments before releasing data 
buffer for sort accumulator
 Key: FLINK-35169
 URL: https://issues.apache.org/jira/browse/FLINK-35169
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.20.0
Reporter: Yuxin Tan
Assignee: Yuxin Tan


When using the SortBufferAccumulator, we should recycle the buffers to freeSegments 
before releasing the data buffer. The reason is that reading buffers out of the 
DataBuffer may require more buffers than are currently available in freeSegments. 
Consequently, to ensure the DataBuffer has adequate buffers, the flushed and 
recycled buffers should also be added back to freeSegments for reuse.
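The proposed ordering, as a sketch; the types and method names below are placeholders, not the actual classes:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Placeholder sketch of the ordering described above; not Flink's actual API.
class FlushOrderingSketch {
    private final Deque<byte[]> freeSegments = new ArrayDeque<>();

    void flush(List<byte[]> recycledBuffers, Runnable releaseDataBuffer) {
        // 1. Put the flushed/recycled buffers back into the free pool first, so that
        //    draining the remaining data out of the DataBuffer cannot run short of segments.
        freeSegments.addAll(recycledBuffers);
        // 2. Only then release the data buffer itself.
        releaseDataBuffer.run();
    }
}
{code}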





[jira] [Updated] (FLINK-35047) Introduce ForStStateBackend to manage ForSt

2024-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35047:
---
Labels: pull-request-available  (was: )

> Introduce ForStStateBackend to manage ForSt
> ---
>
> Key: FLINK-35047
> URL: https://issues.apache.org/jira/browse/FLINK-35047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
>
> A ForStStateBackend is introduced to leverage ForSt as state store for Flink.
> This ticket includes:
>  # Life cycle of ForSt, including initialization/closing
>  # basic options, resource control, metrics like RocksDBStateBackend
> doesn't include the implementation of new AsyncKeyedStateBackend and Async 
> State API which will be resolved in other tickets.





[PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-18 Thread via GitHub


masteryhx opened a new pull request, #24682:
URL: https://github.com/apache/flink/pull/24682

   
   
   ## What is the purpose of the change
   
   A ForStStateBackend is introduced to leverage ForSt as state store for Flink.
   
   This ticket includes:
   
   - Life cycle of ForSt, including initialization/closing
   - basic options, resource control, metrics like RocksDBStateBackend
   
   It doesn't include the implementation of new AsyncKeyedStateBackend and 
Async State API which will be resolved in other PRs.
   
   ## Brief change log
   
   - Support ForSt FlinkEnv
   - Introduce ForStStateBackend and ForStKeyedStateBackend
   - Support restoring and building ForSt
   - Introduce ResourceContainer for ForStStateBackend
   - Introduce Metrics for ForStStateBackend
   - Introduce ForSt related options
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added several tests covering configs, metrics, memory control, and ForSt loading
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   





[jira] [Updated] (FLINK-35167) Introduce MaxCompute pipeline DataSink

2024-04-18 Thread zhangdingxin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangdingxin updated FLINK-35167:
-
Summary: Introduce MaxCompute pipeline DataSink  (was: [CDC] Introduce 
MaxCompute pipeline DataSink)

> Introduce MaxCompute pipeline DataSink
> --
>
> Key: FLINK-35167
> URL: https://issues.apache.org/jira/browse/FLINK-35167
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
>  Labels: Flink-CDC, connector
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> By integrating the MaxCompute DataSink, we enable the precise and efficient 
> synchronization of data from Flink's Change Data Capture (CDC) into 
> MaxCompute.





Re: [PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]

2024-04-18 Thread via GitHub


RocMarshal commented on code in PR #115:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/115#discussion_r1571809638


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcConnectionOptions.java:
##
@@ -90,6 +101,11 @@ public JdbcConnectionOptionsBuilder withDriverName(String 
driverName) {
 return this;
 }
 
+public JdbcConnectionOptionsBuilder withExtendProperties(Properties 
extendProperties) {

Review Comment:
   @eskabetxe Thanks for your quick review.
   The comments sound good to me!
   I updated the PR based on your comments.
   PTAL, thank you very much :)






[jira] [Updated] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35154:
---
Labels: pull-request-available  (was: )

> Javadoc aggregate fails
> ---
>
> Key: FLINK-35154
> URL: https://issues.apache.org/jira/browse/FLINK-35154
> Project: Flink
>  Issue Type: Bug
>Reporter: Dmitriy Linevich
>Priority: Minor
>  Labels: pull-request-available
>
> Javadoc plugin fails with error. Using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> {code:java}
> [WARNING] The requested profile "include-hadoop" could not be activated 
> because it does not exist.
> [WARNING] The requested profile "arm" could not be activated because it does 
> not exist.
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) 
> on project flink-parent: An error has occurred in JavaDocs report generation: 
> [ERROR] Exit code: 1 - 
> /{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
>  error: type GenericEnumSymbol does not take parameters
> [ERROR] public enum EventType implements 
> org.apache.avro.generic.GenericEnumSymbol {
> [ERROR]    {code}
>  
> For our flink 1.17 ERROR
> {code:java}
> [ERROR] 
> /{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
>  error: cannot find symbol
> [ERROR] import static 
> org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
> [ERROR] ^
> [ERROR]   symbol:   static DEFAULT_DATABASE_NAME
> [ERROR]   location: class{code}
>  
> Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
> version exists bug same with 
> [this|https://github.com/checkstyle/checkstyle/issues/291])





[PR] [FLINK-35166] Make the SortBufferAccumulator use more buffers when the parallelism is small [flink]

2024-04-18 Thread via GitHub


jiangxin369 opened a new pull request, #24683:
URL: https://github.com/apache/flink/pull/24683

   
   
   ## What is the purpose of the change
   
   Improve the performance of hybrid shuffle when memory decoupling is enabled and 
the parallelism is small.
   
   
   ## Brief change log
   
 - Make the SortBufferAccumulator use more buffers when the parallelism is 
small
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   





[jira] [Updated] (FLINK-35166) Improve the performance of Hybrid Shuffle when enable memory decoupling

2024-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35166:
---
Labels: pull-request-available  (was: )

> Improve the performance of Hybrid Shuffle when enable memory decoupling
> ---
>
> Key: FLINK-35166
> URL: https://issues.apache.org/jira/browse/FLINK-35166
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, the tiered result partition creates the SortBufferAccumulator with 
> the number of expected buffers set to min(numSubpartitions+1, 512), so the 
> SortBufferAccumulator may obtain very few buffers when the parallelism is 
> small. We can simply make the number of expected buffers 512 by default to 
> achieve better performance when buffers are sufficient.





Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-18 Thread via GitHub


liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2065632965

   @LadyForest Hi, I have updated the PR as you suggested; I also rebased on the 
latest master branch and CI passed. PTAL, thanks :)





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-18 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1571771428


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
 + " operators. NOTE: It takes effect only 
in the BATCH runtime mode and requires sorted inputs"
 + SORT_INPUTS.key()
 + " to be enabled.");
+
+/**
+ * A flag to enable or disable async mode related components when tasks 
initialize. As long as
+ * this option is enabled, the state access of Async state APIs will be 
executed asynchronously.
+ * Otherwise, the state access of Async state APIs will be executed 
synchronously. For Sync
+ * state APIs, the state access is always executed synchronously, enable 
this option would bring
+ * some overhead.
+ *
+ * Note: This is an experimental feature(FLIP-425) under evaluation.
+ */
+@Experimental
+public static final ConfigOption ASYNC_STATE_ENABLED =
+ConfigOptions.key("execution.async-mode.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"A flag to enable or disable async mode related 
components when tasks initialize."
++ " As long as this option is enabled, the 
state access of Async state APIs will be executed asynchronously."
++ " Otherwise, the state access of Async 
state APIs will be executed synchronously."
++ " For Sync state APIs, the state access 
is always executed synchronously, enable this option would bring some 
overhead.\n"
++ " Note: This is an experimental feature 
under evaluation.");
+
+/**
+ * The max limit of in-flight records number in async execution mode, 
'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed 
and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the 
newly records entering
+ * will be blocked until the in-flight records number drops below the 
limit.
+ */
+@Experimental
+public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+.intType()
+.defaultValue(6000)
+.withDescription(
+"The max limit of in-flight records number in 
async execution mode, 'in-flight' refers"
++ " to the records that have entered the 
operator but have not yet been processed and"
++ " emitted to the downstream. If the 
in-flight records number exceeds the limit,"
++ " the newly records entering will be 
blocked until the in-flight records number drops below the limit.");
+
+/**
+ * The size of buffer under async execution mode. Async execution mode 
provides a buffer
+ * mechanism to reduce state access. When the number of state requests in 
the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch 
sizes will bring
+ * higher end-to-end latency, this option works with {@link 
#ASYNC_BUFFER_TIMEOUT} to control
+ * the frequency of triggering.
+ */
+@Experimental
+public static final ConfigOption ASYNC_BUFFER_SIZE =
+ConfigOptions.key("execution.async-mode.buffer-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"The size of buffer under async execution mode. 
Async execution mode provides a buffer mechanism to reduce state access."
++ " When the number of state requests in 
the active buffer exceeds the batch size,"
++ " a batched state execution would be 
triggered. Larger batch sizes will bring higher end-to-end latency,"
++ " this option works with 
'execution.async-state.buffer-timeout' to control the frequency of 
triggering.");
+
+/**
+ * The timeout of buffer triggering in milliseconds. If the buffer has not 
reached the {@link
+ * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger 
will perform actively.
+ */
+@Experimental
+public static final ConfigOption ASYNC_BUFFER_TIMEOUT =
+ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Sry guys, I'm afraid I prefer the original proposal of `async-state` in 
FLIP, since this is only for the stateful operator and even the problem of 
`record-order` is caused by state 

Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]

2024-04-18 Thread via GitHub


ferenc-csaky commented on PR #24303:
URL: https://github.com/apache/flink/pull/24303#issuecomment-2065736703

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]

2024-04-18 Thread via GitHub


Zakelly commented on code in PR #24676:
URL: https://github.com/apache/flink/pull/24676#discussion_r1571777947


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -133,6 +136,20 @@ public final  ThrowingConsumer, 
Exception> getRecordProcessor
 getClass().getName(), inputId));
 }
 
+@Override
+public final OperatorSnapshotFutures snapshotState(
+long checkpointId,
+long timestamp,
+CheckpointOptions checkpointOptions,
+CheckpointStreamFactory factory)
+throws Exception {
+asyncExecutionController.drainInflightRecords(0);

Review Comment:
   Ah... almost forgot. Should we check `isAsyncStateProcessingEnabled` first?
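
   A minimal sketch of the suggested guard, reusing the names from the diff above; the trailing delegation to the superclass is an assumption, not taken from the PR:

       @Override
       public final OperatorSnapshotFutures snapshotState(
               long checkpointId,
               long timestamp,
               CheckpointOptions checkpointOptions,
               CheckpointStreamFactory factory) throws Exception {
           if (isAsyncStateProcessingEnabled()) {
               // Only drain in-flight records when async state processing is actually in use.
               asyncExecutionController.drainInflightRecords(0);
           }
           // Then proceed with the normal snapshot path (assumed to delegate to the superclass).
           return super.snapshotState(checkpointId, timestamp, checkpointOptions, factory);
       }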



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35154] Javadoc generating fix [flink]

2024-04-18 Thread via GitHub


flinkbot commented on PR #24684:
URL: https://github.com/apache/flink/pull/24684#issuecomment-2065793603

   
   ## CI report:
   
   * b2a3ede0b60213c2dc38e5ff050c5f827e1e86bc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34996][Connectors/Kafka] Use UserCodeCL to instantiate Deserializer [flink-connector-kafka]

2024-04-18 Thread via GitHub


hugogu commented on PR #89:
URL: 
https://github.com/apache/flink-connector-kafka/pull/89#issuecomment-2065806444

   Hi @uce and @rmetzger, do you mind reviewing the change and letting me know if any change is required before merging? Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35127] remove HybridSource to avoid CI failure. [flink-cdc]

2024-04-18 Thread via GitHub


lvyanquan commented on PR #3237:
URL: https://github.com/apache/flink-cdc/pull/3237#issuecomment-2065589494

   Looks like HybridSource has a requirement on the parallelism, but some of our tests use the machine's core count as the default parallelism: https://github.com/apache/flink/blob/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java#L55
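
   If the failures come from the default parallelism, one option is to pin the parallelism explicitly in the affected tests instead of inheriting the machine's core count; a minimal sketch (illustrative, not the actual test code):

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       // Fixed value, independent of Runtime.getRuntime().availableProcessors().
       env.setParallelism(4);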


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35161) Implement StateExecutor for ForStStateBackend

2024-04-18 Thread Jinzhong Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinzhong Li updated FLINK-35161:

Component/s: Runtime / State Backends

> Implement StateExecutor for ForStStateBackend
> -
>
> Key: FLINK-35161
> URL: https://issues.apache.org/jira/browse/FLINK-35161
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854
 ] 

Vadim Vararu edited comment on FLINK-35115 at 4/19/24 4:54 AM:
---

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?


was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, rigth?

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the save job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35167) Introduce MaxCompute pipeline DataSink

2024-04-18 Thread zhangdingxin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838836#comment-17838836
 ] 

zhangdingxin commented on FLINK-35167:
--

I'd like to do it

> Introduce MaxCompute pipeline DataSink
> --
>
> Key: FLINK-35167
> URL: https://issues.apache.org/jira/browse/FLINK-35167
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
>  Labels: Flink-CDC, connector
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> By integrating the MaxCompute DataSink, we enable the precise and efficient 
> synchronization of data from Flink's Change Data Capture (CDC) into 
> MaxCompute.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35167) Introduce MaxCompute pipeline DataSink

2024-04-18 Thread zhangdingxin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangdingxin updated FLINK-35167:
-
Labels:   (was: Flink-CDC connector)

> Introduce MaxCompute pipeline DataSink
> --
>
> Key: FLINK-35167
> URL: https://issues.apache.org/jira/browse/FLINK-35167
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: zhangdingxin
>Assignee: zhangdingxin
>Priority: Major
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> By integrating the MaxCompute DataSink, we enable the precise and efficient 
> synchronization of data from Flink's Change Data Capture (CDC) into 
> MaxCompute.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35167) Introduce MaxCompute pipeline DataSink

2024-04-18 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-35167:
-

Assignee: zhangdingxin

> Introduce MaxCompute pipeline DataSink
> --
>
> Key: FLINK-35167
> URL: https://issues.apache.org/jira/browse/FLINK-35167
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: zhangdingxin
>Assignee: zhangdingxin
>Priority: Major
>  Labels: Flink-CDC, connector
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> By integrating the MaxCompute DataSink, we enable the precise and efficient 
> synchronization of data from Flink's Change Data Capture (CDC) into 
> MaxCompute.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35167) Introduce MaxCompute pipeline DataSink

2024-04-18 Thread zhangdingxin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838836#comment-17838836
 ] 

zhangdingxin edited comment on FLINK-35167 at 4/19/24 2:22 AM:
---

I'd like to do it


was (Author: JIRAUSER305142):
I'd like to do it
[|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13575588]

> Introduce MaxCompute pipeline DataSink
> --
>
> Key: FLINK-35167
> URL: https://issues.apache.org/jira/browse/FLINK-35167
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
>  Labels: Flink-CDC, connector
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> By integrating the MaxCompute DataSink, we enable the precise and efficient 
> synchronization of data from Flink's Change Data Capture (CDC) into 
> MaxCompute.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35167) Introduce MaxCompute pipeline DataSink

2024-04-18 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-35167:
--
Fix Version/s: cdc-3.2.0

> Introduce MaxCompute pipeline DataSink
> --
>
> Key: FLINK-35167
> URL: https://issues.apache.org/jira/browse/FLINK-35167
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: zhangdingxin
>Assignee: zhangdingxin
>Priority: Major
> Fix For: cdc-3.2.0
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> By integrating the MaxCompute DataSink, we enable the precise and efficient 
> synchronization of data from Flink's Change Data Capture (CDC) into 
> MaxCompute.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-18 Thread via GitHub


Zakelly commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1571721125


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implementation for ForStDB.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class ForStValueState<K, V> extends InternalValueState<K, V>
+implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
+
+/** The column family which this internal value state belongs to. */
+private final ColumnFamilyHandle columnFamilyHandle;
+
+/** The serialized key builder, which needs to be thread-safe. */
+private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;
+
+/** The data outputStream used for value serializer, which needs to be thread-safe. */
+private final ThreadLocal<DataOutputSerializer> valueSerializerView;
+
+/** The data inputStream used for value deserializer, which needs to be thread-safe. */
+private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
+
+public ForStValueState(
+StateRequestHandler stateRequestHandler,
+ColumnFamilyHandle columnFamily,
+ValueStateDescriptor<V> valueStateDescriptor,
+ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
+ThreadLocal<DataOutputSerializer> valueSerializerView,
+ThreadLocal<DataInputDeserializer> valueDeserializerView) {
+super(stateRequestHandler, valueStateDescriptor);
+this.columnFamilyHandle = columnFamily;
+this.serializedKeyBuilder = serializedKeyBuilder;
+this.valueSerializerView = valueSerializerView;
+this.valueDeserializerView = valueDeserializerView;
+}
+
+@Override
+public ColumnFamilyHandle getColumnFamilyHandle() {
+return columnFamilyHandle;
+}
+
+@Override
+public byte[] serializeKey(ContextKey<K> contextKey) throws IOException {

Review Comment:
   Should `ContextKey` be thread-safe? It seems the 'read cache or serialize' logic should be called only once for each context key.
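
   One way to make that single-call guarantee explicit is to cache the serialized form inside the key object; a minimal, self-contained sketch with hypothetical names (not the actual `ContextKey` API):

       import java.io.IOException;

       public final class CachedContextKey<K> {
           /** Stand-in for the real key serialization logic. */
           public interface KeySerializerFn<K> {
               byte[] serialize(K key) throws IOException;
           }

           private final K rawKey;
           private volatile byte[] serializedKey; // written at most once, then reused

           public CachedContextKey(K rawKey) {
               this.rawKey = rawKey;
           }

           /** Returns the cached bytes, serializing lazily and only once even under concurrent access. */
           public byte[] getOrSerialize(KeySerializerFn<K> serializer) throws IOException {
               byte[] cached = serializedKey;
               if (cached != null) {
                   return cached;
               }
               synchronized (this) {
                   if (serializedKey == null) {
                       serializedKey = serializer.serialize(rawKey);
                   }
                   return serializedKey;
               }
           }
       }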



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMultiGetOperation.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The native multiGet operation implementation for ForStDB.
+ *
+ * @param <K> The type of key in get access request.
+ * @param <V> The type of value in get access request.
+ */
+public class ForStNativeMultiGetOperation implements 
ForStDBOperation> {

Review Comment:
   We can introduce this later; maybe it can be excluded from this PR?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 

[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-18 Thread Vadim Vararu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838853#comment-17838853
 ] 

Vadim Vararu commented on FLINK-35115:
--

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the save job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread Dmitriy Linevich (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838529#comment-17838529
 ] 

Dmitriy Linevich commented on FLINK-35154:
--

[~MartijnVisser] [~trohrmann] 

Hi,

Please assign me to this task.

> Javadoc aggregate fails
> ---
>
> Key: FLINK-35154
> URL: https://issues.apache.org/jira/browse/FLINK-35154
> Project: Flink
>  Issue Type: Bug
>Reporter: Dmitriy Linevich
>Priority: Minor
>
> Javadoc plugin fails with error cannot find symbol. Using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> !image-2024-04-18-15-20-56-467.png!
>  
> Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
> version exists bug same with 
> [this|https://github.com/checkstyle/checkstyle/issues/291])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread Dmitriy Linevich (Jira)
Dmitriy Linevich created FLINK-35154:


 Summary: Javadoc aggregate fails
 Key: FLINK-35154
 URL: https://issues.apache.org/jira/browse/FLINK-35154
 Project: Flink
  Issue Type: Bug
Reporter: Dmitriy Linevich


Javadoc plugin fails with error cannot find symbol. Using
{code:java}
javadoc:aggregate{code}
ERROR:

!image-2024-04-18-15-20-56-467.png!

 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread Dmitriy Linevich (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitriy Linevich updated FLINK-35154:
-
Description: 
Javadoc plugin fails with error "cannot find symbol". Using
{code:java}
javadoc:aggregate{code}
ERROR:

!image-2024-04-18-15-20-56-467.png!

 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])

  was:
Javadoc plugin fails with error cannot find symbol. Using
{code:java}
javadoc:aggregate{code}
ERROR:

!image-2024-04-18-15-20-56-467.png!

 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])


> Javadoc aggregate fails
> ---
>
> Key: FLINK-35154
> URL: https://issues.apache.org/jira/browse/FLINK-35154
> Project: Flink
>  Issue Type: Bug
>Reporter: Dmitriy Linevich
>Priority: Minor
>
> Javadoc plugin fails with error "cannot find symbol". Using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> !image-2024-04-18-15-20-56-467.png!
>  
> Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
> version exists bug same with 
> [this|https://github.com/checkstyle/checkstyle/issues/291])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]

2024-04-18 Thread via GitHub


RocMarshal commented on PR #115:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/115#issuecomment-2063308586

   Hi @snuyanzin @eskabetxe, could you help to review this when you have some free time?
   Thank you~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread Dmitriy Linevich (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitriy Linevich updated FLINK-35154:
-
Description: 
Javadoc plugin fails with error "cannot find symbol". Using
{code:java}
javadoc:aggregate{code}
ERROR:
{code:java}
[WARNING] The requested profile "include-hadoop" could not be activated because 
it does not exist.
[WARNING] The requested profile "arm" could not be activated because it does 
not exist.
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on 
project flink-parent: An error has occurred in JavaDocs report generation: 
[ERROR] Exit code: 1 - 
/{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
 error: type GenericEnumSymbol does not take parameters
[ERROR] public enum EventType implements 
org.apache.avro.generic.GenericEnumSymbol {
[ERROR]    {code}
                                                                       ^

 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])

  was:
Javadoc plugin fails with error "cannot find symbol". Using
{code:java}
javadoc:aggregate{code}
ERROR:

!image-2024-04-18-15-20-56-467.png!

 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])


> Javadoc aggregate fails
> ---
>
> Key: FLINK-35154
> URL: https://issues.apache.org/jira/browse/FLINK-35154
> Project: Flink
>  Issue Type: Bug
>Reporter: Dmitriy Linevich
>Priority: Minor
>
> Javadoc plugin fails with error "cannot find symbol". Using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> {code:java}
> [WARNING] The requested profile "include-hadoop" could not be activated 
> because it does not exist.
> [WARNING] The requested profile "arm" could not be activated because it does 
> not exist.
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) 
> on project flink-parent: An error has occurred in JavaDocs report generation: 
> [ERROR] Exit code: 1 - 
> /{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
>  error: type GenericEnumSymbol does not take parameters
> [ERROR] public enum EventType implements 
> org.apache.avro.generic.GenericEnumSymbol {
> [ERROR]    {code}
>                                                                        ^
>  
> Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
> version exists bug same with 
> [this|https://github.com/checkstyle/checkstyle/issues/291])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35115][Connectors/Kinesis] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]

2024-04-18 Thread via GitHub


dannycranmer commented on code in PR #138:
URL: 
https://github.com/apache/flink-connector-aws/pull/138#discussion_r1570265098


##
flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##
@@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() 
throws Exception {
 }
 }
 
+@Test
+public void testSnapshotStateDuringStopWithSavepoint() throws Exception {
+
+// 
--
+// setup config, initial state and expected state snapshot
+// 
--
+Properties config = TestUtils.getStandardProperties();
+
+ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
+initialState.add(
+        Tuple2.of(
+                KinesisDataFetcher.convertToStreamShardMetadata(
+                        new StreamShardHandle(
+                                "fakeStream1",
+                                new Shard()
+                                        .withShardId(
+                                                KinesisShardIdGenerator
+                                                        .generateFromShardOrder(0)))),
+                new SequenceNumber("11")));
+
+ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot1 =
+        new ArrayList<>(1);
+expectedStateSnapshot1.add(
+        Tuple2.of(
+                KinesisDataFetcher.convertToStreamShardMetadata(
+                        new StreamShardHandle(
+                                "fakeStream1",
+                                new Shard()
+                                        .withShardId(
+                                                KinesisShardIdGenerator
+                                                        .generateFromShardOrder(0)))),
+                new SequenceNumber("12")));
+ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot2 =
+        new ArrayList<>(1);
+expectedStateSnapshot2.add(
+        Tuple2.of(
+                KinesisDataFetcher.convertToStreamShardMetadata(
+                        new StreamShardHandle(
+                                "fakeStream1",
+                                new Shard()
+                                        .withShardId(
+                                                KinesisShardIdGenerator
+                                                        .generateFromShardOrder(0)))),
+                new SequenceNumber("13")));
+
+// 
--
+// mock operator state backend and initial state for initializeState()
+// 
--
+
+TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState =
+        new TestingListState<>();
+for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+listState.add(state);
+}
+
+OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);

Review Comment:
   Since Mockito is already used in this class, and this connector is on the deprecation path, I am ok to use it. However, the test is very long and it is hard to know what is going on. Can we break it out into methods and reduce the complexity of the test case for readability?
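
   For example, the repeated shard-state construction could be pulled into a small helper so each case reads as one line; a sketch reusing the classes already referenced in the test (the helper name is made up):

       private static Tuple2<StreamShardMetadata, SequenceNumber> shardState(
               String streamName, int shardOrder, String sequenceNumber) {
           return Tuple2.of(
                   KinesisDataFetcher.convertToStreamShardMetadata(
                           new StreamShardHandle(
                                   streamName,
                                   new Shard()
                                           .withShardId(
                                                   KinesisShardIdGenerator.generateFromShardOrder(shardOrder)))),
                   new SequenceNumber(sequenceNumber));
       }

       // Usage:
       //   initialState.add(shardState("fakeStream1", 0, "11"));
       //   expectedStateSnapshot1.add(shardState("fakeStream1", 0, "12"));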
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35124) Connector Release Fails to run Checkstyle

2024-04-18 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-35124:
-

Assignee: Danny Cranmer

> Connector Release Fails to run Checkstyle
> -
>
> Key: FLINK-35124
> URL: https://issues.apache.org/jira/browse/FLINK-35124
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>
> During a release of the AWS connectors the build was failing at the 
> \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error.
>  
> {code:java}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on 
> project flink-connector-aws: Failed during checkstyle execution: Unable to 
> find suppressions file at location: /tools/maven/suppressions.xml: Could not 
> find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code}
>  
> Looks like it is caused by this 
> [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-18 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838534#comment-17838534
 ] 

elon_X commented on FLINK-35088:


[~martijnvisser] [~masc]  I'm sorry for the late reply. I have conducted a 
retest based on Flink version 1.18 and found that the problem still persists. 
Then, I checked the latest code on the main Flink branch and found that there 
is no validation for these two parameters. What are your thoughts on this?
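
A minimal sketch of the kind of client-side validation being proposed here (names and placement are illustrative, not the actual Flink API):
{code:java}
import java.time.Duration;

final class WatermarkAlignmentParamsValidator {
    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        if (maxAllowedWatermarkDrift.isNegative()) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must be >= 0, but was " + maxAllowedWatermarkDrift);
        }
        if (updateInterval.isZero() || updateInterval.isNegative()) {
            throw new IllegalArgumentException(
                    "updateInterval must be > 0, but was " + updateInterval);
        }
    }
}
{code}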

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> 
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment,
> 1.I found that setting maxAllowedWatermarkDrift to a negative number 
> initially led me to believe it could support delaying the consumption of the 
> source, so I tried it. Then, the upstream data flow would hang indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark 
> < lastEmittedWatermark, then the SourceReader will be blocked indefinitely 
> and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's 
> not, I think an additional parameter validation should be implemented to 
> throw an exception on the client side if the value is negative.
> 2.The updateInterval parameter also lacks validation. If I set it to 0, the 
> task will throw an exception when starting the job manager. The JDK class 
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>  

[jira] [Commented] (FLINK-35124) Connector Release Fails to run Checkstyle

2024-04-18 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838537#comment-17838537
 ] 

Danny Cranmer commented on FLINK-35124:
---

This resulted in bad source archives being generated for JDBC and MongoDB 
connectors. I am going to revert this change for now:
 * [https://lists.apache.org/thread/q6dmc5dbz7kcfvpo99pj2sh5mzhffgl5]
 * [https://lists.apache.org/thread/s18g7obgp4sbdtl73571976vqvy1ftk8]

> Connector Release Fails to run Checkstyle
> -
>
> Key: FLINK-35124
> URL: https://issues.apache.org/jira/browse/FLINK-35124
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>
> During a release of the AWS connectors the build was failing at the 
> \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error.
>  
> {code:java}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on 
> project flink-connector-aws: Failed during checkstyle execution: Unable to 
> find suppressions file at location: /tools/maven/suppressions.xml: Could not 
> find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code}
>  
> Looks like it is caused by this 
> [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35124) Connector Release Fails to run Checkstyle

2024-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35124:
---
Labels: pull-request-available  (was: )

> Connector Release Fails to run Checkstyle
> -
>
> Key: FLINK-35124
> URL: https://issues.apache.org/jira/browse/FLINK-35124
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
>
> During a release of the AWS connectors the build was failing at the 
> \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error.
>  
> {code:java}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on 
> project flink-connector-aws: Failed during checkstyle execution: Unable to 
> find suppressions file at location: /tools/maven/suppressions.xml: Could not 
> find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code}
>  
> Looks like it is caused by this 
> [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35124] Include Maven build configuration in the pristine source clone [flink-connector-shared-utils]

2024-04-18 Thread via GitHub


dannycranmer opened a new pull request, #40:
URL: https://github.com/apache/flink-connector-shared-utils/pull/40

   Reverting 
https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a
 to ensure Maven build configuration is included in the source archive 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-04-18 Thread Ryan Skraba (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Skraba updated FLINK-35002:

Summary: GitHub action request timeout  to ArtifactService  (was: GitHub 
action/upload-artifact@v4 can timeout)

> GitHub action request timeout  to ArtifactService
> -
>
> Key: FLINK-35002
> URL: https://issues.apache.org/jira/browse/FLINK-35002
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: github-actions, test-stability
>
> A timeout can occur when uploading a successfully built artifact:
>  * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]
> {code:java}
> 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
> uploaded
> 2024-04-02T02:20:15.6360133Z Artifact name is valid!
> 2024-04-02T02:20:15.6362872Z Root directory input is valid!
> 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 3000 ms...
> 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 4785 ms...
> 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 7375 ms...
> 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 14988 ms...
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to 
> make request after 5 attempts: Request timeout: 
> /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact
> 2024-04-02T02:22:59.9893296Z Post job cleanup.
> 2024-04-02T02:22:59.9958844Z Post job cleanup. {code}
> (This is unlikely to be something we can fix, but we can track it.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread Dmitriy Linevich (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitriy Linevich updated FLINK-35154:
-
Description: 
Javadoc plugin fails with error. Using
{code:java}
javadoc:aggregate{code}
ERROR:
{code:java}
[WARNING] The requested profile "include-hadoop" could not be activated because 
it does not exist.
[WARNING] The requested profile "arm" could not be activated because it does 
not exist.
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on 
project flink-parent: An error has occurred in JavaDocs report generation: 
[ERROR] Exit code: 1 - 
/{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
 error: type GenericEnumSymbol does not take parameters
[ERROR] public enum EventType implements 
org.apache.avro.generic.GenericEnumSymbol {
[ERROR]    {code}
 

For our flink 1.17 ERROR
{code:java}
[ERROR] 
/{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
 error: cannot find symbol
[ERROR] import static 
org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
[ERROR] ^
[ERROR]   symbol:   static DEFAULT_DATABASE_NAME
[ERROR]   location: class{code}
 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])

  was:
Javadoc plugin fails with error. Using
{code:java}
javadoc:aggregate{code}
ERROR:
{code:java}
[WARNING] The requested profile "include-hadoop" could not be activated because 
it does not exist.
[WARNING] The requested profile "arm" could not be activated because it does 
not exist.
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on 
project flink-parent: An error has occurred in JavaDocs report generation: 
[ERROR] Exit code: 1 - 
/{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
 error: type GenericEnumSymbol does not take parameters
[ERROR] public enum EventType implements 
org.apache.avro.generic.GenericEnumSymbol {
[ERROR]    {code}
 

For our flink 1.17 ERROR

 
{code:java}
[ERROR] 
/{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
 error: cannot find symbol
[ERROR] import static 
org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
[ERROR] ^
[ERROR]   symbol:   static DEFAULT_DATABASE_NAME
[ERROR]   location: class{code}
 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])


> Javadoc aggregate fails
> ---
>
> Key: FLINK-35154
> URL: https://issues.apache.org/jira/browse/FLINK-35154
> Project: Flink
>  Issue Type: Bug
>Reporter: Dmitriy Linevich
>Priority: Minor
>
> Javadoc plugin fails with error. Using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> {code:java}
> [WARNING] The requested profile "include-hadoop" could not be activated 
> because it does not exist.
> [WARNING] The requested profile "arm" could not be activated because it does 
> not exist.
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) 
> on project flink-parent: An error has occurred in JavaDocs report generation: 
> [ERROR] Exit code: 1 - 
> /{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
>  error: type GenericEnumSymbol does not take parameters
> [ERROR] public enum EventType implements 
> org.apache.avro.generic.GenericEnumSymbol {
> [ERROR]    {code}
>  
> For our flink 1.17 ERROR
> {code:java}
> [ERROR] 
> /{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
>  error: cannot find symbol
> [ERROR] import static 
> org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
> [ERROR] ^
> [ERROR]   symbol:   static DEFAULT_DATABASE_NAME
> [ERROR]   location: class{code}
>  
> Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
> version exists bug same with 
> [this|https://github.com/checkstyle/checkstyle/issues/291])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-04-18 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838541#comment-17838541
 ] 

Ryan Skraba edited comment on FLINK-35002 at 4/18/24 8:55 AM:
--

I changed the title; requests to the artifact service can timeout during other 
stages of the build than just uploading, and it seems like the same network 
issue.

1.19 Java 17 / E2E (group 2) 
https://github.com/apache/flink/commit/a2c3d27f5dced2ba73307e8230cd07a11b26c401/checks/23956874905/logs
 

During **Download build artifacts from compile job**:

{code:java}
2024-04-18T02:20:57.1951531Z ##[group]Run actions/download-artifact@v4
2024-04-18T02:20:57.1952046Z with:
2024-04-18T02:20:57.1952529Z   name: build-artifacts-nightly-beta-java17-229
2024-04-18T02:20:57.1953033Z   path: /home/runner/work/flink/flink
2024-04-18T02:20:57.1953552Z   merge-multiple: false
2024-04-18T02:20:57.1953902Z   repository: apache/flink
2024-04-18T02:20:57.1954306Z   run-id: 8731358696
2024-04-18T02:20:57.1954704Z env:
2024-04-18T02:20:57.1955038Z   MOUNTED_WORKING_DIR: /__w/flink/flink
2024-04-18T02:20:57.1955517Z   CONTAINER_LOCAL_WORKING_DIR: /root/flink
2024-04-18T02:20:57.1956053Z   FLINK_ARTIFACT_DIR: /home/runner/work/flink/flink
2024-04-18T02:20:57.1956590Z   FLINK_ARTIFACT_FILENAME: flink_artifacts.tar.gz
2024-04-18T02:20:57.1957184Z   MAVEN_REPO_FOLDER: 
/home/runner/work/flink/flink/.m2/repository
2024-04-18T02:20:57.1957943Z   MAVEN_ARGS: 
-Dmaven.repo.local=/home/runner/work/flink/flink/.m2/repository
2024-04-18T02:20:57.1958748Z   DOCKER_IMAGES_CACHE_FOLDER: 
/home/runner/work/flink/flink/.docker-cache
2024-04-18T02:20:57.1959387Z   GHA_JOB_TIMEOUT: 310
2024-04-18T02:20:57.1959867Z   E2E_CACHE_FOLDER: 
/home/runner/work/flink/flink/.e2e-cache
2024-04-18T02:20:57.1960480Z   E2E_TARBALL_CACHE: 
/home/runner/work/flink/flink/.e2e-tar-cache
2024-04-18T02:20:57.1961116Z   GHA_PIPELINE_START_TIME: 2024-04-18 
02:19:06+00:00
2024-04-18T02:20:57.1961649Z   JAVA_HOME: /usr/lib/jvm/temurin-17-jdk-amd64
2024-04-18T02:20:57.1963231Z   PATH: 
/usr/lib/jvm/temurin-17-jdk-amd64/bin:/snap/bin:/home/runner/.local/bin:/opt/pipx_bin:/home/runner/.cargo/bin:/home/runner/.config/composer/vendor/bin:/usr/local/.ghcup/bin:/home/runner/.dotnet/tools:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin
2024-04-18T02:20:57.1964849Z ##[endgroup]
2024-04-18T02:20:57.3842499Z Downloading single artifact
2024-04-18T02:21:02.4187408Z Attempt 1 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 3000 ms...
2024-04-18T02:21:10.4281352Z Attempt 2 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 4605 ms...
2024-04-18T02:21:20.0388024Z Attempt 3 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 8717 ms...
2024-04-18T02:21:33.7715121Z Attempt 4 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 12219 ms...
2024-04-18T02:21:51.0125881Z ##[error]Unable to download artifact(s): Failed to 
ListArtifacts: Failed to make request after 5 attempts: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts
{code}
 


was (Author: ryanskraba):
I changed the title; requests to the artifact service can timeout during other 
stages of the build than just uploading, and it seems like the same network 
issue.

During **Download build artifacts from compile job**:

{code:java}
2024-04-18T02:20:57.1951531Z ##[group]Run actions/download-artifact@v4
2024-04-18T02:20:57.1952046Z with:
2024-04-18T02:20:57.1952529Z   name: build-artifacts-nightly-beta-java17-229
2024-04-18T02:20:57.1953033Z   path: /home/runner/work/flink/flink
2024-04-18T02:20:57.1953552Z   merge-multiple: false
2024-04-18T02:20:57.1953902Z   repository: apache/flink
2024-04-18T02:20:57.1954306Z   run-id: 8731358696
2024-04-18T02:20:57.1954704Z env:
2024-04-18T02:20:57.1955038Z   MOUNTED_WORKING_DIR: /__w/flink/flink
2024-04-18T02:20:57.1955517Z   CONTAINER_LOCAL_WORKING_DIR: /root/flink
2024-04-18T02:20:57.1956053Z   FLINK_ARTIFACT_DIR: /home/runner/work/flink/flink
2024-04-18T02:20:57.1956590Z   FLINK_ARTIFACT_FILENAME: flink_artifacts.tar.gz
2024-04-18T02:20:57.1957184Z   MAVEN_REPO_FOLDER: 
/home/runner/work/flink/flink/.m2/repository
2024-04-18T02:20:57.1957943Z   MAVEN_ARGS: 
-Dmaven.repo.local=/home/runner/work/flink/flink/.m2/repository
2024-04-18T02:20:57.1958748Z   DOCKER_IMAGES_CACHE_FOLDER: 
/home/runner/work/flink/flink/.docker-cache
2024-04-18T02:20:57.1959387Z   GHA_JOB_TIMEOUT: 310
2024-04-18T02:20:57.1959867Z   E2E_CACHE_FOLDER: 
/home/runner/work/flink/flink/.e2e-cache
2024-04-18T02:20:57.1960480Z   

[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-04-18 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838541#comment-17838541
 ] 

Ryan Skraba commented on FLINK-35002:
-

I changed the title; requests to the artifact service can timeout during other 
stages of the build than just uploading, and it seems like the same network 
issue.

During **Download build artifacts from compile job**:

{code:java}
2024-04-18T02:20:57.1951531Z ##[group]Run actions/download-artifact@v4
2024-04-18T02:20:57.1952046Z with:
2024-04-18T02:20:57.1952529Z   name: build-artifacts-nightly-beta-java17-229
2024-04-18T02:20:57.1953033Z   path: /home/runner/work/flink/flink
2024-04-18T02:20:57.1953552Z   merge-multiple: false
2024-04-18T02:20:57.1953902Z   repository: apache/flink
2024-04-18T02:20:57.1954306Z   run-id: 8731358696
2024-04-18T02:20:57.1954704Z env:
2024-04-18T02:20:57.1955038Z   MOUNTED_WORKING_DIR: /__w/flink/flink
2024-04-18T02:20:57.1955517Z   CONTAINER_LOCAL_WORKING_DIR: /root/flink
2024-04-18T02:20:57.1956053Z   FLINK_ARTIFACT_DIR: /home/runner/work/flink/flink
2024-04-18T02:20:57.1956590Z   FLINK_ARTIFACT_FILENAME: flink_artifacts.tar.gz
2024-04-18T02:20:57.1957184Z   MAVEN_REPO_FOLDER: 
/home/runner/work/flink/flink/.m2/repository
2024-04-18T02:20:57.1957943Z   MAVEN_ARGS: 
-Dmaven.repo.local=/home/runner/work/flink/flink/.m2/repository
2024-04-18T02:20:57.1958748Z   DOCKER_IMAGES_CACHE_FOLDER: 
/home/runner/work/flink/flink/.docker-cache
2024-04-18T02:20:57.1959387Z   GHA_JOB_TIMEOUT: 310
2024-04-18T02:20:57.1959867Z   E2E_CACHE_FOLDER: 
/home/runner/work/flink/flink/.e2e-cache
2024-04-18T02:20:57.1960480Z   E2E_TARBALL_CACHE: 
/home/runner/work/flink/flink/.e2e-tar-cache
2024-04-18T02:20:57.1961116Z   GHA_PIPELINE_START_TIME: 2024-04-18 
02:19:06+00:00
2024-04-18T02:20:57.1961649Z   JAVA_HOME: /usr/lib/jvm/temurin-17-jdk-amd64
2024-04-18T02:20:57.1963231Z   PATH: 
/usr/lib/jvm/temurin-17-jdk-amd64/bin:/snap/bin:/home/runner/.local/bin:/opt/pipx_bin:/home/runner/.cargo/bin:/home/runner/.config/composer/vendor/bin:/usr/local/.ghcup/bin:/home/runner/.dotnet/tools:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin
2024-04-18T02:20:57.1964849Z ##[endgroup]
2024-04-18T02:20:57.3842499Z Downloading single artifact
2024-04-18T02:21:02.4187408Z Attempt 1 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 3000 ms...
2024-04-18T02:21:10.4281352Z Attempt 2 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 4605 ms...
2024-04-18T02:21:20.0388024Z Attempt 3 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 8717 ms...
2024-04-18T02:21:33.7715121Z Attempt 4 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts. Retrying 
request in 12219 ms...
2024-04-18T02:21:51.0125881Z ##[error]Unable to download artifact(s): Failed to 
ListArtifacts: Failed to make request after 5 attempts: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts
{code}
 

> GitHub action request timeout  to ArtifactService
> -
>
> Key: FLINK-35002
> URL: https://issues.apache.org/jira/browse/FLINK-35002
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: github-actions, test-stability
>
> A timeout can occur when uploading a successfully built artifact:
>  * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]
> {code:java}
> 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
> uploaded
> 2024-04-02T02:20:15.6360133Z Artifact name is valid!
> 2024-04-02T02:20:15.6362872Z Root directory input is valid!
> 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 3000 ms...
> 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 4785 ms...
> 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 7375 ms...
> 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 14988 ms...
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to 
> make request after 5 attempts: Request timeout: 
> 

[jira] [Updated] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread Dmitriy Linevich (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitriy Linevich updated FLINK-35154:
-
Description: 
Javadoc plugin fails with error "cannot find symbol". Using
{code:java}
javadoc:aggregate{code}
ERROR:
{code:java}
[WARNING] The requested profile "include-hadoop" could not be activated because 
it does not exist.
[WARNING] The requested profile "arm" could not be activated because it does 
not exist.
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on 
project flink-parent: An error has occurred in JavaDocs report generation: 
[ERROR] Exit code: 1 - 
/{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
 error: type GenericEnumSymbol does not take parameters
[ERROR] public enum EventType implements 
org.apache.avro.generic.GenericEnumSymbol {
[ERROR]    {code}
 

For our Flink 1.17, the error is:

 
{code:java}
[ERROR] 
/{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
 error: cannot find symbol
[ERROR] import static 
org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
[ERROR] ^
[ERROR]   symbol:   static DEFAULT_DATABASE_NAME
[ERROR]   location: class{code}
 

We need to upgrade the javadoc plugin version from 2.9.1 to 2.10.4 (the current version has a bug similar to [this|https://github.com/checkstyle/checkstyle/issues/291]).

  was:
Javadoc plugin fails with error "cannot find symbol". Using
{code:java}
javadoc:aggregate{code}
ERROR:
{code:java}
[WARNING] The requested profile "include-hadoop" could not be activated because 
it does not exist.
[WARNING] The requested profile "arm" could not be activated because it does 
not exist.
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on 
project flink-parent: An error has occurred in JavaDocs report generation: 
[ERROR] Exit code: 1 - 
/{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
 error: type GenericEnumSymbol does not take parameters
[ERROR] public enum EventType implements 
org.apache.avro.generic.GenericEnumSymbol {
[ERROR]    {code}
                                                                       ^

 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])


> Javadoc aggregate fails
> ---
>
> Key: FLINK-35154
> URL: https://issues.apache.org/jira/browse/FLINK-35154
> Project: Flink
>  Issue Type: Bug
>Reporter: Dmitriy Linevich
>Priority: Minor
>
> Javadoc plugin fails with error "cannot find symbol". Using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> {code:java}
> [WARNING] The requested profile "include-hadoop" could not be activated 
> because it does not exist.
> [WARNING] The requested profile "arm" could not be activated because it does 
> not exist.
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) 
> on project flink-parent: An error has occurred in JavaDocs report generation: 
> [ERROR] Exit code: 1 - 
> /{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
>  error: type GenericEnumSymbol does not take parameters
> [ERROR] public enum EventType implements 
> org.apache.avro.generic.GenericEnumSymbol {
> [ERROR]    {code}
>  
> For our Flink 1.17, the error is:
>  
> {code:java}
> [ERROR] 
> /{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
>  error: cannot find symbol
> [ERROR] import static 
> org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
> [ERROR] ^
> [ERROR]   symbol:   static DEFAULT_DATABASE_NAME
> [ERROR]   location: class{code}
>  
> We need to upgrade the javadoc plugin version from 2.9.1 to 2.10.4 (the current version has a bug similar to [this|https://github.com/checkstyle/checkstyle/issues/291]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35154) Javadoc aggregate fails

2024-04-18 Thread Dmitriy Linevich (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitriy Linevich updated FLINK-35154:
-
Description: 
Javadoc plugin fails with error. Using
{code:java}
javadoc:aggregate{code}
ERROR:
{code:java}
[WARNING] The requested profile "include-hadoop" could not be activated because 
it does not exist.
[WARNING] The requested profile "arm" could not be activated because it does 
not exist.
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on 
project flink-parent: An error has occurred in JavaDocs report generation: 
[ERROR] Exit code: 1 - 
/{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
 error: type GenericEnumSymbol does not take parameters
[ERROR] public enum EventType implements 
org.apache.avro.generic.GenericEnumSymbol {
[ERROR]    {code}
 

For our Flink 1.17, the error is:

 
{code:java}
[ERROR] 
/{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
 error: cannot find symbol
[ERROR] import static 
org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
[ERROR] ^
[ERROR]   symbol:   static DEFAULT_DATABASE_NAME
[ERROR]   location: class{code}
 

We need to upgrade the javadoc plugin version from 2.9.1 to 2.10.4 (the current version has a bug similar to [this|https://github.com/checkstyle/checkstyle/issues/291]).

  was:
Javadoc plugin fails with error "cannot find symbol". Using
{code:java}
javadoc:aggregate{code}
ERROR:
{code:java}
[WARNING] The requested profile "include-hadoop" could not be activated because 
it does not exist.
[WARNING] The requested profile "arm" could not be activated because it does 
not exist.
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on 
project flink-parent: An error has occurred in JavaDocs report generation: 
[ERROR] Exit code: 1 - 
/{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
 error: type GenericEnumSymbol does not take parameters
[ERROR] public enum EventType implements 
org.apache.avro.generic.GenericEnumSymbol {
[ERROR]    {code}
 

For our flink 1.17 ERROR

 
{code:java}
[ERROR] 
/{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
 error: cannot find symbol
[ERROR] import static 
org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
[ERROR] ^
[ERROR]   symbol:   static DEFAULT_DATABASE_NAME
[ERROR]   location: class{code}
 

Need to increase version of javadoc plugin from 2.9.1 to 2.10.4 (In current 
version exists bug same with 
[this|https://github.com/checkstyle/checkstyle/issues/291])


> Javadoc aggregate fails
> ---
>
> Key: FLINK-35154
> URL: https://issues.apache.org/jira/browse/FLINK-35154
> Project: Flink
>  Issue Type: Bug
>Reporter: Dmitriy Linevich
>Priority: Minor
>
> Javadoc plugin fails with error. Using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> {code:java}
> [WARNING] The requested profile "include-hadoop" could not be activated 
> because it does not exist.
> [WARNING] The requested profile "arm" could not be activated because it does 
> not exist.
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) 
> on project flink-parent: An error has occurred in JavaDocs report generation: 
> [ERROR] Exit code: 1 - 
> /{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8:
>  error: type GenericEnumSymbol does not take parameters
> [ERROR] public enum EventType implements 
> org.apache.avro.generic.GenericEnumSymbol {
> [ERROR]    {code}
>  
> For our Flink 1.17, the error is:
>  
> {code:java}
> [ERROR] 
> /{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21:
>  error: cannot find symbol
> [ERROR] import static 
> org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
> [ERROR] ^
> [ERROR]   symbol:   static DEFAULT_DATABASE_NAME
> [ERROR]   location: class{code}
>  
> We need to upgrade the javadoc plugin version from 2.9.1 to 2.10.4 (the current version has a bug similar to [this|https://github.com/checkstyle/checkstyle/issues/291]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-18 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838543#comment-17838543
 ] 

Martijn Visser commented on FLINK-35088:


[~fanrui] What are your thoughts on this?

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> 
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment,
> 1.I found that setting maxAllowedWatermarkDrift to a negative number 
> initially led me to believe it could support delaying the consumption of the 
> source, so I tried it. Then, the upstream data flow would hang indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark 
> < lastEmittedWatermark, then the SourceReader will be blocked indefinitely 
> and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's 
> not, I think an additional parameter validation should be implemented to 
> throw an exception on the client side if the value is negative.
> 2.The updateInterval parameter also lacks validation. If I set it to 0, the 
> task will throw an exception when starting the job manager. The JDK class 
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> 

[jira] [Updated] (FLINK-35153) Internal Async State Implementation and StateDescriptor for Map/List State

2024-04-18 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-35153:

Parent: FLINK-34974
Issue Type: Sub-task  (was: Bug)

> Internal Async State Implementation and StateDescriptor for Map/List State
> --
>
> Key: FLINK-35153
> URL: https://issues.apache.org/jira/browse/FLINK-35153
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35153) Internal Async State Implementation and StateDescriptor for Map/List State

2024-04-18 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35153:
---

 Summary: Internal Async State Implementation and StateDescriptor 
for Map/List State
 Key: FLINK-35153
 URL: https://issues.apache.org/jira/browse/FLINK-35153
 Project: Flink
  Issue Type: Bug
Reporter: Zakelly Lan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-18 Thread via GitHub


fredia commented on PR #24672:
URL: https://github.com/apache/flink/pull/24672#issuecomment-2063204411

   @Zakelly @yunfengzhou-hub Thanks for the detailed review. I have updated the PR 
and addressed some of the comments; would you please take a look again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-18 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838499#comment-17838499
 ] 

Leonard Xu commented on FLINK-35151:


Thanks [~pacinogong] for the report. [~ruanhang1993], would you like to take a 
look at this issue?

> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  
> You can set StatefulTaskContext#queue to be 1 and run UT 
> NewlyAddedTableITCase#testRemoveAndAddNewTable.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line

2024-04-18 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-35097:


Assignee: Kumar Mallikarjuna

> Table API Filesystem connector with 'raw' format repeats last line
> --
>
> Key: FLINK-35097
> URL: https://issues.apache.org/jira/browse/FLINK-35097
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.17.1
> Environment: I ran the above test with 1.17.1. I checked for existing 
> bug tickets and release notes, but did not find anything, so assuming this 
> effects 1.18 and 1.19.
>Reporter: David Perkins
>Assignee: Kumar Mallikarjuna
>Priority: Major
>  Labels: pull-request-available
>
> When using the Filesystem connector with 'raw' format to read text data that 
> contains new lines, a row is returned for every line, but always contains the 
> contents of the last line.
> For example, with the following file.
> {quote}
> line 1
> line 2
> line 3
> {quote}
> And table definition
> {quote}
> create TABLE MyRawTable (
>  `doc` string,
>  ) WITH (
>   'path' = 'file:///path/to/data',
>   'format' = 'raw',
>'connector' = 'filesystem'
> );
> {quote}
> Selecting `*` from the table produces three rows all with "line 3" for `doc`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35097][table] Fix 'raw' format deserialization [flink]

2024-04-18 Thread via GitHub


twalthr commented on code in PR #24661:
URL: https://github.com/apache/flink/pull/24661#discussion_r1570166387


##
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java:
##
@@ -197,12 +247,12 @@ public static TestSpec type(DataType fieldType) {
 return new TestSpec(fieldType);
 }
 
-public TestSpec value(Object value) {
-this.value = value;
+public TestSpec values(Object[] values) {

Review Comment:
   Make this a var arg to avoid the need for `new X[]{}` in the test specs; this 
will improve code readability.
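   For illustration, a minimal sketch of the var-arg variants (class and field names below are placeholders, not taken from the PR):
   
```java
// Sketch: with var-args, callers can write .values(1, 2, 3) and .binary(b1, b2)
// instead of .values(new Object[]{1, 2, 3}) and .binary(new byte[][]{b1, b2}).
public class TestSpecSketch {
    private Object[] values;
    private byte[][] binary;

    public TestSpecSketch values(Object... values) {
        this.values = values;
        return this;
    }

    public TestSpecSketch binary(byte[]... binary) {
        this.binary = binary;
        return this;
    }
}
```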



##
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java:
##
@@ -197,12 +247,12 @@ public static TestSpec type(DataType fieldType) {
 return new TestSpec(fieldType);
 }
 
-public TestSpec value(Object value) {
-this.value = value;
+public TestSpec values(Object[] values) {
+this.values = values;
 return this;
 }
 
-public TestSpec binary(byte[] bytes) {
+public TestSpec binary(byte[][] bytes) {

Review Comment:
   same as above, make this a var arg



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35153) Internal Async State Implementation and StateDescriptor for Map/List State

2024-04-18 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-35153:

Component/s: Runtime / State Backends

> Internal Async State Implementation and StateDescriptor for Map/List State
> --
>
> Key: FLINK-35153
> URL: https://issues.apache.org/jira/browse/FLINK-35153
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]

2024-04-18 Thread via GitHub


gaborgsomogyi commented on code in PR #789:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1570203474


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##
@@ -129,8 +133,10 @@ public boolean scaleResource(
 scalingSummaries,
 autoScalerEventHandler);
 
-if (scalingWouldExceedClusterResources(
-configOverrides.newConfigWithOverrides(conf),
+var memoryTuningEnabled = 
conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+if (scalingWouldExceedMaxResources(
+memoryTuningEnabled ? 
configOverrides.newConfigWithOverrides(conf) : conf,

Review Comment:
   I had forgotten it, but I've added it now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35115][Connectors/Kinesis] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]

2024-04-18 Thread via GitHub


vahmed-hamdy commented on code in PR #138:
URL: 
https://github.com/apache/flink-connector-aws/pull/138#discussion_r1570201177


##
flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##
@@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() 
throws Exception {
 }
 }
 
+@Test
+public void testSnapshotStateDuringStopWithSavepoint() throws Exception {
+
+// 
--
+// setup config, initial state and expected state snapshot
+// 
--
+Properties config = TestUtils.getStandardProperties();
+
+ArrayList> initialState = 
new ArrayList<>(1);
+initialState.add(
+Tuple2.of(
+KinesisDataFetcher.convertToStreamShardMetadata(
+new StreamShardHandle(
+"fakeStream1",
+new Shard()
+.withShardId(
+KinesisShardIdGenerator
+
.generateFromShardOrder(0,
+new SequenceNumber("11")));
+
+ArrayList> 
expectedStateSnapshot1 =
+new ArrayList<>(1);
+expectedStateSnapshot1.add(
+Tuple2.of(
+KinesisDataFetcher.convertToStreamShardMetadata(
+new StreamShardHandle(
+"fakeStream1",
+new Shard()
+.withShardId(
+KinesisShardIdGenerator
+
.generateFromShardOrder(0,
+new SequenceNumber("12")));
+ArrayList> 
expectedStateSnapshot2 =
+new ArrayList<>(1);
+expectedStateSnapshot2.add(
+Tuple2.of(
+KinesisDataFetcher.convertToStreamShardMetadata(
+new StreamShardHandle(
+"fakeStream1",
+new Shard()
+.withShardId(
+KinesisShardIdGenerator
+
.generateFromShardOrder(0,
+new SequenceNumber("13")));
+
+// 
--
+// mock operator state backend and initial state for initializeState()
+// 
--
+
+TestingListState> 
listState =
+new TestingListState<>();
+for (Tuple2 state : initialState) 
{
+listState.add(state);
+}
+
+OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);

Review Comment:
   It is against [coding standards to use 
mockito](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations).
 I'm aware that the standard is already broken in this test suite, but I believe we 
shouldn't add more debt.
   Can we try using an [existing test util 
instead](https://github.com/apache/flink/blob/43a3d50ce3982b9abf04b81407fed46c5c25f819/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java#L34)?
   
   Also, we could extend the existing `TestableFlinkKinesisConsumer` hierarchy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-04-18 Thread via GitHub


mas-chen commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1570041925


##
docs/static/generated/rest_v1_dispatcher.yml:
##
@@ -1089,6 +1089,37 @@ paths:
 application/json:
   schema:
 $ref: '#/components/schemas/JobVertexBackPressureInfo'
+  /jobs/{jobid}/vertices/{vertexid}/coordinator-metrics:
+get:
+  description: Provides access to job manager operator metrics

Review Comment:
   Yes, introducing the operator id would make it difficult for the UI to 
integrate since metrics APIs rely on the vertexid at the lowest level of 
granularity.
   
   >When focusing on the coordinator's metrics for autoscaling purposes, how 
will API users distinguish these from other metrics retrieved from 
/jm-operator-metrics? Can be sure that the metrics of interest are always 
uniquely identified by their names, preventing any overlap with those emitted 
by other operators?
   
   Ah yes, thanks for the suggestion! It would be possible if we formatted the 
metric by `.`, which we already do in the PR! I 
think we are aligned so I'll go ahead and make the changes
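   Purely as an illustration of that naming (assuming the key is built as operator name + "." + metric name; the handler code in the PR is authoritative):
   
```java
// Sketch: flatten a coordinator metric into a single "<operatorName>.<metricName>" key,
// so metrics of different operators chained into the same vertex cannot collide.
public final class JmOperatorMetricKeySketch {
    static String key(String operatorName, String metricName) {
        return operatorName + "." + metricName;
    }
    // e.g. key("MySourceCoordinator", "numEventsIn") -> "MySourceCoordinator.numEventsIn"
}
```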



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-18 Thread via GitHub


fredia commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1570075307


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -117,6 +120,7 @@ public class InternalTimerServiceImpl implements 
InternalTimerService {
 startIdx = Math.min(keyGroupIdx, startIdx);
 }
 this.localKeyGroupRangeStartIdx = startIdx;
+this.processingTimeCallback = this::onProcessingTime;

Review Comment:
   Good suggestion, override `onProcessingTime` and delete 
`processingTimeCallback` now. 
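   For other readers, a heavily simplified sketch of the resulting shape (the real classes in the PR have more parameters and state):
   
```java
// Sketch: the base service owns the processing-time hook; the async variant only
// overrides it, so no separate processingTimeCallback field is needed.
class TimerServiceSketch {
    protected void onProcessingTime(long time) throws Exception {
        // poll processing-time timers <= time and fire them synchronously
    }
}

class AsyncTimerServiceSketch extends TimerServiceSketch {
    @Override
    protected void onProcessingTime(long time) throws Exception {
        // wrap each firing in a RecordContext and hand it to the AsyncExecutionController
    }
}
```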



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35152) Flink CDC Doris/Starrocks Sink Auto create table event should support setting auto partition fields for each table

2024-04-18 Thread tumengyao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tumengyao updated FLINK-35152:
--
Description: 
In some scenarios, when creating a physical table in Doris, appropriate 
partition fields need to be selected to speed up the efficiency of data query 
and calculation. In addition, partition tables support more applications, such 
as hot and cold data layering and so on.


The current Flink CDC Doris Sink's create table event creates a table with no 
partitions set.


The Auto Partition function supported by doris 2.1.x simplifies the creation and management of partitions. We just need to add some configuration items to the Flink CDC job that tell Flink CDC which fields the Doris sink should use for partitioning in the create table event, so that a partitioned table is created in Doris.

Here's an example:
source: Mysql
source_table:
CREATE TABLE table1 (
col1 INT AUTO_INCREMENT PRIMARY KEY,
col2 DECIMAL(18, 2),
col3 VARCHAR(500),
col4 TEXT,
col5 DATETIME DEFAULT CURRENT_TIMESTAMP
);


If you want to specify the partitioning of table test.table1, you need to add the sink-table-partition-key, sink-table-partition-func-call-expr and sink-table-partition-type settings to mysql_to_doris.yaml:

route:

source-table: test.table1
sink-table:ods.ods_table1
sink-table-partition-key:col5
sink-table-partition-func-call-expr:date_trunc(`col5`, 'month')
sink-table-partition-type:auto range

The auto range partition in Doris 2.1.x does not support null partitions. So 
you need to set test.table1.col5 == null then '1990-01-01 00:00:00' else 
test.table1.col5 end

Now, after submitting the mysql_to_doris.yaml Flink CDC job, an ods.ods_table1 data table should appear in the Doris database.
The data table DDL is as follows:
CREATE TABLE table1 (
col1 INT ,
col5 DATETIME not null,
col2 DECIMAL(18, 2),
col3 VARCHAR(500),
col4 STRING
) unique KEY(`col1`,`col5`)
AUTO PARTITION BY RANGE date_trunc(`col5`, 'month')()
DISTRIBUTED BY HASH (`id`) BUCKETS AUTO
PROPERTIES (
...
);

  was:
In some scenarios, when creating a physical table in Doris, appropriate 
partition fields need to be selected to speed up the efficiency of data query 
and calculation. In addition, partition tables support more applications, such 
as hot and cold data layering and so on.
The current Flink CDC Doris Sink's create table event creates a table with no 
partitions set.
The Auto Partition function supported by doris 2.1.x simplifies the creation 
and management of partitions. We just need to add some configuration items to 
the Flink CDC job. To tell Flink CDC which fields Doris Sink will use in the 
create table event to create partitions, you can get a partition table in Doris.

Here's an example:
source: Mysql
source_table:
CREATE TABLE table1 (
col1 INT AUTO_INCREMENT PRIMARY KEY,
col2 DECIMAL(18, 2),
col3 VARCHAR(500),
col4 TEXT,
col5 DATETIME DEFAULT CURRENT_TIMESTAMP
);
If you want to specify the partition of table test.table1, you need to add 
sink-table-partition-keys and sink-table-partition-type information to the 
mysql_to_doris
route:
- source-table: test.table1
sink-table:ods.ods_table1
sink-table-partition-key:col5
sink-table-partition-func-call-expr:date_trunc(`col5`, 'month')
sink-table-partition-type:auto range

The auto range partition in Doris 2.1.x does not support null partitions. So 
you need to set test.table1.col5 == null then '1990-01-01 00:00:00' else 
test.table1.col5 end

Now after submitting the mysql_to_doris.ymal Flink CDC job, an ods.ods_table1 
data table should appear in the Doris database
The data table DDL is as follows:
CREATE TABLE table1 (
col1 INT ,
col5 DATETIME not null,
col2 DECIMAL(18, 2),
col3 VARCHAR(500),
col4 TEXT
) unique KEY(`col1`,`col5`)
AUTO PARTITION BY RANGE date_trunc(`col5`, 'month')()
DISTRIBUTED BY HASH (`id`) BUCKETS AUTO
PROPERTIES (
...
);


> Flink CDC  Doris/Starrocks Sink Auto create table event should support 
> setting auto partition fields for each table
> ---
>
> Key: FLINK-35152
> URL: https://issues.apache.org/jira/browse/FLINK-35152
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: tumengyao
>Priority: Minor
>  Labels: Doris
>
> In some scenarios, when creating a physical table in Doris, appropriate 
> partition fields need to be selected to speed up the efficiency of data query 
> and calculation. In addition, partition tables support more applications, 
> such as hot and cold data layering and so on.
> The current Flink CDC Doris Sink's create table event creates a table with no 
> partitions set.
> The Auto Partition function supported by doris 2.1.x simplifies the creation 
> and management of partitions. We just need to add some configuration items to 

Re: [PR] [FLINK-35045][state] Introduce ForStFlinkFileSystem to support reading and writing with ByteBuffer [flink]

2024-04-18 Thread via GitHub


masteryhx commented on code in PR #24632:
URL: https://github.com/apache/flink/pull/24632#discussion_r1570109129


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java:
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.PositionedReadable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link FSDataInputStream} delegates requests to other one and supports 
reading data with {@link
+ * ByteBuffer}.
+ *
+ * All methods in this class maybe used by ForSt, please start a discussion 
firstly if it has to
+ * be modified.
+ */
+public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {
+
+private final FSDataInputStream originalInputStream;
+
+/**
+ * InputStream Pool which provides multiple input streams to random read 
concurrently. An input
+ * stream should only be used by a thread at a point in time.
+ */
+private final Queue readInputStreamPool;
+
+private final Callable inputStreamBuilder;
+
+public ByteBufferReadableFSDataInputStream(
+FSDataInputStream originalInputStream,
+Callable inputStreamBuilder,
+int inputStreamCapacity) {
+this.originalInputStream = originalInputStream;
+this.inputStreamBuilder = inputStreamBuilder;
+this.readInputStreamPool = new 
LinkedBlockingQueue<>(inputStreamCapacity);
+}
+
+/**
+ * Reads up to ByteBuffer#remaining bytes of data from the 
input stream into a
+ * ByteBuffer. Not Thread-safe yet since the interface of sequential read 
of ForSt only be
+ * accessed by one thread at a time.
+ *
+ * @param bb the buffer into which the data is read.
+ * @return the total number of bytes read into the buffer.
+ * @exception IOException If the first byte cannot be read for any reason 
other than end of
+ * file, or if the input stream has been closed, or if some other I/O 
error occurs.
+ * @exception NullPointerException If bb is null.
+ */
+public int readFully(ByteBuffer bb) throws IOException {
+if (bb == null) {
+throw new NullPointerException();
+} else if (bb.remaining() == 0) {
+return 0;
+}
+return readFullyFromFSDataInputStream(originalInputStream, bb);
+}
+
+/**
+ * Reads up to ByteBuffer#remaining bytes of data from the 
specific position of the
+ * input stream into a ByteBuffer. Thread-safe since the interface of 
random read of ForSt may be
+ * concurrently accessed by multiple threads. TODO: Support to split this 
method to other class.
+ *
+ * @param position the start offset in input stream at which the data is 
read.
+ * @param bb the buffer into which the data is read.
+ * @return the total number of bytes read into the buffer.
+ * @exception IOException If the first byte cannot be read for any reason 
other than end of
+ * file, or if the input stream has been closed, or if some other I/O 
error occurs.
+ * @exception NullPointerException If bb is null.
+ */
+public int readFully(long position, ByteBuffer bb) throws Exception {
+if (bb == null) {
+throw new NullPointerException();
+} else if (bb.remaining() == 0) {
+return 0;
+}
+
+FSDataInputStream fsDataInputStream = readInputStreamPool.poll();
+if (fsDataInputStream == null) {
+fsDataInputStream = inputStreamBuilder.call();

Review Comment:
   I think concurrent reads may be a common case, and they are controlled by ForSt's 
random-read path.
   I'd prefer to keep this as it is for now; maybe we could test it later and come up 
with a better default value for it, WDYT?
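   If we do revisit the default later, one option is to surface the capacity as a config option; a sketch (the option key and default below are made up for illustration, not part of this PR):
   
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ForStReadPoolOptionsSketch {
    // Hypothetical option; name and default value are placeholders only.
    public static final ConfigOption<Integer> READ_STREAM_POOL_CAPACITY =
            ConfigOptions.key("state.backend.forst.read-stream-pool-capacity")
                    .intType()
                    .defaultValue(16)
                    .withDescription(
                            "Maximum number of pooled input streams used for concurrent random reads.");
}
```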



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-18 Thread via GitHub


fredia commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1570140913


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import static 
org.apache.flink.runtime.asyncprocessing.KeyAccountingUnit.EMPTY_RECORD;
+
+/**
+ * An implementation of {@link InternalTimerService} that is used by {@link
+ * 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
+ * The timer service will set {@link RecordContext} for the timers before 
invoking action to
+ * preserve the execution order between timer firing and records processing.
+ *
+ * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425
+ * timers section.
+ * @param  Type of timer's key.
+ * @param  Type of the namespace to which timers are scoped.
+ */
+public class InternalTimerServiceAsyncImpl extends 
InternalTimerServiceImpl {
+
+private AsyncExecutionController asyncExecutionController;
+
+InternalTimerServiceAsyncImpl(
+TaskIOMetricGroup taskIOMetricGroup,
+KeyGroupRange localKeyGroupRange,
+KeyContext keyContext,
+ProcessingTimeService processingTimeService,
+KeyGroupedInternalPriorityQueue processingTimeTimersQueue,
+KeyGroupedInternalPriorityQueue eventTimeTimersQueue,
+StreamTaskCancellationContext cancellationContext,
+AsyncExecutionController asyncExecutionController) {
+super(
+taskIOMetricGroup,
+localKeyGroupRange,
+keyContext,
+processingTimeService,
+processingTimeTimersQueue,
+eventTimeTimersQueue,
+cancellationContext);
+this.asyncExecutionController = asyncExecutionController;
+this.processingTimeCallback = this::onProcessingTime;
+}
+
+private void onProcessingTime(long time) throws Exception {
+// null out the timer in case the Triggerable calls 
registerProcessingTimeTimer()
+// inside the callback.
+nextTimer = null;
+
+InternalTimer timer;
+
+while ((timer = processingTimeTimersQueue.peek()) != null
+&& timer.getTimestamp() <= time
+&& !cancellationContext.isCancelled()) {
+RecordContext recordCtx =
+asyncExecutionController.buildContext(EMPTY_RECORD, 
timer.getKey());
+recordCtx.retain();
+asyncExecutionController.setCurrentContext(recordCtx);
+keyContext.setCurrentKey(timer.getKey());
+processingTimeTimersQueue.poll();
+final InternalTimer timerToTrigger = timer;
+asyncExecutionController.syncPointRequestWithCallback(
+() -> triggerTarget.onProcessingTime(timerToTrigger));
+taskIOMetricGroup.getNumFiredTimers().inc();
+recordCtx.release();
+}
+
+if (timer != null && nextTimer == null) {
+nextTimer =
+processingTimeService.registerTimer(
+timer.getTimestamp(), this::onProcessingTime);
+}
+}
+
+/**
+ * Advance one watermark, this will fire some event timers.
+ *
+ * @param time the time in watermark.
+ */
+@Override
+public void 

Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-04-18 Thread via GitHub


mas-chen commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1570041925


##
docs/static/generated/rest_v1_dispatcher.yml:
##
@@ -1089,6 +1089,37 @@ paths:
 application/json:
   schema:
 $ref: '#/components/schemas/JobVertexBackPressureInfo'
+  /jobs/{jobid}/vertices/{vertexid}/coordinator-metrics:
+get:
+  description: Provides access to job manager operator metrics

Review Comment:
   Yes, introducing the operator id would make it difficult for the UI to 
integrate since metrics APIs rely on the vertexid at the lowest level of 
granularity.
   
   >When focusing on the coordinator's metrics for autoscaling purposes, how 
will API users distinguish these from other metrics retrieved from 
/jm-operator-metrics? Can be sure that the metrics of interest are always 
uniquely identified by their names, preventing any overlap with those emitted 
by other operators?
   Ah yes, thanks for the suggestion! It would be possible if we formatted the 
metric by `.`, which I already do in the PR!



##
docs/static/generated/rest_v1_dispatcher.yml:
##
@@ -1089,6 +1089,37 @@ paths:
 application/json:
   schema:
 $ref: '#/components/schemas/JobVertexBackPressureInfo'
+  /jobs/{jobid}/vertices/{vertexid}/coordinator-metrics:
+get:
+  description: Provides access to job manager operator metrics

Review Comment:
   Yes, introducing the operator id would make it difficult for the UI to 
integrate since metrics APIs rely on the vertexid at the lowest level of 
granularity.
   
   >When focusing on the coordinator's metrics for autoscaling purposes, how 
will API users distinguish these from other metrics retrieved from 
/jm-operator-metrics? Can be sure that the metrics of interest are always 
uniquely identified by their names, preventing any overlap with those emitted 
by other operators?
   
   Ah yes, thanks for the suggestion! It would be possible if we formatted the 
metric by `.`, which I already do in the PR!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-04-18 Thread via GitHub


mas-chen commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1570041925


##
docs/static/generated/rest_v1_dispatcher.yml:
##
@@ -1089,6 +1089,37 @@ paths:
 application/json:
   schema:
 $ref: '#/components/schemas/JobVertexBackPressureInfo'
+  /jobs/{jobid}/vertices/{vertexid}/coordinator-metrics:
+get:
+  description: Provides access to job manager operator metrics

Review Comment:
   Yes, introducing the operator id would make it difficult for the UI to 
integrate since metrics APIs rely on the vertexid at the lowest level of 
granularity.
   
   >When focusing on the coordinator's metrics for autoscaling purposes, how 
will API users distinguish these from other metrics retrieved from 
/jm-operator-metrics? Can be sure that the metrics of interest are always 
uniquely identified by their names, preventing any overlap with those emitted 
by other operators?
   
   Ah yes, thanks for the suggestion! It would be possible if we formatted the 
metric by `.`, which we already do in the PR!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31860] FlinkDeployments never finalize when namespace is deleted [flink-kubernetes-operator]

2024-04-18 Thread via GitHub


gyfora merged PR #817:
URL: https://github.com/apache/flink-kubernetes-operator/pull/817


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2024-04-18 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-31860.
--
Fix Version/s: kubernetes-operator-1.9.0
 Assignee: Zhou JIANG  (was: Jayme Howard)
   Resolution: Fixed

merged to main 4293d58329af562e9c50216c3005b4577a289b90

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Zhou JIANG
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: kubernetes-operator-1.9.0
>
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
> ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender 
>      # Uncomment this if you want to _only_ change Flink''s logging      
> #logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
> The following lines keep the log level of common libraries/connectors on      
> # log level INFO. The root logger does not override this. You have to 
> manually      # change the log levels here.      logger.akka.name = akka      
> logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
> logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
> logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper  
>     logger.zookeeper.level = INFO      # Log all infos to the console      
> appender.console.name = ConsoleAppender      appender.console.type = CONSOLE  
>     appender.console.layout.type = PatternLayout      
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>       - %m%n      # Log all infos in the given rolling file      
> appender.rolling.name = RollingFileAppender      appender.rolling.type = 
> RollingFile      appender.rolling.append = false      
> appender.rolling.fileName = ${sys:log.file}      appender.rolling.filePattern 
> = ${sys:log.file}.%i      

Re: [PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite loop [flink-cdc]

2024-04-18 Thread via GitHub


lvyanquan commented on PR #3235:
URL: https://github.com/apache/flink-cdc/pull/3235#issuecomment-2063146409

   Looks like there are other places that use ValueSource with parallelism > 1, 
and I'm trying to find out the reason for this failure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-18 Thread via GitHub


fredia commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1570100169


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java:
##
@@ -32,6 +32,8 @@
  */
 public class KeyAccountingUnit {
 
+public static final Object EMPTY_RECORD = "EMPTY_RECORD";

Review Comment:
   The implementation of `KeyAccountingUnit` does not allow the value (record) to be null.
   I changed `EMPTY_RECORD` to `new Object()`.
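   For context, a tiny sketch of why the identity-based sentinel is the safer choice here (illustrative only):
   
```java
public class KeyAccountingUnitSketch {
    // new Object() can never be equal to a real record, whereas the previous String
    // constant "EMPTY_RECORD" could in principle collide with genuine String input.
    public static final Object EMPTY_RECORD = new Object();
}
```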



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35045][state] Introduce ForStFlinkFileSystem to support reading and writing with ByteBuffer [flink]

2024-04-18 Thread via GitHub


masteryhx commented on code in PR #24632:
URL: https://github.com/apache/flink/pull/24632#discussion_r1570103838


##
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:
##
@@ -140,4 +144,43 @@ public void skipFully(long bytes) throws IOException {
 bytes -= fsDataInputStream.skip(bytes);
 }
 }
+
+@Override
+public int read(ByteBuffer byteBuffer) throws IOException {
+// Not all internal stream supports ByteBufferReadable
+if 
(fsDataInputStream.hasCapability(StreamCapabilities.READBYTEBUFFER)) {
+return fsDataInputStream.read(byteBuffer);
+} else {
+// Fallback to read byte then put

Review Comment:
   Since the positioned read may fall back to Hadoop's positioned-read method, which 
uses an extra byte array, I'd prefer to implement this logic without the extra 
array overhead.
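   To spell out the no-extra-array fallback, a simplified sketch (no positioned-read handling; it assumes the wrapped stream is buffered so single-byte reads stay cheap):
   
```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

final class ByteBufferReadFallbackSketch {
    /** Reads up to bb.remaining() bytes one at a time, putting them straight into the buffer. */
    static int readInto(InputStream in, ByteBuffer bb) throws IOException {
        int totalRead = 0;
        while (bb.hasRemaining()) {
            int b = in.read();
            if (b < 0) {
                // End of stream: report -1 only if nothing was read at all.
                return totalRead == 0 ? -1 : totalRead;
            }
            bb.put((byte) b);
            totalRead++;
        }
        return totalRead;
    }
}
```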



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35152) Flink CDC Doris/Starrocks Sink Auto create table event should support setting auto partition fields for each table

2024-04-18 Thread tumengyao (Jira)
tumengyao created FLINK-35152:
-

 Summary: Flink CDC  Doris/Starrocks Sink Auto create table event 
should support setting auto partition fields for each table
 Key: FLINK-35152
 URL: https://issues.apache.org/jira/browse/FLINK-35152
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 3.1.0
Reporter: tumengyao


In some scenarios, when creating a physical table in Doris, appropriate 
partition fields need to be selected to speed up the efficiency of data query 
and calculation. In addition, partition tables support more applications, such 
as hot and cold data layering and so on.
The current Flink CDC Doris Sink's create table event creates a table with no 
partitions set.
The Auto Partition function supported by doris 2.1.x simplifies the creation 
and management of partitions. We just need to add some configuration items to 
the Flink CDC job. To tell Flink CDC which fields Doris Sink will use in the 
create table event to create partitions, you can get a partition table in Doris.

Here's an example:
source: Mysql
source_table:
CREATE TABLE table1 (
col1 INT AUTO_INCREMENT PRIMARY KEY,
col2 DECIMAL(18, 2),
col3 VARCHAR(500),
col4 TEXT,
col5 DATETIME DEFAULT CURRENT_TIMESTAMP
);
If you want to specify the partition of table test.table1, you need to add 
sink-table-partition-keys and sink-table-partition-type information to the 
mysql_to_doris
route:
- source-table: test.table1
sink-table:ods.ods_table1
sink-table-partition-key:col5
sink-table-partition-func-call-expr:date_trunc(`col5`, 'month')
sink-table-partition-type:auto range

The auto range partition in Doris 2.1.x does not support null partitions. So 
you need to set test.table1.col5 == null then '1990-01-01 00:00:00' else 
test.table1.col5 end

Now after submitting the mysql_to_doris.yaml Flink CDC job, an ods.ods_table1 
data table should appear in the Doris database
The data table DDL is as follows:
CREATE TABLE table1 (
col1 INT ,
col5 DATETIME not null,
col2 DECIMAL(18, 2),
col3 VARCHAR(500),
col4 TEXT
) unique KEY(`col1`,`col5`)
AUTO PARTITION BY RANGE date_trunc(`col5`, 'month')()
DISTRIBUTED BY HASH (`id`) BUCKETS AUTO
PROPERTIES (
...
);



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35045][state] Introduce ForStFlinkFileSystem to support reading and writing with ByteBuffer [flink]

2024-04-18 Thread via GitHub


masteryhx commented on code in PR #24632:
URL: https://github.com/apache/flink/pull/24632#discussion_r1570106555


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.ByteBufferReadable;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link FSDataInputStream} that delegates requests to another stream and
+ * supports reading data with {@link ByteBuffer}.
+ *
+ * All methods in this class may be used by ForSt; please start a discussion
+ * first if any of them has to be modified.
+ */
+public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {
+
+private final FSDataInputStream originalInputStream;
+
+/**
+ * InputStream pool which provides multiple input streams for concurrent random
+ * reads. An input stream should only be used by one thread at a time.
+ */
+private final Queue<FSDataInputStream> readInputStreamPool;
+
+private final Callable<FSDataInputStream> inputStreamBuilder;
+
+public ByteBufferReadableFSDataInputStream(
+Callable<FSDataInputStream> inputStreamBuilder, int inputStreamCapacity)
+throws IOException {
+try {
+this.originalInputStream = inputStreamBuilder.call();
+} catch (Exception e) {
+throw new IOException("Exception when build original input stream", e);
+}
+this.inputStreamBuilder = inputStreamBuilder;
+this.readInputStreamPool = new LinkedBlockingQueue<>(inputStreamCapacity);
+}
+
+/**
+ * Reads up to ByteBuffer#remaining bytes of data from the input stream into a
+ * ByteBuffer. Not thread-safe yet, since the sequential read interface of ForSt
+ * is only accessed by one thread at a time.
+ *
+ * @param bb the buffer into which the data is read.
+ * @return the total number of bytes read into the buffer.
+ * @exception IOException If the first byte cannot be read for any reason 
other than end of
+ * file, or if the input stream has been closed, or if some other I/O 
error occurs.
+ * @exception NullPointerException If bb is null.
+ */
+public int readFully(ByteBuffer bb) throws IOException {
+if (bb == null) {
+throw new NullPointerException();
+} else if (bb.remaining() == 0) {
+return 0;
+}
+return originalInputStream instanceof ByteBufferReadable
+? ((ByteBufferReadable) originalInputStream).read(bb)
+: readFullyFromFSDataInputStream(originalInputStream, bb);
+}
+
+/**
+ * Reads up to ByteBuffer#remaining bytes of data from the specified position of
+ * the input stream into a ByteBuffer. Thread-safe, since the random read interface
+ * of ForSt may be accessed concurrently by multiple threads. TODO: Support
+ * splitting this method into another class.
+ *
+ * @param position the start offset in input stream at which the data is 
read.
+ * @param bb the buffer into which the data is read.
+ * @return the total number of bytes read into the buffer.
+ * @exception IOException If the first byte cannot be read for any reason 
other than end of
+ * file, or if the input stream has been closed, or if some other I/O 
error occurs.
+ * @exception NullPointerException If bb is null.
+ */
+public int readFully(long position, ByteBuffer bb) throws Exception {

Review Comment:
   Thanks for the suggestion.
   Considering that the current version of ForSt relies on this method, I'd prefer 
   to modify this when the next version of ForSt is ready. WDYT?
   I just added a TODO on the above method.
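   For reference, a rough sketch (not the actual implementation; names and the pool 
   size are illustrative) of how a pool-backed positional read along these lines 
   could look — acquire or lazily build a stream, seek, read into the buffer, and 
   hand the stream back to the pool:

import org.apache.flink.core.fs.FSDataInputStream;

import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch: positional ByteBuffer read backed by a small pool of seekable streams. */
final class PooledPositionalReader {

    private final Queue<FSDataInputStream> pool = new LinkedBlockingQueue<>(16);
    private final Callable<FSDataInputStream> streamBuilder;

    PooledPositionalReader(Callable<FSDataInputStream> streamBuilder) {
        this.streamBuilder = streamBuilder;
    }

    int readFully(long position, ByteBuffer bb) throws Exception {
        if (!bb.hasRemaining()) {
            return 0;
        }
        FSDataInputStream in = pool.poll();
        if (in == null) {
            in = streamBuilder.call(); // grow lazily when the pool is empty
        }
        try {
            in.seek(position);
            if (bb.hasArray()) {
                // Heap buffer: read straight into the backing array.
                int n = in.read(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
                if (n > 0) {
                    bb.position(bb.position() + n);
                }
                return n;
            }
            // Direct buffer: byte-wise fallback, kept simple for the sketch.
            int n = 0;
            int b;
            while (bb.hasRemaining() && (b = in.read()) != -1) {
                bb.put((byte) b);
                n++;
            }
            return n == 0 ? -1 : n;
        } finally {
            if (!pool.offer(in)) {
                in.close(); // pool already full, drop the extra stream
            }
        }
    }
}

   Splitting this into a separate helper class later, as the TODO suggests, would 
   keep the pooling concern out of the input stream itself.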



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Updated] (FLINK-35152) Flink CDC Doris Sink Auto create table event should support setting auto partition fields for each table

2024-04-18 Thread tumengyao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tumengyao updated FLINK-35152:
--
Summary: Flink CDC  Doris Sink Auto create table event should support 
setting auto partition fields for each table  (was: Flink CDC  Doris/Starrocks 
Sink Auto create table event should support setting auto partition fields for 
each table)

> Flink CDC  Doris Sink Auto create table event should support setting auto 
> partition fields for each table
> -
>
> Key: FLINK-35152
> URL: https://issues.apache.org/jira/browse/FLINK-35152
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: tumengyao
>Priority: Minor
>  Labels: Doris
>
> In some scenarios, when creating a physical table in Doris, appropriate 
> partition fields need to be selected to speed up queries and computation. In 
> addition, partitioned tables enable more use cases, such as hot/cold data 
> tiering.
> The current Flink CDC Doris Sink's create table event creates a table with no 
> partitions set.
> The Auto Partition feature in Doris 2.1.x simplifies the creation and 
> management of partitions. We only need to add a few configuration items to the 
> Flink CDC job that tell the Doris sink which fields to partition by in the 
> create table event, and the result is a partitioned table in Doris.
> Here's an example:
> source: Mysql
> source_table:
> CREATE TABLE table1 (
> col1 INT AUTO_INCREMENT PRIMARY KEY,
> col2 DECIMAL(18, 2),
> col3 VARCHAR(500),
> col4 TEXT,
> col5 DATETIME DEFAULT CURRENT_TIMESTAMP
> );
> If you want to specify the partition of table test.table1, you need to add 
> sink-table-partition-key and sink-table-partition-type information to the 
> route section of mysql_to_doris.yaml:
> route:
>   - source-table: test.table1
>     sink-table: ods.ods_table1
>     sink-table-partition-key: col5
>     sink-table-partition-func-call-expr: date_trunc(`col5`, 'month')
>     sink-table-partition-type: auto range
> The auto range partition in Doris 2.1.x does not support null partition 
> values, so null values of test.table1.col5 need to be mapped to a default, 
> e.g. CASE WHEN test.table1.col5 IS NULL THEN '1990-01-01 00:00:00' ELSE 
> test.table1.col5 END.
> Now, after submitting the mysql_to_doris.yaml Flink CDC job, an ods.ods_table1 
> table should appear in the Doris database.
> The data table DDL is as follows:
> CREATE TABLE table1 (
> col1 INT ,
> col5 DATETIME not null,
> col2 DECIMAL(18, 2),
> col3 VARCHAR(500),
> col4 STRING
> ) unique KEY(`col1`,`col5`)
> AUTO PARTITION BY RANGE date_trunc(`col5`, 'month')()
> DISTRIBUTED BY HASH (`col1`) BUCKETS AUTO
> PROPERTIES (
> ...
> );



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34633][table] Support unnesting array constants [flink]

2024-04-18 Thread via GitHub


jeyhunkarimov commented on PR #24510:
URL: https://github.com/apache/flink/pull/24510#issuecomment-2063179382

   Thanks a lot for the review @LadyForest 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


