Re: [PR] [FLINK-35072][cdc][doris] Support applying compatible `AlterColumnTypeEvent` to Doris sink [flink-cdc]

2024-04-14 Thread via GitHub


yuxiqian commented on code in PR #3215:
URL: https://github.com/apache/flink-cdc/pull/3215#discussion_r1565211096


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+
+import java.io.IOException;
+
+/** Provides schema change operations based on {@link SchemaChangeManager}. */
+public class DorisSchemaChangeManager extends SchemaChangeManager {
+    public DorisSchemaChangeManager(DorisOptions dorisOptions) {
+        super(dorisOptions);
+    }
+
+    private static final String MODIFY_COLUMN_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s";
+
+    public boolean alterColumn(
+            String database, String table, String columnName, String newColumnType)
+            throws IOException, IllegalArgumentException {
+        String tableIdentifier = String.format("%s.%s", database, table);
+        FieldSchema alterFieldSchema = new FieldSchema(columnName, newColumnType, "");
+
+        String alterColumnDDL =
+                String.format(
+                        MODIFY_COLUMN_DDL,
+                        tableIdentifier,
+                        columnName,
+                        alterFieldSchema.getTypeString());
+
+        try {
+            return this.schemaChange(
+                    database, table, buildRequestParam(true, columnName), alterColumnDDL);
+        } catch (RuntimeException ex) {
+            if (ex.getMessage().contains("Nothing is changed. please check your alter stmt.")) {

Review Comment:
   @lvyanquan Sure, but this error message comes from the [Doris server 
side](https://github.com/apache/doris/blob/debb83f35d37cac9089b2e0282e2d555ad9c1d62/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java#L1508); 
it is wrapped as a generic `DorisSchemaChangeException` on the client side and 
can only be identified by checking the error message. Any suggestions about this?
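
   For reference, a minimal sketch of such a message-based check (a hypothetical 
helper for illustration, not code from this PR):

```java
// Hypothetical helper: walk the cause chain and match the known Doris FE
// message, since the client only surfaces it inside a generic exception.
private static boolean isNoOpAlterError(Throwable t) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
        String msg = cur.getMessage();
        if (msg != null
                && msg.contains("Nothing is changed. please check your alter stmt.")) {
            return true;
        }
    }
    return false;
}
```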



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34962) flink-connector-pulsar fails to start due to incorrect use of Pulsar API: LookupService.getPartitionedTopicMetadata

2024-04-14 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837080#comment-17837080
 ] 

Yufan Sheng commented on FLINK-34962:
-

[~Tison] This issue should be marked resolved.

> flink-connector-pulsar fails to start due to incorrect use of Pulsar API: 
> LookupService.getPartitionedTopicMetadata
> --
>
> Key: FLINK-34962
> URL: https://issues.apache.org/jira/browse/FLINK-34962
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.2.0, pulsar-4.1.1
> Environment: * flink 1.17
>  * pulsar client 3.0.0
>  * org.apache.flink:flink-connector-pulsar:4.1.0-1.17 (connector)
>Reporter: Yubiao Feng
>Priority: Major
>  Labels: easyfix, pull-request-available
>
> - Unnecessary code calls `pulsarClient.getLookup().getPartitionedTopicMetadata()` 
> to create the partitioned topic metadata (in fact, this behavior is not correct).
>   - Why it is unnecessary: the [following 
> code](https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L245), 
> which creates a producer, also triggers creation of the partitioned topic metadata.
> - `pulsarClient.getLookup().getPartitionedTopicMetadata()` does not retry if the 
> connection is closed, so users will get an error. Creating a producer instead 
> retries if the connection is closed, reducing the probability of an error occurring.
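
For illustration, a minimal sketch of the producer-based path described above 
(standard Pulsar client API; the service URL and topic are placeholders):

{code:java}
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class ProducerMetadataSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder
                .build();
        // Per the description above, creating a producer triggers the
        // partitioned-topic metadata lookup internally, and this path
        // retries on closed connections, unlike the raw lookup call.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/example") // placeholder
                .create();
        producer.close();
        client.close();
    }
}
{code}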



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController [flink]

2024-04-14 Thread via GitHub


fredia merged PR #24633:
URL: https://github.com/apache/flink/pull/24633


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34414) EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector

2024-04-14 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837079#comment-17837079
 ] 

Yufan Sheng commented on FLINK-34414:
-

I don't think this is an issue from my side. First of all, 
{{.setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)}} only works on the 
PulsarSink side. The source doesn't use transactions because of their poor 
performance; we had to drop transaction support in the source. In your sample 
code, you have set {{.setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, 
true)}} and {{.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, 
false)}}. This means duplicated messages can appear because Pulsar doesn't 
receive the acknowledgement from the connector and resends the messages. I think 
this is the root cause.
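
A sketch of the source configuration this analysis points at (illustrative only, 
built from the options quoted above; whether enabling auto-acknowledge fits your 
delivery-guarantee requirements needs to be verified):

{code:java}
// Illustrative fragment: same builder calls as the reporter's sample, but with
// auto-acknowledge enabled so the broker receives acks and does not resend.
// `inClass` and `subscription` are the reporter's placeholders.
PulsarSource.builder()
        .setDeserializationSchema(Schema.AVRO(inClass), inClass)
        .setSubscriptionName(subscription)
        .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
        .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
{code}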

> EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector 
> 
>
> Key: FLINK-34414
> URL: https://issues.apache.org/jira/browse/FLINK-34414
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Rafał Trójczak
>Priority: Major
>
> Using the Pulsar connector for Flink (version 4.1.0-1.17) with a Flink job 
> (version 1.17.2): when an exception is thrown within the job, the job gets 
> restarted and starts from the last checkpoint, but the sink writes more events 
> to the output than it should, even though EXACTLY_ONCE guarantees are set 
> everywhere. To be more specific, here is my job's flow:
>  * a Pulsar source that reads from the input topic,
>  * a simple processing function,
>  * and a sink that writes to the output topic.
> Here is a fragment of the source creation:
> {code:java}
> .setDeserializationSchema(Schema.AVRO(inClass), inClass)
> .setSubscriptionName(subscription)
> .enableSchemaEvolution()
> .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
> .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
> .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
> .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, 
> false);
> {code}
> Here is the fragment of the sink creation:
> {code:java}
> .setSerializationSchema(Schema.AVRO(outClass), outClass)
> .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
> .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, 
> DeliveryGuarantee.EXACTLY_ONCE)
> .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
> {code}
> And here is the Flink environment preparation:
> {code:java}
> environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> environment.enableCheckpointing(CHECKPOINTING_INTERVAL, 
> CheckpointingMode.EXACTLY_ONCE);
> {code}
> After sending 1000 events on the input topic, on the output topic I got 1048 
> events.
> I ran the job on my local Kubernetes cluster, using Kubernetes Flink Operator.
> Here is the MRE for this problem (mind that there is an internal dependency, 
> but it may be commented out together with the code that relies on it): 
> [https://github.com/trojczak/flink-pulsar-connector-problem]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (FLINK-33884) Update Pulsar dependency to 3.0.2 in Pulsar Connector

2024-04-14 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen resolved FLINK-33884.
---
Fix Version/s: pulsar-4.2.0
   Resolution: Fixed

Master via 
https://github.com/apache/flink-connector-pulsar/commit/9f4b902c2a478d0105eec1e32bac3ea40f318d00

> Update Pulsar dependency to 3.0.2 in Pulsar Connector
> -
>
> Key: FLINK-33884
> URL: https://issues.apache.org/jira/browse/FLINK-33884
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.0.1
>Reporter: David Christle
>Assignee: David Christle
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0
>
>
> The [3.0.2 
> patch|https://pulsar.apache.org/release-notes/versioned/pulsar-3.0.2/] 
> includes various bug fixes, including a few for the Pulsar client (e.g. 
> [https://github.com/apache/pulsar/pull/21144]). Upgrading the 
> dependency in the connector will pick up these fixes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]

2024-04-14 Thread via GitHub


tisonkun merged PR #72:
URL: https://github.com/apache/flink-connector-pulsar/pull/72


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34629) Pulsar source lost topic subscribe

2024-04-14 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen resolved FLINK-34629.
---
Fix Version/s: pulsar-4.2.0
   Resolution: Fixed

Master via 
https://github.com/apache/flink-connector-pulsar/commit/7a5eef268cb3f598589ad9cc32648ac92fbbee1d

> Pulsar source lost topic subscribe
> --
>
> Key: FLINK-34629
> URL: https://issues.apache.org/jira/browse/FLINK-34629
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-3.0.1
>Reporter: WangMinChao
>Assignee: WangMinChao
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0
>
>
> The partition id of a non-partitioned Pulsar topic is `-1`; using multiple 
> non-partitioned topics in the Pulsar source may therefore lose topic 
> subscriptions.
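
A hypothetical illustration (not the connector's actual assignment code) of how 
a `-1` partition id can collapse distinct topics onto one slot:

{code:java}
// Hypothetical: if subtask assignment were keyed on the numeric partition id,
// every non-partitioned topic (partition id -1) would map to the same slot.
int partitionId = -1;                                  // any non-partitioned topic
int parallelism = 4;
int subtask = Math.floorMod(partitionId, parallelism); // always 3, whatever the topic
{code}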



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-04-14 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837073#comment-17837073
 ] 

Yufan Sheng commented on FLINK-34436:
-

I think this is not an issue; it's a deliberate design choice. Schema evolution 
on Pulsar was never properly designed. We just send the desired schema to the 
Pulsar server and let validation be performed on the server side; the client 
side shouldn't perform any validation. Instead, we just use the schema provided 
by the user for Pub/Sub messages. Sometimes we even bypass the schema and use 
pure bytes for (de)serializing messages on the Flink side.

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file, here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * Exception thrown when schema's fields are added/removed
> * Avro's enum default value is ignored; the last known value is applied instead
> I believe that I observed the same behaviour in Pulsar itself, but for now we 
> are focusing on the connector, hence I was able to document the problems when 
> using it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34629] Fix invalid subtask assignment for non-partitioned topics [flink-connector-pulsar]

2024-04-14 Thread via GitHub


boring-cyborg[bot] commented on PR #85:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/85#issuecomment-2055403591

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34629] Fix invalid subtask assignment for non-partitioned topics [flink-connector-pulsar]

2024-04-14 Thread via GitHub


tisonkun merged PR #85:
URL: https://github.com/apache/flink-connector-pulsar/pull/85


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]

2024-04-14 Thread via GitHub


Jiabao-Sun commented on code in PR #24612:
URL: https://github.com/apache/flink/pull/24612#discussion_r1565204190


##
flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java:
##
@@ -52,7 +51,9 @@ public class ConfigurationConversionsTest {
 
     private Configuration pc;
 
-    @Before
+    @Parameter private TestSpec testSpec;
+
+    @BeforeEach
     public void init() {

Review Comment:
   The visibility of class and methods can be package default.



##
flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java:
##
@@ -21,83 +21,75 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.io.IOException;
+import java.io.File;
 import java.net.URI;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the configuration of the default file system scheme. */
-public class FilesystemSchemeConfigTest extends TestLogger {
+public class FilesystemSchemeConfigTest {
 
-    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public File tempFolder;

Review Comment:
   private



##
flink-core/src/test/java/org/apache/flink/configuration/ConfigurationParsingInvalidFormatsTest.java:
##
@@ -64,35 +66,35 @@ public static Object[][] getSpecs() {
         };
     }
 
-    @Parameterized.Parameter public ConfigOption<?> option;
+    @Parameter public ConfigOption<?> option;

Review Comment:
   visibility can be private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35064) Flink sql connector pulsar/hive com.fasterxml.jackson.annotation.JsonFormat$Value conflict

2024-04-14 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837068#comment-17837068
 ] 

Yufan Sheng commented on FLINK-35064:
-

I have made a comment in the related PR; I'll copy/paste it here for reference.

{{pulsar-client-all}} bundles its Jackson internally under the package 
{{org.apache.pulsar.shade.com.fasterxml}}. The Jackson used in Flink shouldn't 
be shaded into the same package name, or it may cause even more severe class 
conflicts. I don't think this is a good idea.

The root cause is the Jackson shading on the Pulsar client side. The shaded 
Jackson caused a lot of issues in developing and running 
flink-sql-connector-pulsar before we (StreamNative) donated it to the Flink 
community. I think we should ask Pulsar not to shade {{jackson-annotation}} in 
the client to fix this problem.

> Flink sql connector pulsar/hive 
> com.fasterxml.jackson.annotation.JsonFormat$Value conflict
> --
>
> Key: FLINK-35064
> URL: https://issues.apache.org/jira/browse/FLINK-35064
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Connectors / Pulsar
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
>  Labels: pull-request-available
>
> When I compile and package {{flink-sql-connector-pulsar}} & 
> {{flink-sql-connector-hive}}, and then put these two jar files into the 
> Flink lib directory, I execute the following SQL statement through 
> {{bin/sql-client.sh}}:
>  
> {code:java}
> // code placeholder
> CREATE TABLE
> pulsar_table (
> content string,
> proc_time AS PROCTIME ()
> )
> WITH
> (
> 'connector' = 'pulsar',
> 'topics' = 'persistent://xxx',
> 'service-url' = 'pulsar://xxx',
> 'source.subscription-name' = 'xxx',
> 'source.start.message-id' = 'latest',
> 'format' = 'csv',
> 'pulsar.client.authPluginClassName' = 
> 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
> 'pulsar.client.authParams' = 'token:xxx'
> );
>  
> select * from pulsar_table; {code}
> The task error exception stack is as follows:
>  
> {code:java}
> Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.annotation.JsonFormat$Value.empty()Lcom/fasterxml/jackson/annotation/JsonFormat$Value;
> at org.apache.pulsar.shade.com.fasterxml.jackson.databind.cfg.MapperConfig.<init>(MapperConfig.java:56) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:660) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:576) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.pulsar.common.util.ObjectMapperFactory.createObjectMapperInstance(ObjectMapperFactory.java:151) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.pulsar.common.util.ObjectMapperFactory.<clinit>(ObjectMapperFactory.java:142) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.create(ConfigurationDataUtils.java:35) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.<clinit>(ConfigurationDataUtils.java:43) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.pulsar.client.impl.ClientBuilderImpl.loadConf(ClientBuilderImpl.java:77) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:105) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.<init>(PulsarSourceEnumerator.java:95) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.<init>(PulsarSourceEnumerator.java:76) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.flink.connector.pulsar.source.PulsarSource.createEnumerator(PulsarSource.java:144) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
> at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
> {code}
>  
> The exception shows a conflict with 
> {{com.fasterxml.jackson.annotation.JsonFormat$Value}}. I investigated and 
> found that {{flink-sql-connector-pulsar}} and {{flink-sql-connector-hive}} 
> depend on different versions, leading to this conflict.
> {code:java}
> // flink-sql-connector-pulsar pom.xml
> 
>     

Re: [PR] [FLINK-35064] Update Pulsar dependency to solve the conflict of com.f… [flink-connector-pulsar]

2024-04-14 Thread via GitHub


syhily commented on PR #88:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/88#issuecomment-2055283881

   The root cause is the Jackson shading on the Pulsar client side, which causes 
a lot of issues. I think we should ask the Pulsar client not to shade Jackson.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]

2024-04-14 Thread via GitHub


syhily commented on PR #84:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/84#issuecomment-2055266431

   I'm in favor of https://github.com/apache/flink-connector-pulsar/pull/85; 
maybe we can continue the review on that PR and close this one instead. WDYT 
@minchowang?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]

2024-04-14 Thread via GitHub


syhily commented on PR #72:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/72#issuecomment-2055251321

   Thanks, there are no issues from my side. @tisonkun Can you help me double 
check this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35064] Update Pulsar dependency to solve the conflict of com.f… [flink-connector-pulsar]

2024-04-14 Thread via GitHub


syhily commented on PR #88:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/88#issuecomment-2055227659

   Sorry, I don't think this PR can be accepted. IIUC, `pulsar-client-all` 
bundles a Jackson internally under the package 
`org.apache.pulsar.shade.com.fasterxml`. The normal Jackson can't be shaded into 
the same package name, or it may cause even more severe class conflicts. I don't 
think this is a good idea.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Allow specifying the ServerSocket port for the collect function when accessing the TaskManager from the client. [flink]

2024-04-14 Thread via GitHub


hzjhjjyy closed pull request #23254: Allow specifying the ServerSocket port for 
the collect function when accessing the TaskManager from the client.
URL: https://github.com/apache/flink/pull/23254


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35072][cdc][doris] Support applying compatible `AlterColumnTypeEvent` to Doris sink [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #3215:
URL: https://github.com/apache/flink-cdc/pull/3215#discussion_r1565182494


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+
+import java.io.IOException;
+
+/** Provides schema change operations based on {@link SchemaChangeManager}. */
+public class DorisSchemaChangeManager extends SchemaChangeManager {
+    public DorisSchemaChangeManager(DorisOptions dorisOptions) {
+        super(dorisOptions);
+    }
+
+    private static final String MODIFY_COLUMN_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s";
+
+    public boolean alterColumn(
+            String database, String table, String columnName, String newColumnType)
+            throws IOException, IllegalArgumentException {
+        String tableIdentifier = String.format("%s.%s", database, table);
+        FieldSchema alterFieldSchema = new FieldSchema(columnName, newColumnType, "");
+
+        String alterColumnDDL =
+                String.format(
+                        MODIFY_COLUMN_DDL,
+                        tableIdentifier,
+                        columnName,
+                        alterFieldSchema.getTypeString());
+
+        try {
+            return this.schemaChange(
+                    database, table, buildRequestParam(true, columnName), alterColumnDDL);
+        } catch (RuntimeException ex) {
+            if (ex.getMessage().contains("Nothing is changed. please check your alter stmt.")) {

Review Comment:
   Considering future maintenance, could you please add some description of 
where this error message comes from in the source code? And is there a way to 
identify it through an exception class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


fredia commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565164680


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** An enumeration of the types of supported states. */
+    public enum Type {
+        VALUE,
+        LIST,
+        REDUCING,
+        FOLDING,
+        AGGREGATING,
+        MAP
+    }
+
+    /** ID that uniquely identifies state created from this StateDescriptor. */
+    @Nonnull private final String stateId;

Review Comment:
   How is `stateId` generated? If the sync state and async state co-exist, can 
they use the same `stateId`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKeyedState} is the root of the internal state type hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that communicate with {@link
+ * AsyncExecutionController} and are not intended to be used by user applications.
+ *
+ * @param <K> The type of key the state is associated to.
+ * @param <V> The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKeyedState<K, V> implements State {

Review Comment:
   Is the `namespace` required here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565167042


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** An enumeration of the types of supported states. */
+    public enum Type {
+        VALUE,
+        LIST,
+        REDUCING,
+        FOLDING,
+        AGGREGATING,
+        MAP
+    }
+
+    /** ID that uniquely identifies state created from this StateDescriptor. */
+    @Nonnull private final String stateId;
+
+    /** The serializer for the type. */
+    @Nonnull private final TypeSerializer<T> typeSerializer;
+
+    /**
+     * The type information describing the value type. Remain this since it could provide more
+     * information which could be used internally in the future.
+     */
+    @Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   I thought the statement `which could be used internally in the future` meant 
it would be used to create the serializer lazily, which no longer applies to the 
current implementation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #24644:
URL: https://github.com/apache/flink/pull/24644#discussion_r1565161944


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -459,13 +470,28 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
                 uploadedStates.headMap(checkpointId, true).entrySet().iterator();
         while (uploadedStatesIterator.hasNext()) {
             Map.Entry<Long, Set<LogicalFile>> entry = uploadedStatesIterator.next();
-            if (discardLogicalFiles(subtaskKey, entry.getKey(), entry.getValue())) {
+            if (discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) {

Review Comment:
   FYI: This is a bug found by newly added UTs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565160755


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** An enumeration of the types of supported states. */
+    public enum Type {
+        VALUE,
+        LIST,
+        REDUCING,
+        FOLDING,
+        AGGREGATING,
+        MAP
+    }
+
+    /** ID that uniquely identifies state created from this StateDescriptor. */
+    @Nonnull private final String stateId;
+
+    /** The serializer for the type. */
+    @Nonnull private final TypeSerializer<T> typeSerializer;
+
+    /**
+     * The type information describing the value type. Remain this since it could provide more
+     * information which could be used internally in the future.
+     */
+    @Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   I just kept this since it could provide/persist more information (e.g. 
schema) than TypeSerializer, as its comment says.
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35046][state] Introduce AsyncKeyedStateBackend supporting to create StateExecutor [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #24663:
URL: https://github.com/apache/flink/pull/24663#discussion_r1565158902


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java:
##
@@ -104,6 +104,25 @@ default String getName() {
     <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
             KeyedStateBackendParameters<K> parameters) throws Exception;
 
+    /**
+     * Creates a new {@link AsyncKeyedStateBackend} which supports accessing keyed state
+     * asynchronously.
+     *
+     * <p>Keyed State is state where each value is bound to a key.
+     *
+     * @param parameters The arguments bundle for creating {@link AsyncKeyedStateBackend}.
+     * @param <K> The type of the keys by which the state is organized.
+     * @return The Async Keyed State Backend for the given job, operator.
+     * @throws Exception This method may forward all exceptions that occur while instantiating the
+     *     backend.
+     */
+    @Experimental
+    default <K> AsyncKeyedStateBackend<K> createAsyncKeyedStateBackend(
+            KeyedStateBackendParameters<K> parameters) throws Exception {
+        throw new UnsupportedOperationException(
+                "Don't support createAsyncKeyedStateBackend by default");
+    }
+

Review Comment:
   Should we add another method `isSupportAsyncKeyedStateBackend` within 
`StateBackend`?
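
   For clarity, a sketch of the suggested method (name taken from the comment 
above; purely hypothetical, not part of this PR):

```java
// Hypothetical default on StateBackend, mirroring the suggestion above.
default boolean isSupportAsyncKeyedStateBackend() {
    return false;
}
```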



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #22973:
URL: https://github.com/apache/flink/pull/22973#discussion_r1565156390


##
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##
@@ -44,6 +44,42 @@
         <td>String</td>
         <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
     </tr>
+    <tr>
+        <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Only relevant if <code>state.checkpoints.file-merging.enabled</code> is enabled.<br/>Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+    </tr>
+    <tr>
+        <td><h5>state.checkpoints.file-merging.enabled</h5></td>

Review Comment:
   I see. Thus we leave it be.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]

2024-04-14 Thread via GitHub


1996fanrui commented on code in PR #24612:
URL: https://github.com/apache/flink/pull/24612#discussion_r1565149231


##
flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java:
##
@@ -21,83 +21,79 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the configuration of the default file system scheme. */
-public class FilesystemSchemeConfigTest extends TestLogger {
+public class FilesystemSchemeConfigTest {
 
-    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public File tempFolder;
 
-    @After
-    public void clearFsSettings() throws IOException {
+    @AfterEach
+    void clearFsSettings() throws IOException {
         FileSystem.initialize(new Configuration());
     }
 
     // ------------------------------------------------------------------------
 
     @Test
-    public void testDefaultsToLocal() throws Exception {
-        URI justPath = new URI(tempFolder.newFile().toURI().getPath());
-        assertNull(justPath.getScheme());
+    void testDefaultsToLocal() throws Exception {
+        URI justPath = new URI(File.createTempFile("junit", null, tempFolder).toURI().getPath());
+        assertThat(justPath.getScheme()).isNull();
 
         FileSystem fs = FileSystem.get(justPath);
-        assertEquals("file", fs.getUri().getScheme());
+        assertThat(fs.getUri().getScheme()).isEqualTo("file");
     }
 
     @Test
-    public void testExplicitlySetToLocal() throws Exception {
+    void testExplicitlySetToLocal() throws Exception {
         final Configuration conf = new Configuration();
         conf.set(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, LocalFileSystem.getLocalFsURI().toString());
         FileSystem.initialize(conf);
 
-        URI justPath = new URI(tempFolder.newFile().toURI().getPath());
-        assertNull(justPath.getScheme());
+        URI justPath = new URI(File.createTempFile("junit", null, tempFolder).toURI().getPath());
+        assertThat(justPath.getScheme()).isNull();
 
         FileSystem fs = FileSystem.get(justPath);
-        assertEquals("file", fs.getUri().getScheme());
+        assertThat(fs.getUri().getScheme()).isEqualTo("file");
     }
 
     @Test
-    public void testExplicitlySetToOther() throws Exception {
+    void testExplicitlySetToOther() throws Exception {
         final Configuration conf = new Configuration();
         conf.set(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
         FileSystem.initialize(conf);
 
-        URI justPath = new URI(tempFolder.newFile().toURI().getPath());
-        assertNull(justPath.getScheme());
+        URI justPath = new URI(File.createTempFile("junit", null, tempFolder).toURI().getPath());
+        assertThat(justPath.getScheme()).isNull();
 
         try {
             FileSystem.get(justPath);
             fail("should have failed with an exception");
         } catch (UnsupportedFileSystemSchemeException e) {
-            assertTrue(e.getMessage().contains("otherFS"));
+            assertThat(e.getMessage()).contains("otherFS");

Review Comment:
   It's better to use `assertThatThrownBy` instead.
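
   For example, the try/fail/catch block above could become (sketch of the 
suggested AssertJ style):

```java
assertThatThrownBy(() -> FileSystem.get(justPath))
        .isInstanceOf(UnsupportedFileSystemSchemeException.class)
        .hasMessageContaining("otherFS");
```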



##
flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java:
##
@@ -20,243 +20,232 @@
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
 import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
 
 /** Tests for the {@link MemorySize} class. */
-public class MemorySizeTest {
+class MemorySizeTest {
 
 @Test
-public void testUnitConversion() {
+void testUnitConversion() {
 final MemorySize zero = 

[jira] [Updated] (FLINK-34987) Introduce Internal State Interface for Async State API

2024-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34987:
---
Labels: pull-request-available  (was: )

> Introduce Internal State Interface for Async State API
> --
>
> Key: FLINK-34987
> URL: https://issues.apache.org/jira/browse/FLINK-34987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565148947


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type 
hierarchy, similar to the

Review Comment:
   nit. replace all `InternalKvState` with `InternalKeyedState`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for 
creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;
+
+/** The serializer for the type. */
+@Nonnull private final TypeSerializer<T> typeSerializer;
+
+/**
+ * The type information describing the value type. Retained since it could
+ * provide more information which could be used internally in the future.
+ */
+@Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   IIUC, we don't need this?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed 

Re: [PR] [FLINK-35064] Update Pulsar dependency to solve the conflict of com.f… [flink-connector-pulsar]

2024-04-14 Thread via GitHub


elon-X commented on PR #88:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/88#issuecomment-2054774574

   @MartijnVisser @syhily Could you please help approve this PR when you have 
some free time? Thank you very much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35102][doris] Fix Doris connector type mapping issues [flink-cdc]

2024-04-14 Thread via GitHub


yuxiqian commented on PR #3224:
URL: https://github.com/apache/flink-cdc/pull/3224#issuecomment-2054696577

   cc @PatrickRen @lvyanquan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35102) Incorrect Type mapping for Flink CDC Doris connector

2024-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35102:
---
Labels: pull-request-available  (was: )

> Incorrect Type mapping for Flink CDC Doris connector
> ---
>
> Key: FLINK-35102
> URL: https://issues.apache.org/jira/browse/FLINK-35102
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Xiqian YU
>Priority: Major
>  Labels: pull-request-available
>
> According to the Flink CDC Doris connector docs, CHAR and VARCHAR lengths are 
> multiplied by 3, since Doris uses UTF-8 variable-length encoding internally.
> |CHAR(n)|CHAR(n*3)|In Doris, strings are stored in UTF-8 encoding, so English characters occupy 1 byte and Chinese characters occupy 3 bytes. The length here is multiplied by 3. The maximum length of CHAR is 255. Once exceeded, it will automatically be converted to VARCHAR type.|
> |VARCHAR(n)|VARCHAR(n*3)|Same as above. The length here is multiplied by 3. The maximum length of VARCHAR is 65533. Once exceeded, it will automatically be converted to STRING type.|
> However, the Doris connector currently maps `CHAR(n)` to `CHAR(n)` and 
> `VARCHAR(n)` to `VARCHAR(n * 4)`, which is inconsistent with the specification 
> in the docs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][runtime]Fix the missing method parameter annotation problem [flink]

2024-04-14 Thread via GitHub


chenyu-opensource commented on PR #24662:
URL: https://github.com/apache/flink/pull/24662#issuecomment-2054684404

   > Thanks for the quick fix, looks good to me.
   > 
   > I just think a hotfix commit should be good enough, would you mind 
renaming this to `[hotfix][runtime]x`?
   
   That sounds great. I have made this change. Thank you so much. @reswqa 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35102) Incorrect Type mapping for Flink CDC Doris connector

2024-04-14 Thread Xiqian YU (Jira)
Xiqian YU created FLINK-35102:
-

 Summary: Incorrect Type mapping for Flink CDC Doris connector
 Key: FLINK-35102
 URL: https://issues.apache.org/jira/browse/FLINK-35102
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Xiqian YU


According to the Flink CDC Doris connector docs, CHAR and VARCHAR lengths are 
multiplied by 3, since Doris uses UTF-8 variable-length encoding internally.
|CHAR(n)|CHAR(n*3)|In Doris, strings are stored in UTF-8 encoding, so English characters occupy 1 byte and Chinese characters occupy 3 bytes. The length here is multiplied by 3. The maximum length of CHAR is 255. Once exceeded, it will automatically be converted to VARCHAR type.|
|VARCHAR(n)|VARCHAR(n*3)|Same as above. The length here is multiplied by 3. The maximum length of VARCHAR is 65533. Once exceeded, it will automatically be converted to STRING type.|

However, the Doris connector currently maps `CHAR(n)` to `CHAR(n)` and 
`VARCHAR(n)` to `VARCHAR(n * 4)`, which is inconsistent with the specification 
in the docs.
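
For illustration, a hedged sketch of the length arithmetic described above
(the method name and structure are placeholders, not the connector's actual
code):
{code:java}
// Doris stores strings as UTF-8, so source lengths are multiplied by 3.
// CHAR is capped at 255 and VARCHAR at 65533; beyond those limits Doris
// falls back to VARCHAR and STRING respectively.
static String toDorisStringType(boolean isChar, int sourceLength) {
    int dorisLength = sourceLength * 3;
    if (isChar && dorisLength <= 255) {
        return String.format("CHAR(%d)", dorisLength);
    }
    if (dorisLength <= 65533) {
        return String.format("VARCHAR(%d)", dorisLength);
    }
    return "STRING";
}
{code}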



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [minor][cdc][docs] Optimize styles of the Flink CDC website docs home… [flink-cdc]

2024-04-14 Thread via GitHub


leonardBang merged PR #3208:
URL: https://github.com/apache/flink-cdc/pull/3208


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [minor][docs] Optimize markdown styles in quickstart doc [flink-cdc]

2024-04-14 Thread via GitHub


xleoken commented on PR #3223:
URL: https://github.com/apache/flink-cdc/pull/3223#issuecomment-2054589844

   cc @GOODBOY008


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [minor][docs] Optimize markdown styles in quickstart doc [flink-cdc]

2024-04-14 Thread via GitHub


xleoken opened a new pull request, #3223:
URL: https://github.com/apache/flink-cdc/pull/3223

   
![image](https://github.com/apache/flink-cdc/assets/95013770/038dda76-0ede-432d-9abf-c29d1cf256b0)
   
   
   
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/get-started/quickstart/mysql-to-doris/#synchronize-schema-and-data-changes
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35046][state] Introduce AsyncKeyedStateBackend supporting to create StateExecutor [flink]

2024-04-14 Thread via GitHub


flinkbot commented on PR #24663:
URL: https://github.com/apache/flink/pull/24663#issuecomment-2054586620

   
   ## CI report:
   
   * b551d919ce3a63501e9ebc6f8769bd56f0c74d0c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35101][runtime]Fix the missing method parameter annotation problem [flink]

2024-04-14 Thread via GitHub


flinkbot commented on PR #24662:
URL: https://github.com/apache/flink/pull/24662#issuecomment-2054585372

   
   ## CI report:
   
   * 0260e42af4748141a97fc036dc42b87623829b36 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-14 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837041#comment-17837041
 ] 

Rui Fan commented on FLINK-35040:
-

Thanks [~slfan1989] and [~rmetzger] for the comments!

I didn't find any related issue in the commons-io JIRA [1]. Also, I ran the 
benchmark on my Mac with JDK 11 and tried to analyze which code path causes 
this regression. I used async-profiler's wall mode to analyze the benchmark, 
and didn't find any code from the commons.io package.

Do you have any ideas for troubleshooting this?

[1]https://issues.apache.org/jira/browse/IO-855?jql=project%20%3D%20IO%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString has regressed since April 3, and had 
> not yet recovered as of April 8th.
> It seems only Java 11 regresses; Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35046) Introduce New KeyedStateBackend related Async interfaces

2024-04-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-35046:


Assignee: Hangxiang Yu

> Introduce New KeyedStateBackend related Async interfaces
> 
>
> Key: FLINK-35046
> URL: https://issues.apache.org/jira/browse/FLINK-35046
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
>
> Since we have introduced the new State API, async versions of some classes 
> should be introduced to support it, e.g. AsyncKeyedStateBackend and a new 
> State Descriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35046) Introduce New KeyedStateBackend related Async interfaces

2024-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35046:
---
Labels: pull-request-available  (was: )

> Introduce New KeyedStateBackend related Async interfaces
> 
>
> Key: FLINK-35046
> URL: https://issues.apache.org/jira/browse/FLINK-35046
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
>
> Since we have introduced the new State API, async versions of some classes 
> should be introduced to support it, e.g. AsyncKeyedStateBackend and a new 
> State Descriptor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35046][state] Introduce AsyncKeyedStateBackend supporting to create StateExecutor [flink]

2024-04-14 Thread via GitHub


masteryhx opened a new pull request, #24663:
URL: https://github.com/apache/flink/pull/24663

   
   
   ## What is the purpose of the change
   
   Introduce AsyncKeyedStateBackend, which supports creating a StateExecutor.
   
   ## Brief change log
   
   - Introduce a new interface called AsyncKeyedStateBackend (a rough sketch follows below)
   - Introduce a new method on StateBackend to create an AsyncKeyedStateBackend
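
   A rough sketch of the intended shape, for orientation only (the interface
   body below is an assumption based on this description, not the PR's actual
   code):
   
   public interface AsyncKeyedStateBackend {
       // Creates an executor that runs incoming state requests asynchronously.
       StateExecutor createStateExecutor();
   }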
   
   
   ## Verifying this change
   
   
   This change modified AsyncExecutionControllerTest to verify the newly added 
interface.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread chenyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837040#comment-17837040
 ] 

chenyu commented on FLINK-35101:


[~Weijie Guo] Thank you so much for your review.

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: chenyu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35101:
---
Labels: pull-request-available  (was: )

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: chenyu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35101][runtime]Fix the missing method parameter annotation problem [flink]

2024-04-14 Thread via GitHub


chenyu-opensource opened a new pull request, #24662:
URL: https://github.com/apache/flink/pull/24662

   
   
   
   ## What is the purpose of the change
   
   This pull request fixes the problem of a missing method parameter annotation.
   
   
   ## Brief change log
   
   Add the parameter annotation for 'taskManagerResourceId' to the Javadoc of the method 'ResourceManagerGateway.sendSlotReport', as sketched below.
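   
   A hedged sketch of the kind of Javadoc fix involved (the wording here is
   illustrative, not the exact source):
   
   /**
    * Sends the given {@link SlotReport} to the ResourceManager.
    *
    * @param taskManagerResourceId resource id of the TaskManager sending the report (the tag this PR adds)
    */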
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existing replication slot.

2024-04-14 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-35093:
--

Assignee: Hongshun Wang

> Postgres source connector support SPECIFIC_OFFSETS start up mode from an 
> existing replication slot.
> --
>
> Key: FLINK-35093
> URL: https://issues.apache.org/jira/browse/FLINK-35093
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>
> Currently, the Postgres source connector only supports INITIAL and LATEST modes.
> However, sometimes users want to restart from an existing replication slot's 
> confirmed_lsn.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837038#comment-17837038
 ] 

Weijie Guo commented on FLINK-35101:


Just feel free to open a hotfix PR.

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0, 1.19.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread chenyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenyu updated FLINK-35101:
---
Component/s: Table SQL / Runtime

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0, 1.19.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread chenyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenyu updated FLINK-35101:
---
Fix Version/s: (was: 1.18.0)

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread chenyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837036#comment-17837036
 ] 

chenyu commented on FLINK-35101:


Please, assign this issue to me. I can fix it.

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.18.0, 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread chenyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenyu updated FLINK-35101:
---
Affects Version/s: 1.19.0
   1.18.0

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.18.0, 1.19.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35101:
---
Affects Version/s: 1.20.0

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-35101.
--
Resolution: Fixed

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.20.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35101:
---
Component/s: Runtime / Coordination
 (was: Table SQL / Runtime)

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35101:
---
Fix Version/s: 1.20.0
   (was: 1.19.0)

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.20.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837037#comment-17837037
 ] 

Weijie Guo commented on FLINK-35101:


Yeap, this should be aligned. A hot-fix is good enough, I will close this then.

> Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
> -
>
> Key: FLINK-35101
> URL: https://issues.apache.org/jira/browse/FLINK-35101
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0, 1.19.0
>Reporter: chenyu
>Priority: Minor
> Fix For: 1.19.0
>
> Attachments: image-2024-04-15-10-17-53-803.png
>
>
> The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.
> !image-2024-04-15-10-17-53-803.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34273) git fetch fails

2024-04-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837034#comment-17837034
 ] 

Weijie Guo commented on FLINK-34273:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=2cec4644-024f-529b-9379-b711878ccf41=262

> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=5d6dc3d3-393d-5111-3a40-c6a5a36202e6=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport

2024-04-14 Thread chenyu (Jira)
chenyu created FLINK-35101:
--

 Summary: Missing method parameter annotation for 
ResourceManagerGateway.sendSlotReport
 Key: FLINK-35101
 URL: https://issues.apache.org/jira/browse/FLINK-35101
 Project: Flink
  Issue Type: Improvement
Reporter: chenyu
 Fix For: 1.19.0, 1.18.0
 Attachments: image-2024-04-15-10-17-53-803.png

The parameter annotation of method 'ResourceManagerGateway.sendSlotReport' is missing.

!image-2024-04-15-10-17-53-803.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-04-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837035#comment-17837035
 ] 

Weijie Guo commented on FLINK-18476:


1.20 jdk17 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=d871f0ce-7328-5d00-023b-e7391f5801c8=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6=23253

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got python on Ubuntu 20.04, but I only have an 
> alias {{python = python3}}. Therefore the test fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35100) Quickstarts Java nightly end-to-end test failed on Azure

2024-04-14 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35100:
---
Description: 

{code:java}
java.lang.IllegalStateException: Trying to open gateway for unseen checkpoint: 
latest known checkpoint = 1, incoming checkpoint = 2
Apr 13 01:11:42 at 
org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.openGatewayAndUnmarkCheckpoint(SubtaskGatewayImpl.java:223)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$null$3(OperatorCoordinatorHolder.java:276)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at java.util.HashMap$Values.forEach(HashMap.java:982) 
~[?:1.8.0_402]
Apr 13 01:11:42 at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$notifyCheckpointAborted$4(OperatorCoordinatorHolder.java:276)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460)
 ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460)
 ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225)
 ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
 ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
 ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]
Apr 13 01:11:42 at 
org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) 
[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT]

{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=5714


> Quickstarts Java nightly end-to-end test failed on Azure
> 
>
> Key: FLINK-35100
> URL: https://issues.apache.org/jira/browse/FLINK-35100
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
> java.lang.IllegalStateException: Trying to open gateway for unseen 
> checkpoint: latest known checkpoint = 1, incoming checkpoint = 2
> Apr 13 01:11:42   at 
> org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.openGatewayAndUnmarkCheckpoint(SubtaskGatewayImpl.java:223)
>  ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
> Apr 13 01:11:42   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$null$3(OperatorCoordinatorHolder.java:276)
>  ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
> Apr 13 01:11:42   at java.util.HashMap$Values.forEach(HashMap.java:982) 
> ~[?:1.8.0_402]
> Apr 13 

[jira] [Comment Edited] (FLINK-34273) git fetch fails

2024-04-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837034#comment-17837034
 ] 

Weijie Guo edited comment on FLINK-34273 at 4/15/24 2:15 AM:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=2cec4644-024f-529b-9379-b711878ccf41=262

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=c853d405-a51b-5dcf-f438-2de530b016d4=269


was (Author: weijie guo):
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=2cec4644-024f-529b-9379-b711878ccf41=262

> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=5d6dc3d3-393d-5111-3a40-c6a5a36202e6=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35100) Quickstarts Java nightly end-to-end test failed on Azure

2024-04-14 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35100:
--

 Summary: Quickstarts Java nightly end-to-end test failed on Azure
 Key: FLINK-35100
 URL: https://issues.apache.org/jira/browse/FLINK-35100
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35097][table] Fix 'raw' format deserialization [flink]

2024-04-14 Thread via GitHub


kumar-mallikarjuna commented on PR #24661:
URL: https://github.com/apache/flink/pull/24661#issuecomment-2054358863

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existing replication slot.

2024-04-14 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35093:
--
Description: 
Currently, the Postgres source connector only supports INITIAL and LATEST modes.

However, sometimes users want to restart from an existing replication slot's 
confirmed_lsn.

  was:
Currently, the Postgres source connector only supports INITIAL and LATEST modes.

However, sometimes users want to restart from an existing replication slot.


> Postgres source connector support SPECIFIC_OFFSETS start up mode from an 
> existing replication slot.
> --
>
> Key: FLINK-35093
> URL: https://issues.apache.org/jira/browse/FLINK-35093
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
>
> Currently, the Postgres source connector only supports INITIAL and LATEST modes.
> However, sometimes users want to restart from an existing replication slot's 
> confirmed_lsn.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#issuecomment-2054304285

   Thanks @yuxiqian for those comments, I've addressed them and resubmitted. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565075579


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.apache.paimon.flink.sink.StoreMultiCommitter;
+import org.apache.paimon.manifest.WrappedManifestCommittable;
+import org.apache.paimon.options.Options;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A {@link Committer} to commit write results for multiple tables. */
+public class PaimonCommitter implements Committer<MultiTableCommittable> {
+
+private final StoreMultiCommitter storeMultiCommitter;
+
+public PaimonCommitter(Options catalogOptions, String commitUser) {
+Catalog catalog = 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+// flinkMetricGroup could be passed after FLIP-371.
+storeMultiCommitter = new StoreMultiCommitter(() -> catalog, 
commitUser, null);
+}
+
+@Override
+public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests)
+throws IOException, InterruptedException {
+if (commitRequests.isEmpty()) {
+return;
+}
+List<MultiTableCommittable> committables =
+commitRequests.stream()
+.map(CommitRequest::getCommittable)
+.collect(Collectors.toList());
+long checkpointId = committables.get(0).checkpointId();
+WrappedManifestCommittable wrappedManifestCommittable =
+storeMultiCommitter.combine(checkpointId, 1L, committables);
+
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));

Review Comment:
   Yes, you can refer to 
[here](https://github.com/apache/flink-cdc/pull/2916/files#diff-c39b348dcbc97b5eea6e288f55b9884b7d7cdd83cdc3bc8a81d3d78179c0f9d4R66).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565069967


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.SchemaChange;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@code MetadataApplier} that applies metadata changes to Paimon. Supports
+ * primary key tables only.
+ */
+public class PaimonMetadataApplier implements MetadataApplier {
+
+// Catalog is not serializable.
+private transient Catalog catalog;
+
+// currently, we set table options for all tables using the same options.
+private final Map<String, String> tableOptions;
+
+private final Options catalogOptions;
+
+private final Map<TableId, List<String>> partitionMaps;
+
+public PaimonMetadataApplier(Options catalogOptions) {
+this.catalogOptions = catalogOptions;
+this.tableOptions = new HashMap<>();
+this.partitionMaps = new HashMap<>();
+}
+
+public PaimonMetadataApplier(
+Options catalogOptions,
+Map<String, String> tableOptions,
+Map<TableId, List<String>> partitionMaps) {
+this.catalogOptions = catalogOptions;
+this.tableOptions = tableOptions;
+this.partitionMaps = partitionMaps;
+}
+
+@Override
+public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+if (catalog == null) {
+catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+}
+try {
+if (schemaChangeEvent instanceof CreateTableEvent) {
+applyCreateTable((CreateTableEvent) schemaChangeEvent);
+} else if (schemaChangeEvent instanceof AddColumnEvent) {
+applyAddColumn((AddColumnEvent) schemaChangeEvent);
+} else if (schemaChangeEvent instanceof DropColumnEvent) {
+applyDropColumn((DropColumnEvent) schemaChangeEvent);
+} else if (schemaChangeEvent instanceof RenameColumnEvent) {
+applyRenameColumn((RenameColumnEvent) schemaChangeEvent);
+} else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
+applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
+} else {
+throw new UnsupportedOperationException(
+"PaimonDataSink doesn't support schema change event " 
+ schemaChangeEvent);
+}
+} catch (Exception e) {
+throw new RuntimeException(e);
+}
+}
+
+/** TODO support partition column. */

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565072387


##
docs/content/pipelines/paimon-pipeline(ZH).md:
##
@@ -0,0 +1,209 @@
+# Paimon Pipeline Connector
+
+The Paimon Pipeline connector can be used as the *Data Sink* of a Pipeline, writing data to [Paimon](https://paimon.apache.org). This document describes how to set up the Paimon Pipeline connector.
+
+## Connector Features
+* Automatic table creation
+* Schema change synchronization
+* Real-time data synchronization
+
+How to create a Pipeline
+
+
+A Pipeline that reads data from MySQL and synchronizes it to Paimon can be defined as follows:
+
+```yaml
+source:
+   type: mysql
+   name: MySQL Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+   server-id: 5401-5404
+
+sink:
+  type: paimon
+  name: Paimon Sink
+  metastore: filesystem
+  warehouse: /path/warehouse
+
+pipeline:
+   name: MySQL to Paimon Pipeline
+   parallelism: 2
+```

Review Comment:
   addressed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-14 Thread via GitHub


skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1565044902


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java:
##
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure
+ * number, the value will be interpreted as bytes.
+ */
+@PublicEvolving
+public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {

Review Comment:
   > I think for `MemorySize` we should use the one from Flink instead of 
   > creating our own. It is a public API (marked as `@PublicEvolving`), and 
   > Flink CDC doesn't use this one in our production code. IIUC it is just 
   > used for parsing memory size expressions in configuration.
   
   I apologize for this oversight, and thank you for your patience.
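   
   For context, a minimal usage sketch of the Flink class the review suggests
   reusing (org.apache.flink.configuration.MemorySize; "64 mb" is just an
   example expression):
   
   MemorySize size = MemorySize.parse("64 mb"); // accepts units such as b, kb, mb, gb
   long bytes = size.getBytes();                // 64 * 1024 * 1024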



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35099) Support for multiple datasources

2024-04-14 Thread melin (Jira)
melin created FLINK-35099:
-

 Summary: Support for multiple datasources
 Key: FLINK-35099
 URL: https://issues.apache.org/jira/browse/FLINK-35099
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: melin


In a hyperscale scenario, databases are distributed across multiple MySQL 
database servers instead of a single one. In our case, there are up to 16 
servers. Support multiple data sources in a single Flink CDC task.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34470) Transactional message + Table API Kafka source with 'latest-offset' scan bound mode causes indefinite hanging

2024-04-14 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027
 ] 

dongwoo.kim edited comment on FLINK-34470 at 4/15/24 1:15 AM:
--

Hello [~m.orazow], 

I'm glad that you're interested in collaborating. I’ll send a PR soon and keep 
you updated.
Thanks for reaching out.

Best,

Dongwoo


was (Author: JIRAUSER300481):
Hello [~m.orazow], 

I'm glad that you're interested in collaborating. I’ll send a PR soon and keep 
you updated.
Thanks for reaching out.

Best regards,

Dongwoo

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi, we have faced an issue with transactional messages and the Table API 
> Kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the 
> Flink SQL request hangs and eventually times out. We can always reproduce 
> this unexpected behavior by following the steps below.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after producing a certain 
> amount of messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple 
> query, such as getting the count of the produced messages
> 3. The Flink SQL job gets stuck and times out.
> h2. Cause
> A transactional producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. These control messages are not returned by 
> {*}consumer.poll(){*} (they are filtered internally). In the 
> *KafkaPartitionSplitReader* code, a split is finished only when the 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non-transactional messages or in a streaming environment, but in 
> some batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
> recordsBySplits.setPartitionStoppingOffset(tp, 
> stoppingOffset);
> finishSplitAtRecord(
> tp,
> stoppingOffset,
> lastRecord.offset(),
> finishedPartitions,
> recordsBySplits);
> }
> {code}
> Replacing the if condition with *consumer.position(tp) >= stoppingOffset* 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* gets the next record's offset if it exists, and the 
> last record's offset if the next record doesn't exist. 
> With this, KafkaPartitionSplitReader is able to finish the split even when 
> the stopping offset is configured to a control record's offset. 
> I would be happy to implement this fix if we can reach an agreement. 
> Thanks
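
A hedged sketch of the position-based check proposed above (assuming only that 
kafka-clients is on the classpath; the helper name is illustrative):

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class StoppingOffsetCheck {
    // position(tp) is the offset of the next record that would be fetched.
    // After poll() has silently skipped a trailing control batch,
    // position(tp) already points past it, while lastRecord.offset() never
    // reaches it, which is why the record-offset-based check can hang.
    static boolean reachedStoppingOffset(
            KafkaConsumer<byte[], byte[]> consumer,
            TopicPartition tp,
            long stoppingOffset) {
        return consumer.position(tp) >= stoppingOffset;
    }
}
{code}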



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-04-14 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027
 ] 

dongwoo.kim commented on FLINK-34470:
-

Hello [~m.orazow], 

I'm glad that you're interested in collaborating. I’ll send a PR soon and keep 
you updated.
Thanks for reaching out.

Best regards,

Dongwoo

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi, we have faced an issue with transactional messages and the Table API 
> Kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the 
> Flink SQL request hangs and eventually times out. We can always reproduce 
> this unexpected behavior by following the steps below.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after producing a certain 
> amount of messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple 
> query, such as getting the count of the produced messages
> 3. The Flink SQL job gets stuck and times out.
> h2. Cause
> A transactional producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. These control messages are not returned by 
> {*}consumer.poll(){*} (they are filtered internally). In the 
> *KafkaPartitionSplitReader* code, a split is finished only when the 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non-transactional messages or in a streaming environment, but in 
> some batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
> recordsBySplits.setPartitionStoppingOffset(tp, 
> stoppingOffset);
> finishSplitAtRecord(
> tp,
> stoppingOffset,
> lastRecord.offset(),
> finishedPartitions,
> recordsBySplits);
> }
> {code}
> Replacing the if condition with *consumer.position(tp) >= stoppingOffset* 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* gets the next record's offset if it exists, and the 
> last record's offset if the next record doesn't exist. 
> With this, KafkaPartitionSplitReader is able to finish the split even when 
> the stopping offset is configured to a control record's offset. 
> I would be happy to implement this fix if we can reach an agreement. 
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


yuxiqian commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565028345


##
docs/content/pipelines/paimon-pipeline(ZH).md:
##
@@ -0,0 +1,209 @@
+# Paimon Pipeline 连接器
+
+Paimon Pipeline 连接器可以用作 Pipeline 的 *Data 
Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。
+
+## 连接器的功能
+* 自动建表
+* 表结构变更同步
+* 数据实时同步
+
+如何创建 Pipeline
+
+
+从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+   type: mysql
+   name: MySQL Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+   server-id: 5401-5404
+
+sink:
+  type: paimon
+  name: Paimon Sink
+  metastore: filesystem
+  warehouse: /path/warehouse
+
+pipeline:
+   name: MySQL to Paimon Pipeline
+   parallelism: 2
+```

Review Comment:
   > This indentation is just to keep the YAML definition valid.
   
   `source` and `pipeline` phases use 3-space indentation while `sink` uses 2. 
Any reason for this inconsistency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35098][orc] Fix incorrect results in thanEquals filters [flink]

2024-04-14 Thread via GitHub


snuyanzin commented on PR #24658:
URL: https://github.com/apache/flink/pull/24658#issuecomment-2054184159

   Yep, you're right, I was probably not aware of the context.
   I was judging only by the timestamps.
   I'm really sorry in case I offended you in any way.
   
   I would suggest, in case you are interested in something, letting others 
know about it with a small comment in Jira, for instance; this should help 
avoid such situations in the future, imho...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34942][connectors/opensearch] Add support for Flink 1.19+ [flink-connector-opensearch]

2024-04-14 Thread via GitHub


snuyanzin merged PR #42:
URL: https://github.com/apache/flink-connector-opensearch/pull/42


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34942][connectors/opensearch] Add support for Flink 1.19+ [flink-connector-opensearch]

2024-04-14 Thread via GitHub


snuyanzin commented on PR #42:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/42#issuecomment-2054124759

   Thanks for taking a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564775105


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.schema.Schema;
+
+import java.time.ZoneId;
+import java.util.List;
+
+/** Keep a list of {@link RecordData.FieldGetter} for a specific {@link 
Schema}. */
+public class TableSchemaInfo {

Review Comment:
   If we use the complete `Schema`, we can directly reuse the 
`SchemaUtils.applySchemaChangeEvent` method. However, if we use 
`SchemaChangeEvent`, we need to handle the various SchemaChangeEvents, so 
whenever a new `SchemaChangeEvent` type is added, we need to modify the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564773416


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeChecks;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A helper class for {@link PaimonWriter} to create FieldGetter and 
GenericRow. */
+public class PaimonWriterHelper {
+
+/** create a list of {@link RecordData.FieldGetter} for {@link 
PaimonWriter}. */
+public static List<RecordData.FieldGetter> createFieldGetters(Schema 
schema, ZoneId zoneId) {
+List<RecordData.FieldGetter> fieldGetters = new 
ArrayList<>(schema.getColumns().size());
+for (int i = 0; i < schema.getColumns().size(); i++) {
+
fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, 
zoneId));
+}
+return fieldGetters;
+}
+
+private static RecordData.FieldGetter createFieldGetter(
+DataType fieldType, int fieldPos, ZoneId zoneId) {
+final RecordData.FieldGetter fieldGetter;
+// ordered by type root definition
+switch (fieldType.getTypeRoot()) {
+case CHAR:
+case VARCHAR:
+fieldGetter = row -> 
BinaryString.fromString(row.getString(fieldPos).toString());
+break;
+case BOOLEAN:
+fieldGetter = row -> row.getBoolean(fieldPos);
+break;
+case BINARY:
+case VARBINARY:
+fieldGetter = row -> row.getBinary(fieldPos);
+break;
+case DECIMAL:
+final int decimalPrecision = 
DataTypeChecks.getPrecision(fieldType);
+final int decimalScale = DataTypeChecks.getScale(fieldType);
+fieldGetter =
+row -> {
+DecimalData decimalData =
+row.getDecimal(fieldPos, decimalPrecision, 
decimalScale);
+return Decimal.fromBigDecimal(
+decimalData.toBigDecimal(), 
decimalPrecision, decimalScale);
+};
+break;
+case TINYINT:
+fieldGetter = row -> row.getByte(fieldPos);
+break;
+case SMALLINT:
+fieldGetter = row -> row.getShort(fieldPos);
+break;
+case INTEGER:
+case DATE:
+case TIME_WITHOUT_TIME_ZONE:
+fieldGetter = row -> row.getInt(fieldPos);
+break;
+case BIGINT:
+fieldGetter = row -> row.getLong(fieldPos);
+break;
+case FLOAT:
+fieldGetter = row -> row.getFloat(fieldPos);
+break;
+case DOUBLE:
+fieldGetter = row -> row.getDouble(fieldPos);
+break;
+case TIMESTAMP_WITHOUT_TIME_ZONE:
+fieldGetter =
+row ->
+Timestamp.fromSQLTimestamp(
+row.getTimestamp(
+fieldPos,
+
DataTypeChecks.getPrecision(fieldType))
+.toTimestamp());
+break;
+case 

Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]

2024-04-14 Thread via GitHub


snuyanzin commented on PR #38:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-2054094757

   first this should be  reviewed/approved/merged 
   https://github.com/apache/flink-connector-opensearch/pull/42
   then we could continue with this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line

2024-04-14 Thread Kumar Mallikarjuna (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836959#comment-17836959
 ] 

Kumar Mallikarjuna edited comment on FLINK-35097 at 4/14/24 3:05 PM:
-

[~david.perkins] , I've raised a fix here:

[https://github.com/apache/flink/pull/24661]

 

I just realised, I need to look for a committer and get the issue assigned 
first!


was (Author: JIRAUSER303984):
[~david.perkins] , I've raised a fix here:

https://github.com/apache/flink/pull/24661

> Table API Filesystem connector with 'raw' format repeats last line
> --
>
> Key: FLINK-35097
> URL: https://issues.apache.org/jira/browse/FLINK-35097
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.17.1
> Environment: I ran the above test with 1.17.1. I checked for existing 
> bug tickets and release notes, but did not find anything, so assuming this 
> affects 1.18 and 1.19.
>Reporter: David Perkins
>Priority: Major
>  Labels: pull-request-available
>
> When using the Filesystem connector with 'raw' format to read text data that 
> contains new lines, a row is returned for every line, but always contains the 
> contents of the last line.
> For example, with the following file.
> {quote}
> line 1
> line 2
> line 3
> {quote}
> And table definition
> {quote}
> create TABLE MyRawTable (
>  `doc` string
>  ) WITH (
>   'path' = 'file:///path/to/data',
>   'format' = 'raw',
>'connector' = 'filesystem'
> );
> {quote}
> Selecting `*` from the table produces three rows all with "line 3" for `doc`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-04-14 Thread via GitHub


empathy87 commented on PR #24659:
URL: https://github.com/apache/flink/pull/24659#issuecomment-2054089997

   > Hi @empathy87 thanks a lot for driving this PR. It looks good to me. Could 
you please add tests for each changelog in 'OrcFilters.java' ? Thanks!
   
   Sure! I reworked the tests to cover all changes in `OrcFilters.java`.

   Please note that expressions like **10 > y** cannot be tested by executing 
SQL queries. This is because the Filesystem connector's predicate pushdown 
works in a best-effort manner: if extra rows are returned from the ORC file, 
they are simply filtered out further downstream.
   
   Expressions like **10 > y** are instead tested in 
`OrcFileSystemFilterTest::testApplyPredicateReverse`, which asserts that they 
are converted to the **y < 10** ORC filter predicate.
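   
   As an illustration, reversing a literal-first comparison amounts to 
mirroring the operator; a minimal, hypothetical sketch (not the actual 
`OrcFilters` code):
   
   ```java
   enum Op {
       LT, LE, GT, GE, EQ;
   
       // 10 > y becomes y < 10, so GT flips to LT, and so on.
       Op flip() {
           switch (this) {
               case LT: return GT;
               case LE: return GE;
               case GT: return LT;
               case GE: return LE;
               default: return this; // EQ is symmetric
           }
       }
   }
   ```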


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


yuxiqian commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564742486


##
docs/content/pipelines/paimon-pipeline.md:
##
@@ -0,0 +1,209 @@
+# Paimon Pipeline Connector
+
+The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, 
and write data to [Paimon](https://paimon.apache.org). This document describes 
how to set up the Paimon Pipeline connector.
+
+## What can the connector do?
+* Create table automatically if not exist
+* Schema change synchronization
+* Data synchronization
+
+How to create Pipeline
+
+
+The pipeline for reading data from MySQL and sink to Paimon can be defined as 
follows:
+
+```yaml
+source:
+   type: mysql
+   name: MySQL Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+   server-id: 5401-5404
+
+sink:
+  type: paimon
+  name: Paimon Sink
+  metastore: filesystem
+  warehouse: /path/warehouse
+
+pipeline:
+  name: MySQL to Paimon Pipeline
+  parallelism: 2
+```

Review Comment:
   The sink & pipeline phases use 2 spaces and the source phase uses 3 spaces. 
Any reason for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564741797


##
docs/content/pipelines/paimon-pipeline.md:
##
@@ -0,0 +1,209 @@
+# Paimon Pipeline Connector
+
+The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, 
and write data to [Paimon](https://paimon.apache.org). This document describes 
how to set up the Paimon Pipeline connector.
+
+## What can the connector do?
+* Create table automatically if not exist
+* Schema change synchronization
+* Data synchronization
+
+How to create Pipeline
+
+
+The pipeline for reading data from MySQL and sink to Paimon can be defined as 
follows:
+
+```yaml
+source:
+   type: mysql
+   name: MySQL Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+   server-id: 5401-5404
+
+sink:
+  type: paimon
+  name: Paimon Sink
+  metastore: filesystem
+  warehouse: /path/warehouse
+
+pipeline:
+  name: MySQL to Paimon Pipeline
+  parallelism: 2
+```

Review Comment:
   This indentation is just to keep the YAML definition valid.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]

2024-04-14 Thread via GitHub


lvyanquan commented on code in PR #2916:
URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564741417


##
docs/content/pipelines/paimon-pipeline(ZH).md:
##
@@ -0,0 +1,209 @@
+# Paimon Pipeline 连接器
+
+Paimon Pipeline 连接器可以用作 Pipeline 的 *Data 
Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。
+
+## 连接器的功能
+* 自动建表
+* 表结构变更同步
+* 数据实时同步
+
+如何创建 Pipeline
+
+
+从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+   type: mysql
+   name: MySQL Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+   server-id: 5401-5404
+
+sink:
+  type: paimon
+  name: Paimon Sink
+  metastore: filesystem
+  warehouse: /path/warehouse
+
+pipeline:
+   name: MySQL to Paimon Pipeline
+   parallelism: 2
+```

Review Comment:
   This indentation is just to keep the YAML definition valid.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35097][table] Fix 'raw' format deserialization [flink]

2024-04-14 Thread via GitHub


flinkbot commented on PR #24661:
URL: https://github.com/apache/flink/pull/24661#issuecomment-2054068968

   
   ## CI report:
   
   * b3c8502a9ca370eab1c023588571f6f3fc18addd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line

2024-04-14 Thread Kumar Mallikarjuna (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836959#comment-17836959
 ] 

Kumar Mallikarjuna commented on FLINK-35097:


[~david.perkins] , I've raised a fix here:

https://github.com/apache/flink/pull/24661

> Table API Filesystem connector with 'raw' format repeats last line
> --
>
> Key: FLINK-35097
> URL: https://issues.apache.org/jira/browse/FLINK-35097
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.17.1
> Environment: I ran the above test with 1.17.1. I checked for existing 
> bug tickets and release notes, but did not find anything, so assuming this 
> affects 1.18 and 1.19.
>Reporter: David Perkins
>Priority: Major
>  Labels: pull-request-available
>
> When using the Filesystem connector with 'raw' format to read text data that 
> contains new lines, a row is returned for every line, but always contains the 
> contents of the last line.
> For example, with the following file.
> {quote}
> line 1
> line 2
> line 3
> {quote}
> And table definition
> {quote}
> create TABLE MyRawTable (
>  `doc` string
>  ) WITH (
>   'path' = 'file:///path/to/data',
>   'format' = 'raw',
>'connector' = 'filesystem'
> );
> {quote}
> Selecting `*` from the table produces three rows all with "line 3" for `doc`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35097][table] Fix 'raw' format deserialization [flink]

2024-04-14 Thread via GitHub


kumar-mallikarjuna opened a new pull request, #24661:
URL: https://github.com/apache/flink/pull/24661

   
   
   ## What is the purpose of the change
   
   Fixes broken deserialization of the `raw` format.
   
   ## Brief change log
   
   The existing `reuse` attribute is returned by the `deserialize()` method in 
`RawFormatDeserializationSchema`. The attribute is modified on every call, but 
since it is returned by reference, all previously returned records end up 
pointing at a single value. This change removes the attribute and instead 
constructs the returned value inside the `deserialize()` method itself, thus 
avoiding overwrites.
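   
   As a minimal, hypothetical illustration of the aliasing pattern described 
above (not the actual `RawFormatDeserializationSchema` code):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   class ReuseBugDemo {
       private final StringBuilder reuse = new StringBuilder();
   
       // Buggy pattern: every call returns the same mutable instance,
       // so earlier results are overwritten by later calls.
       StringBuilder deserializeShared(String line) {
           reuse.setLength(0);
           return reuse.append(line);
       }
   
       // Fixed pattern: construct a fresh value per record.
       StringBuilder deserializeFresh(String line) {
           return new StringBuilder(line);
       }
   
       public static void main(String[] args) {
           ReuseBugDemo demo = new ReuseBugDemo();
           List<StringBuilder> rows = new ArrayList<>();
           for (String line : new String[] {"line 1", "line 2", "line 3"}) {
               rows.add(demo.deserializeShared(line));
           }
           rows.forEach(System.out::println); // prints "line 3" three times
       }
   }
   ```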
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   **Existing Behavior**
   1. Create a test file with the content:
   ```
   line 1
   line 2
   line3
   ```
   2. Run
   ```SQL
   CREATE TABLE MyRawTable (
   `doc` string
   ) WITH (
   'path' = 'file:///path/to/data',
   'format' = 'raw',
   'connector' = 'filesystem'
   );
   ```
   3. Check inserted values
   ```SQL
   SELECT * FROM MyRawTable;
   ```
   ```
   doc
   -
   line 3
   line 3
   line 3
   ```
   
   **After the Change**
   ```SQL
   SELECT * FROM MyRawTable;
   ```
   ```
   doc
   -
   line 1
   line 2
   line 3
   ```
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (yes)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line

2024-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35097:
---
Labels: pull-request-available  (was: )

> Table API Filesystem connector with 'raw' format repeats last line
> --
>
> Key: FLINK-35097
> URL: https://issues.apache.org/jira/browse/FLINK-35097
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.17.1
> Environment: I ran the above test with 1.17.1. I checked for existing 
> bug tickets and release notes, but did not find anything, so assuming this 
> affects 1.18 and 1.19.
>Reporter: David Perkins
>Priority: Major
>  Labels: pull-request-available
>
> When using the Filesystem connector with 'raw' format to read text data that 
> contains new lines, a row is returned for every line, but always contains the 
> contents of the last line.
> For example, with the following file.
> {quote}
> line 1
> line 2
> line 3
> {quote}
> And table definition
> {quote}
> create TABLE MyRawTable (
>  `doc` string
>  ) WITH (
>   'path' = 'file:///path/to/data',
>   'format' = 'raw',
>'connector' = 'filesystem'
> );
> {quote}
> Selecting `*` from the table produces three rows all with "line 3" for `doc`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line

2024-04-14 Thread Kumar Mallikarjuna (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836952#comment-17836952
 ] 

Kumar Mallikarjuna commented on FLINK-35097:


> assuming this effects 1.18 and 1.19.

This is reproducible on `master`. I'll look into a fix.

> Table API Filesystem connector with 'raw' format repeats last line
> --
>
> Key: FLINK-35097
> URL: https://issues.apache.org/jira/browse/FLINK-35097
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.17.1
> Environment: I ran the above test with 1.17.1. I checked for existing 
> bug tickets and release notes, but did not find anything, so assuming this 
> affects 1.18 and 1.19.
>Reporter: David Perkins
>Priority: Major
>
> When using the Filesystem connector with 'raw' format to read text data that 
> contains new lines, a row is returned for every line, but always contains the 
> contents of the last line.
> For example, with the following file.
> {quote}
> line 1
> line 2
> line 3
> {quote}
> And table definition
> {quote}
> create TABLE MyRawTable (
>  `doc` string
>  ) WITH (
>   'path' = 'file:///path/to/data',
>   'format' = 'raw',
>'connector' = 'filesystem'
> );
> {quote}
> Selecting `*` from the table produces three rows all with "line 3" for `doc`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34903) Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables

2024-04-14 Thread Thorne (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thorne updated FLINK-34903:
---
Summary: Add mysql-pipeline-connector with tables.exclude option to exclude 
unnecessary tables  (was: Add mysql-pipeline-connector with  table.exclude.list 
option to exclude unnecessary tables )

> Add mysql-pipeline-connector with tables.exclude option to exclude 
> unnecessary tables
> -
>
> Key: FLINK-34903
> URL: https://issues.apache.org/jira/browse/FLINK-34903
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Thorne
>Priority: Major
>  Labels: cdc, pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
>     When using the MySQL Pipeline connector for whole-database 
> synchronization, users currently cannot exclude unnecessary tables. Taking 
> reference from Debezium's parameters, specifically the 
> {*}table.exclude.list{*}, if the *table.include.list* is declared, then the 
> *table.exclude.list* parameter will not take effect. However, the tables 
> specified in the tables parameter of the MySQL Pipeline connector are 
> effectively added to the *table.include.list* in Debezium's context.
> !screenshot-1.png!
> !screenshot-2.png|width=834,height=86!
> Debezium option description
> !screenshot-3.png|width=831,height=217!
>     In summary, it is necessary to introduce an externally-exposed 
> *table.exclude.list* parameter within the MySQL Pipeline connector to 
> facilitate the exclusion of tables. This is because the current setup does 
> not allow for excluding unnecessary tables when including others through the 
> tables parameter.
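
A hedged sketch of the requested include/exclude semantics (a hypothetical 
helper, not the actual connector code):

{code:java}
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TableFilter {
    // Keep tables matched by the include pattern (the "tables" option), then
    // drop those matched by the proposed exclude pattern on top of that.
    static List<String> resolve(List<String> all, Pattern include, Pattern exclude) {
        return all.stream()
                .filter(t -> include.matcher(t).matches())
                .filter(t -> !exclude.matcher(t).matches())
                .collect(Collectors.toList());
    }
}
{code}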



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34903) Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables

2024-04-14 Thread Thorne (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thorne updated FLINK-34903:
---
Release Note: [features] Add mysql-pipeline-connector with  tables.exclude 
option to exclude unnecessary tables   (was: [features] Add 
mysql-pipeline-connector with  table.exclude.list option to exclude unnecessary 
tables )

> Add mysql-pipeline-connector with tables.exclude option to exclude 
> unnecessary tables
> -
>
> Key: FLINK-34903
> URL: https://issues.apache.org/jira/browse/FLINK-34903
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Thorne
>Priority: Major
>  Labels: cdc, pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
>     When using the MySQL Pipeline connector for whole-database 
> synchronization, users currently cannot exclude unnecessary tables. Taking 
> reference from Debezium's parameters, specifically the 
> {*}table.exclude.list{*}, if the *table.include.list* is declared, then the 
> *table.exclude.list* parameter will not take effect. However, the tables 
> specified in the tables parameter of the MySQL Pipeline connector are 
> effectively added to the *table.include.list* in Debezium's context.
> !screenshot-1.png!
> !screenshot-2.png|width=834,height=86!
> Debezium option description
> !screenshot-3.png|width=831,height=217!
>     In summary, it is necessary to introduce an externally-exposed 
> *table.exclude.list* parameter within the MySQL Pipeline connector to 
> facilitate the exclusion of tables. This is because the current setup does 
> not allow for excluding unnecessary tables when including others through the 
> tables parameter.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30687][table] Support pushdown for aggregate filters [flink]

2024-04-14 Thread via GitHub


flinkbot commented on PR #24660:
URL: https://github.com/apache/flink/pull/24660#issuecomment-2054018936

   
   ## CI report:
   
   * e5e78f553679c92f336b4f4fdccb31b95dd7bad9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34903][MySQL][Feature] Add mysql-pipeline-connector with table.exclude.list option to exclude unnecessary tables [flink-cdc]

2024-04-14 Thread via GitHub


shiyiky commented on PR #3186:
URL: https://github.com/apache/flink-cdc/pull/3186#issuecomment-2054018232

   @PatrickRen I have done it. Please review it when you have free time, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-30687][table] Support pushdown for aggregate filters [flink]

2024-04-14 Thread via GitHub


jeyhunkarimov opened a new pull request, #24660:
URL: https://github.com/apache/flink/pull/24660

   ## What is the purpose of the change
   
   This PR supports pushing down filters for aggregate expressions.
   Note that this optimization only works if there is a single aggregate 
expression with a filter; otherwise, pushing down filters leads to incorrect 
results. 
   
   ## Brief change log
   
 - Introduce `AggregateFilterPushdownRule`
 - Implement tests
   
   
   ## Verifying this change
   
   Via tests in 
`org.apache.flink.table.planner.plan.rules.logical.AggregateFilterPushdownRuleTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)

2024-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30687:
---
Labels: pull-request-available  (was: )

> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I tried using Flink SQL like this, I found that 'FILTER (WHERE class_id 
> = 1)' has no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in 
> https://issues.apache.org/jira/browse/FLINK-29558. If you try this demo, you 
> should cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

2024-04-14 Thread via GitHub


anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1564640169


##
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java:
##
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import 
org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+
+/** Factory for {@link 
org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder}. */
+public class SchemaCoderProviders {
+
+/**
+ * Creates a {@link 
org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder} in cases
+ * where the schema has already been setup before-hand/exists in Confluent 
Schema Registry.
+ *
+ * Useful in scenarios where users want to be more explicit with 
schemas used. In these cases
+ * the external schema specified through schemaId will take precedence for 
encoding/decoding
+ * data. Also, the step of registering with schemaRegistry during 
serialization, will be
+ * skipped.
+ *
+ * A single Schema Registry Protobuf entry may contain multiple 
Protobuf messages, some of
+ * which may have nested messages. The messageName identifies the exact 
message/schema to use
+ * for serialization/deserialization. Consider the following protobuf 
message
+ *
+ * 
+ * package test.package;
+ * message MessageA {
+ * message MessageB {
+ * message MessageC {
+ * ...
+ * }
+ * }
+ * message MessageD {
+ * ...
+ * }
+ * message MessageE {
+ * message MessageF {
+ * ...
+ * }
+ * message MessageG {
+ * ...
+ * }
+ * ...
+ * }
+ * ...
+ * }
+ * 
+ *
+ * In order to use messageD the messageName should contain the value of
+ * test.package.messageD. Similarly, for messageF to be used messageName 
should contain
+ * test.package.MessageE.MessageF.
+ *
+ * @param schemaId SchemaId for external schema referenced for 
encoding/decoding of payload.
+ * @param messageName Optional message name to be used to select the right 
{@link
+ * com.google.protobuf.Message} for Serialialization/Deserialization. 
In absence of
+ * messageName the outermost message will be used.
+ * @param schemaRegistryClient client handle to Schema Registry {@link
+ * io.confluent.kafka.schemaregistry.client.SchemaRegistryClient}
+ * @return
+ */
+public static SchemaCoder createForPreRegisteredSchema(
+int schemaId, @Nullable String messageName, SchemaRegistryClient 
schemaRegistryClient) {
+return new PreRegisteredSchemaCoder(schemaId, messageName, 
schemaRegistryClient);
+}
+
+/**
+ * Creates a default schema coder.
+ *
+ * For serialization schema coder will infer the schema from Flink 
{@link
+ * org.apache.flink.table.types.logical.RowType}. Schema obtained from 
rowType will also be
+ * registered to Schema Registry using the subject passed in by invoking 
{@link
+ * 
io.confluent.kafka.schemaregistry.client.SchemaRegistryClient#register(String,
+ * 

Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

2024-04-14 Thread via GitHub


anupamaggarwal commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1564639354


##
flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/utils/FlinkToProtoSchemaConverterTest.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent.utils;
+
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkToProtoSchemaConverter}. */
+@ExtendWith(TestLoggerExtension.class)
+class FlinkToProtoSchemaConverterTest {

Review Comment:
   Hi @klam-shop, I don't think I understood your comment completely. If we are 
converting from the rowType, the protobuf schema will only have one (root) 
element. IIUC, we won't have a case similar to what is outlined in the 
[wire-format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format)
 example.
   I added a test, 
`testMessageIndexHandlingInferredFromRowTypeWithConnectDecoder`, to verify 
that Connect's deserializer is happy with an empty message index. 
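   
   For readers following the wire-format link above, a hedged sketch of the 
framing (assumes Java 11+; simplified to unsigned varints, whereas Confluent 
clients write them zigzag-encoded via Kafka's `ByteUtils`):
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.nio.ByteBuffer;
   
   class WireFormatSketch {
       // Frame: magic byte 0x0, 4-byte big-endian schema id, message-index
       // list, then the serialized protobuf payload. An index count of 0
       // denotes the outermost message type.
       static byte[] frame(int schemaId, int[] messageIndexes, byte[] payload) {
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           out.write(0); // magic byte
           out.writeBytes(ByteBuffer.allocate(4).putInt(schemaId).array());
           writeVarint(out, messageIndexes.length);
           for (int idx : messageIndexes) {
               writeVarint(out, idx);
           }
           out.writeBytes(payload);
           return out.toByteArray();
       }
   
       private static void writeVarint(ByteArrayOutputStream out, int value) {
           while ((value & ~0x7F) != 0) {
               out.write((value & 0x7F) | 0x80);
               value >>>= 7;
           }
           out.write(value);
       }
   }
   ```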



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31992) FlinkKafkaConsumer API is suggested to use as part of documentation, when that API is deprecated for flink version 1.14

2024-04-14 Thread Sandesh Mendan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756352#comment-17756352
 ] 

Sandesh Mendan edited comment on FLINK-31992 at 4/14/24 11:07 AM:
--

Since this issue has been corrected in a higher Flink version, I am proceeding 
to close this ticket


was (Author: JIRAUSER300172):
Since this issue has been corrected in higher flink version, I am proceeding to 
this ticket

> FlinkKafkaConsumer API is suggested to use as part of documentation, when 
> that API is deprecated for flink version 1.14
> ---
>
> Key: FLINK-31992
> URL: https://issues.apache.org/jira/browse/FLINK-31992
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.14.2
>Reporter: Sandesh Mendan
>Priority: Minor
>  Labels: auto-deprioritized-major, documentation, 
> documentation-update, good-first-issue, newbie
>
> In Flink version 1.14, even though the API class FlinkKafkaConsumer had been 
> [deprecated|https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/],
>  the official 
> [documentation|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector]
>  suggests that API to use.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]

2024-04-14 Thread via GitHub


joshbsemperis commented on PR #38:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-2054001884

   Can this be resolved and pushed? Is there a reason why it hasn't? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


