Re: [PR] [FLINK-35072][cdc][doris] Support applying compatible `AlterColumnTypeEvent` to Doris sink [flink-cdc]
yuxiqian commented on code in PR #3215: URL: https://github.com/apache/flink-cdc/pull/3215#discussion_r1565211096

## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java:

@@ -0,0 +1,62 @@
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cdc.connectors.doris.sink;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;

import java.io.IOException;

/** Provides schema change operations based on {@link SchemaChangeManager}. */
public class DorisSchemaChangeManager extends SchemaChangeManager {
    public DorisSchemaChangeManager(DorisOptions dorisOptions) {
        super(dorisOptions);
    }

    private static final String MODIFY_COLUMN_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s";

    public boolean alterColumn(
            String database, String table, String columnName, String newColumnType)
            throws IOException, IllegalArgumentException {
        String tableIdentifier = String.format("%s.%s", database, table);
        FieldSchema alterFieldSchema = new FieldSchema(columnName, newColumnType, "");

        String alterColumnDDL =
                String.format(
                        MODIFY_COLUMN_DDL,
                        tableIdentifier,
                        columnName,
                        alterFieldSchema.getTypeString());

        try {
            return this.schemaChange(
                    database, table, buildRequestParam(true, columnName), alterColumnDDL);
        } catch (RuntimeException ex) {
            if (ex.getMessage().contains("Nothing is changed. please check your alter stmt.")) {
```

Review Comment: @lvyanquan Sure, but this error message comes from [Doris server side](https://github.com/apache/doris/blob/debb83f35d37cac9089b2e0282e2d555ad9c1d62/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java#L1508), which is wrapped as a generic `DorisSchemaChangeException` on the client side and can only be identified by checking the error message. Any suggestions about this?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
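The pattern discussed above — treating a "nothing changed" ALTER as a successful no-op by matching the server's error text — can be sketched in isolation. This is a hedged illustration, not code from the PR: the class name `SchemaChangeErrorMatcher`, the helper `isNothingChangedError`, and the walk over the cause chain are all assumptions for the example; only the error string itself comes from the discussion.

```java
// Sketch of message-based error detection, as discussed in the review above.
// Class and method names here are hypothetical, not from the PR.
public class SchemaChangeErrorMatcher {

    // Error text emitted by the Doris frontend when an ALTER statement is a no-op.
    private static final String NOTHING_CHANGED =
            "Nothing is changed. please check your alter stmt.";

    /** Returns true if the exception (or any of its causes) carries the no-op message. */
    public static boolean isNothingChangedError(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            String message = cur.getMessage();
            if (message != null && message.contains(NOTHING_CHANGED)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RuntimeException noOp =
                new RuntimeException(
                        "errCode = 2, detailMessage = Nothing is changed. please check your alter stmt.");
        System.out.println(isNothingChangedError(noOp)); // true
        System.out.println(isNothingChangedError(new RuntimeException("Unknown column"))); // false
    }
}
```

Walking the cause chain is one way to cope with the wrapping the comment mentions: the server-side message survives inside whatever client-side exception wraps it, so matching on the message still works regardless of the wrapper type.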
[jira] [Commented] (FLINK-34962) flink-connector-pulsa starts failed due to incorrect use of Pulsar API: LookupService. getPartitionedTopicMetadata
[ https://issues.apache.org/jira/browse/FLINK-34962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837080#comment-17837080 ] Yufan Sheng commented on FLINK-34962: - [~Tison] This issue should be marked resolved. > flink-connector-pulsa starts failed due to incorrect use of Pulsar API: > LookupService. getPartitionedTopicMetadata > -- > > Key: FLINK-34962 > URL: https://issues.apache.org/jira/browse/FLINK-34962 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: pulsar-4.2.0, pulsar-4.1.1 > Environment: * flink 1.17 > * pulsar client 3.0.0 > * org.apache.flink:flink-connector-pulsar:4.1.0-1.17 (connector) >Reporter: Yubiao Feng >Priority: Major > Labels: easyfix, pull-request-available > > - The unnecessary codes calls > `pulsarClient.getLookup().getPartitionedTopicMetadata()` to create the > partitioned topic metadata(in fact, this behavior of is not correct) > - Why it is unnecessary: the [following > code]([https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L245]) > that is creating a producer will also trigger partitioned topic metadata to > create. > - The method `pulsarClient.getLookup().getPartitionedTopicMetadata()` will > not retry if the connection is closed so that users will get an error. The > following code creates a producer that will retry if the connection is > closed, reducing the probability of an error occurring. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController [flink]
fredia merged PR #24633: URL: https://github.com/apache/flink/pull/24633 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34414) EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837079#comment-17837079 ] Yufan Sheng commented on FLINK-34414: - I don't think this is an issue. First of all, {{.setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)}} only works on the PulsarSink side. The source doesn't use transactions because of their poor performance; we had to drop transaction support in the source. In your sample code, you have set {{.setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)}} and {{.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false)}}. This means duplicated messages can be received because Pulsar doesn't get the acknowledgement from the connector and resends the messages. I think this is the root cause. > EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector > > > Key: FLINK-34414 > URL: https://issues.apache.org/jira/browse/FLINK-34414 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.2 >Reporter: Rafał Trójczak >Priority: Major > > Using Pulsar connector for Flink (version 4.1.0-1.17) with Flink job (version > 1.17.2) when there is an exception thrown within the job, the job gets > restarted, starts from the last checkpoint, but the sink writes to the output > more events than it should, even though the EXACT_ONCE guarantees are set > everywhere. To be more specific, here is my Job's flow: > * a Pulsar source that reads from the input topic, > * a simple processing function, > * and a sink that writes to the output topic. 
> Here is a fragment of the source creation: > {code:java} > .setDeserializationSchema(Schema.AVRO(inClass), inClass) > .setSubscriptionName(subscription) > .enableSchemaEvolution() > .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true) > .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true) > .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1) > .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, > false); > {code} > Here is the fragment of the sink creation: > {code:java} > .setSerializationSchema(Schema.AVRO(outClass), outClass) > .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true) > .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, > DeliveryGuarantee.EXACTLY_ONCE) > .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE); > {code} > And here is the Flink environment preparation: > {code:java} > environment.setRuntimeMode(RuntimeExecutionMode.STREAMING); > environment.enableCheckpointing(CHECKPOINTING_INTERVAL, > CheckpointingMode.EXACTLY_ONCE); > {code} > After sending 1000 events on the input topic, on the output topic I got 1048 > events. > I ran the job on my local Kubernetes cluster, using Kubernetes Flink Operator. > Here is the MRE for this problem (mind that there is an internal dependency, > but it may be commented out together with the code that relies on it): > [https://github.com/trojczak/flink-pulsar-connector-problem] -- This message was sent by Atlassian Jira (v8.20.10#820010)
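Following the reply above, a source configuration consistent with that advice would leave auto-acknowledge at its default (enabled) instead of disabling it without a transaction. The sketch below is illustrative only: the class `PulsarSourceConfigSketch`, the `MyEvent` POJO, and all URLs/topic/subscription names are placeholders, and exact builder signatures should be checked against the connector version in use (4.1.0-1.17 here).

```java
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;

import org.apache.pulsar.client.api.Schema;

public class PulsarSourceConfigSketch {

    /** Hypothetical event POJO, only for illustration. */
    public static class MyEvent {
        public String content;
    }

    public static PulsarSource<MyEvent> buildSource() {
        // Per the comment above, the source side does not use transactions, so
        // PULSAR_ENABLE_TRANSACTION matters only for the sink. Leaving
        // auto-acknowledge enabled (the default) lets the broker record acks from
        // the connector, so already-processed messages are not redelivered after
        // a restart from checkpoint.
        return PulsarSource.<MyEvent>builder()
                .setServiceUrl("pulsar://localhost:6650") // placeholder
                .setAdminUrl("http://localhost:8080") // placeholder
                .setTopics("persistent://public/default/input") // placeholder
                .setSubscriptionName("my-subscription") // placeholder
                .setDeserializationSchema(Schema.AVRO(MyEvent.class), MyEvent.class)
                .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
                .build();
    }
}
```

This is a configuration sketch rather than a runnable job; the point is only which knobs the reply identifies as sink-side versus source-side.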
[jira] [Resolved] (FLINK-33884) Update Pulsar dependency to 3.0.2 in Pulsar Connector
[ https://issues.apache.org/jira/browse/FLINK-33884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen resolved FLINK-33884. --- Fix Version/s: pulsar-4.2.0 Resolution: Fixed Master via https://github.com/apache/flink-connector-pulsar/commit/9f4b902c2a478d0105eec1e32bac3ea40f318d00 > Update Pulsar dependency to 3.0.2 in Pulsar Connector > - > > Key: FLINK-33884 > URL: https://issues.apache.org/jira/browse/FLINK-33884 > Project: Flink > Issue Type: Improvement > Components: Connectors / Pulsar >Affects Versions: pulsar-4.0.1 >Reporter: David Christle >Assignee: David Christle >Priority: Major > Labels: pull-request-available > Fix For: pulsar-4.2.0 > > > The [3.0.2 > patch|https://pulsar.apache.org/release-notes/versioned/pulsar-3.0.2/] > includes various bug fixes, including a few for the Pulsar client (e.g. > [link]([https://github.com/apache/pulsar/pull/21144)). Upgrading the > dependency in the connector will pick up these fixes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]
tisonkun merged PR #72: URL: https://github.com/apache/flink-connector-pulsar/pull/72 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34629) Pulsar source lost topic subscribe
[ https://issues.apache.org/jira/browse/FLINK-34629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen resolved FLINK-34629. --- Fix Version/s: pulsar-4.2.0 Resolution: Fixed Master via https://github.com/apache/flink-connector-pulsar/commit/7a5eef268cb3f598589ad9cc32648ac92fbbee1d > Pulsar source lost topic subscribe > -- > > Key: FLINK-34629 > URL: https://issues.apache.org/jira/browse/FLINK-34629 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: pulsar-3.0.1 >Reporter: WangMinChao >Assignee: WangMinChao >Priority: Major > Labels: pull-request-available > Fix For: pulsar-4.2.0 > > > The non-partition pulsar topic partition id is `-1`, using multiples of the > non-partition topics > in Pulsar source maybe lose topic subscribe. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837073#comment-17837073 ] Yufan Sheng commented on FLINK-34436: - I think this is not an issue; it's by design. Schema evolution on Pulsar was never properly designed. We just send the desired schema to the Pulsar server and let the validation be performed on the server side; the client side shouldn't perform any validation. Instead, we just use the schema provided by the user for Pub/Sub messages. Sometimes we even bypass the schema with raw bytes for (de)serializing messages in Flink. > Avro schema evolution and compatibility issues in Pulsar connector > -- > > Key: FLINK-34436 > URL: https://issues.apache.org/jira/browse/FLINK-34436 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.2 >Reporter: Jacek Wislicki >Priority: Major > > We noticed a couple of critical issues in the Pulsar-Flink connector related > to schema evolution and compatibility. Please see the MRE available at > https://github.com/JacekWislicki/test11. More details are in the project's > README file, here is the summary: > Library versions: > * Pulsar 3.0.1 > * Flink 1.17.2 > * Pulsar-Flink connector 4.1.0-1.17 > Problems: > * Exception thrown when schema's fields are added/removed > * Avro's enum default value is ignored, instead the last known applied > I believe that I observed the same behaviour in the Pulsar itself, still now > we are focusing on the connector, hence I was able to document the problems > when using it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34629] Fix invalid subtask assignment for non-partitioned topics [flink-connector-pulsar]
boring-cyborg[bot] commented on PR #85: URL: https://github.com/apache/flink-connector-pulsar/pull/85#issuecomment-2055403591 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34629] Fix invalid subtask assignment for non-partitioned topics [flink-connector-pulsar]
tisonkun merged PR #85: URL: https://github.com/apache/flink-connector-pulsar/pull/85 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]
Jiabao-Sun commented on code in PR #24612: URL: https://github.com/apache/flink/pull/24612#discussion_r1565204190

## flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java:

```diff
@@ -52,7 +51,9 @@ public class ConfigurationConversionsTest {

     private Configuration pc;

-    @Before
+    @Parameter private TestSpec testSpec;
+
+    @BeforeEach
     public void init() {
```

Review Comment: The visibility of class and methods can be package default.

## flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java:

```diff
@@ -21,83 +21,75 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.TestLogger;

-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

-import java.io.IOException;
+import java.io.File;
 import java.net.URI;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;

 /** Tests for the configuration of the default file system scheme. */
-public class FilesystemSchemeConfigTest extends TestLogger {
+public class FilesystemSchemeConfigTest {

-    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public File tempFolder;
```

Review Comment: private

## flink-core/src/test/java/org/apache/flink/configuration/ConfigurationParsingInvalidFormatsTest.java:

```diff
@@ -64,35 +66,35 @@ public static Object[][] getSpecs() {
         };
     }

-    @Parameterized.Parameter public ConfigOption option;
+    @Parameter public ConfigOption option;
```

Review Comment: visibility can be private.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35064) Flink sql connector pulsar/hive com.fasterxml.jackson.annotation.JsonFormat$Value conflict
[ https://issues.apache.org/jira/browse/FLINK-35064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837068#comment-17837068 ] Yufan Sheng commented on FLINK-35064: - I have made a comment in the related PR; I'll copy/paste it here for reference. {{pulsar-client-all}} bundles its Jackson internally under the package name {{org.apache.pulsar.shade.com.fasterxml}}. The Jackson used in Flink shouldn't be shaded into the same package name, or it may cause more severe class-conflict issues. I don't think this is a good idea. The root cause is on the Pulsar client side, with its Jackson shading. Shading Jackson caused a lot of issues in developing and running flink-sql-connector-pulsar before we (StreamNative) donated it to the Flink community. I think we should ask Pulsar not to shade {{jackson-annotation}} in the client to fix this problem. > Flink sql connector pulsar/hive > com.fasterxml.jackson.annotation.JsonFormat$Value conflict > -- > > Key: FLINK-35064 > URL: https://issues.apache.org/jira/browse/FLINK-35064 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Connectors / Pulsar >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Labels: pull-request-available > > When I compile and package {{flink-sql-connector-pulsar}} & > {{{}flink-sql-connector-hive{}}}, and then put these two jar files into the > Flink lib directory, I execute the following SQL statement through > {{{}bin/sql-client.sh{}}}: > > {code:java} > // code placeholder > CREATE TABLE > pulsar_table ( > content string, > proc_time AS PROCTIME () > ) > WITH > ( > 'connector' = 'pulsar', > 'topics' = 'persistent://xxx', > 'service-url' = 'pulsar://xxx', > 'source.subscription-name' = 'xxx', > 'source.start.message-id' = 'latest', > 'format' = 'csv', > 'pulsar.client.authPluginClassName' = > 'org.apache.pulsar.client.impl.auth.AuthenticationToken', > 'pulsar.client.authParams' = 'token:xxx' > ); > > select * from pulsar_table; {code} > 
The task error exception stack is as follows: > > {code:java} > Caused by: java.lang.NoSuchMethodError: > com.fasterxml.jackson.annotation.JsonFormat$Value.empty()Lcom/fasterxml/jackson/annotation/JsonFormat$Value; > at > org.apache.pulsar.shade.com.fasterxml.jackson.databind.cfg.MapperConfig.(MapperConfig.java:56) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:660) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:576) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.common.util.ObjectMapperFactory.createObjectMapperInstance(ObjectMapperFactory.java:151) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.common.util.ObjectMapperFactory.(ObjectMapperFactory.java:142) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.create(ConfigurationDataUtils.java:35) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.(ConfigurationDataUtils.java:43) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.client.impl.ClientBuilderImpl.loadConf(ClientBuilderImpl.java:77) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:105) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.(PulsarSourceEnumerator.java:95) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.(PulsarSourceEnumerator.java:76) > 
~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.source.PulsarSource.createEnumerator(PulsarSource.java:144) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > {code} > > The exception shows a conflict with > {{{}com.fasterxml.jackson.annotation.JsonFormat$Value{}}}. I investigated and > found that {{flink-sql-connector-pulsar}} and {{flink-sql-connector-hive}} > depend on different versions, leading to this conflict. > {code:java} > // flink-sql-connector-pulsar pom.xml > >
Re: [PR] [FLINK-35064] Update Pulsar dependency to solve the conflict of com.f… [flink-connector-pulsar]
syhily commented on PR #88: URL: https://github.com/apache/flink-connector-pulsar/pull/88#issuecomment-2055283881 The root cause is on the Pulsar client side, with its Jackson shading. Shading Jackson causes a lot of issues. I think we should ask the Pulsar client not to shade Jackson. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]
syhily commented on PR #84: URL: https://github.com/apache/flink-connector-pulsar/pull/84#issuecomment-2055266431 I'm in favor of https://github.com/apache/flink-connector-pulsar/pull/85; maybe we can continue the review on that PR and close this one instead. WDYT @minchowang? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33884] Update Pulsar dependency to 3.0.2 in Pulsar Connector [flink-connector-pulsar]
syhily commented on PR #72: URL: https://github.com/apache/flink-connector-pulsar/pull/72#issuecomment-2055251321 Thanks, there are no issues from my side. @tisonkun Can you help me double check this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35064] Update Pulsar dependency to solve the conflict of com.f… [flink-connector-pulsar]
syhily commented on PR #88: URL: https://github.com/apache/flink-connector-pulsar/pull/88#issuecomment-2055227659 Sorry, I don't think this PR can be accepted. IIUC, `pulsar-client-all` bundles a Jackson internally under the package name `org.apache.pulsar.shade.com.fasterxml`. The normal Jackson can't be shaded into the same package name, or it may cause more severe class-conflict issues. I don't think this is a good idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow specifying the ServerSocket port for the collect function when accessing the TaskManager from the client. [flink]
hzjhjjyy closed pull request #23254: Allow specifying the ServerSocket port for the collect function when accessing the TaskManager from the client. URL: https://github.com/apache/flink/pull/23254 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35072][cdc][doris] Support applying compatible `AlterColumnTypeEvent` to Doris sink [flink-cdc]
lvyanquan commented on code in PR #3215: URL: https://github.com/apache/flink-cdc/pull/3215#discussion_r1565182494

## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java:

## @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+
+import java.io.IOException;
+
+/** Provides schema change operations based on {@link SchemaChangeManager}. */
+public class DorisSchemaChangeManager extends SchemaChangeManager {
+    public DorisSchemaChangeManager(DorisOptions dorisOptions) {
+        super(dorisOptions);
+    }
+
+    private static final String MODIFY_COLUMN_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s";
+
+    public boolean alterColumn(
+            String database, String table, String columnName, String newColumnType)
+            throws IOException, IllegalArgumentException {
+        String tableIdentifier = String.format("%s.%s", database, table);
+        FieldSchema alterFieldSchema = new FieldSchema(columnName, newColumnType, "");
+
+        String alterColumnDDL =
+                String.format(
+                        MODIFY_COLUMN_DDL,
+                        tableIdentifier,
+                        columnName,
+                        alterFieldSchema.getTypeString());
+
+        try {
+            return this.schemaChange(
+                    database, table, buildRequestParam(true, columnName), alterColumnDDL);
+        } catch (RuntimeException ex) {
+            if (ex.getMessage().contains("Nothing is changed. please check your alter stmt.")) {

Review Comment:
   Considering future maintenance, could you please add a description of where this error message comes from? Also, is there a way to identify this case through the exception class rather than the message text?
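Since the Doris client wraps the server-side failure in a generic runtime exception (see the discussion above), matching on the message text is currently the only discriminator. Below is a minimal, dependency-free sketch of that check; the class and method names here are illustrative, not the connector's actual API. Note the null guard, since `RuntimeException#getMessage()` may return null, whereas the quoted code would throw a `NullPointerException` in that case:

```java
public class NoOpAlterCheck {
    // Exact marker string returned by the Doris FE when an ALTER statement changes nothing.
    static final String NOTHING_CHANGED =
            "Nothing is changed. please check your alter stmt.";

    // Returns true when the wrapped failure is the benign "no-op DDL" case,
    // guarding against a null message.
    static boolean isNoOpAlter(RuntimeException ex) {
        String msg = ex.getMessage();
        return msg != null && msg.contains(NOTHING_CHANGED);
    }

    public static void main(String[] args) {
        RuntimeException noOp =
                new RuntimeException("Failed to schemaChange, response: " + NOTHING_CHANGED);
        RuntimeException real = new RuntimeException("Column not found: foo");
        RuntimeException blank = new RuntimeException();
        System.out.println(isNoOpAlter(noOp));  // true
        System.out.println(isNoOpAlter(real));  // false
        System.out.println(isNoOpAlter(blank)); // false
    }
}
```

Centralizing the marker string in one constant also answers the maintenance concern: if the server-side wording ever changes, there is exactly one place to update.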
Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]
fredia commented on code in PR #24651: URL: https://github.com/apache/flink/pull/24651#discussion_r1565164680

## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:

## @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** An enumeration of the types of supported states. */
+    public enum Type {
+        VALUE,
+        LIST,
+        REDUCING,
+        FOLDING,
+        AGGREGATING,
+        MAP
+    }
+
+    /** ID that uniquely identifies state created from this StateDescriptor. */
+    @Nonnull private final String stateId;

Review Comment:
   How is `stateId` generated? If the sync state and async state co-exist, can they use the same `stateId`?

## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:

## @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKeyedState} is the root of the internal state type hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that communicate with {@link
+ * AsyncExecutionController} and are not intended to be used by user applications.
+ *
+ * @param <K> The type of key the state is associated to.
+ * @param <V> The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKeyedState<K, V> implements State {

Review Comment:
   Is the `namespace` required here?
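To make the division of labor described in the javadoc above concrete, here is a toy sketch of how an internal state object turns a public-API read into a request routed through an async execution controller. All names here (`AsyncController`, `handleRequest`, the map-backed store) are invented for illustration and are not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class InternalStateSketch {
    // Stand-in for an async execution controller: accepts state requests, returns futures.
    interface AsyncController {
        CompletableFuture<Object> handleRequest(String stateId, String requestType);
    }

    // Internal state object: user-facing accessor on top, controller plumbing below.
    static class InternalValueState<V> {
        private final String stateId;
        private final AsyncController controller;

        InternalValueState(String stateId, AsyncController controller) {
            this.stateId = stateId;
            this.controller = controller;
        }

        @SuppressWarnings("unchecked")
        CompletableFuture<V> asyncValue() {
            // Every access becomes a request; the controller decides when/where it executes.
            return (CompletableFuture<V>)
                    (CompletableFuture<?>) controller.handleRequest(stateId, "VALUE_GET");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> store = new HashMap<>();
        store.put("my-state", 42);
        // A trivially synchronous controller backed by an in-memory map.
        AsyncController controller =
                (stateId, type) -> CompletableFuture.completedFuture(store.get(stateId));

        InternalValueState<Integer> state = new InternalValueState<>("my-state", controller);
        System.out.println(state.asyncValue().join()); // 42
    }
}
```

The point of the hierarchy split is visible even in this sketch: the public API only ever sees the future, while the internal class owns the `stateId` and controller wiring.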
Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]
Zakelly commented on code in PR #24651: URL: https://github.com/apache/flink/pull/24651#discussion_r1565167042

## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:

## @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** An enumeration of the types of supported states. */
+    public enum Type {
+        VALUE,
+        LIST,
+        REDUCING,
+        FOLDING,
+        AGGREGATING,
+        MAP
+    }
+
+    /** ID that uniquely identifies state created from this StateDescriptor. */
+    @Nonnull private final String stateId;
+
+    /** The serializer for the type. */
+    @Nonnull private final TypeSerializer<T> typeSerializer;
+
+    /**
+     * The type information describing the value type. Remain this since it could provide more
+     * information which could be used internally in the future.
+     */
+    @Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   I thought the statement `which could be used internally in the future` meant it would be used to create the serializer lazily; does that no longer apply to the current implementation?
Re: [PR] [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager [flink]
Zakelly commented on code in PR #24644: URL: https://github.com/apache/flink/pull/24644#discussion_r1565161944

## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:

## @@ -459,13 +470,28 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
         uploadedStates.headMap(checkpointId, true).entrySet().iterator();
         while (uploadedStatesIterator.hasNext()) {
             Map.Entry<Long, Set<LogicalFile>> entry = uploadedStatesIterator.next();
-            if (discardLogicalFiles(subtaskKey, entry.getKey(), entry.getValue())) {
+            if (discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) {

Review Comment:
   FYI: This is a bug found by the newly added UTs.
Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]
masteryhx commented on code in PR #24651: URL: https://github.com/apache/flink/pull/24651#discussion_r1565160755

## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:

## @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** An enumeration of the types of supported states. */
+    public enum Type {
+        VALUE,
+        LIST,
+        REDUCING,
+        FOLDING,
+        AGGREGATING,
+        MAP
+    }
+
+    /** ID that uniquely identifies state created from this StateDescriptor. */
+    @Nonnull private final String stateId;
+
+    /** The serializer for the type. */
+    @Nonnull private final TypeSerializer<T> typeSerializer;
+
+    /**
+     * The type information describing the value type. Remain this since it could provide more
+     * information which could be used internally in the future.
+     */
+    @Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   I just kept this since it could provide/persist more information (e.g. the schema) than the TypeSerializer, as its comment says. WDYT?
Re: [PR] [FLINK-35046][state] Introduce AsyncKeyedStateBackend supporting to create StateExecutor [flink]
Zakelly commented on code in PR #24663: URL: https://github.com/apache/flink/pull/24663#discussion_r1565158902

## flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java:

## @@ -104,6 +104,25 @@ default String getName() {
     CheckpointableKeyedStateBackend<KEY> createKeyedStateBackend(
             KeyedStateBackendParameters<KEY> parameters) throws Exception;

+    /**
+     * Creates a new {@link AsyncKeyedStateBackend} which supports to access keyed state
+     * asynchronously.
+     *
+     * <p>Keyed State is state where each value is bound to a key.
+     *
+     * @param parameters The arguments bundle for creating {@link AsyncKeyedStateBackend}.
+     * @param <KEY> The type of the keys by which the state is organized.
+     * @return The Async Keyed State Backend for the given job, operator.
+     * @throws Exception This method may forward all exceptions that occur while instantiating the
+     *     backend.
+     */
+    @Experimental
+    default <KEY> AsyncKeyedStateBackend createAsyncKeyedStateBackend(
+            KeyedStateBackendParameters<KEY> parameters) throws Exception {
+        throw new UnsupportedOperationException(
+                "Don't support createAsyncKeyedStateBackend by default");
+    }
+

Review Comment:
   Should we add another method `isSupportAsyncKeyedStateBackend` within `StateBackend`?
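The pattern being proposed above, a default method that throws paired with a capability probe so callers can check support before invoking, can be sketched as follows. The interface and method names below are simplified stand-ins for Flink's `StateBackend` API, not its actual signatures:

```java
public class CapabilityProbeSketch {
    interface StateBackend {
        // Probe suggested in the review: lets callers check support up front.
        default boolean supportsAsyncKeyedStateBackend() {
            return false;
        }

        // Default implementation throws, so only backends that opt in provide one.
        default String createAsyncKeyedStateBackend() {
            throw new UnsupportedOperationException(
                    "Don't support createAsyncKeyedStateBackend by default");
        }
    }

    static class LegacyBackend implements StateBackend {}

    static class AsyncCapableBackend implements StateBackend {
        @Override
        public boolean supportsAsyncKeyedStateBackend() {
            return true;
        }

        @Override
        public String createAsyncKeyedStateBackend() {
            return "async-keyed-backend";
        }
    }

    public static void main(String[] args) {
        StateBackend[] backends = {new LegacyBackend(), new AsyncCapableBackend()};
        for (StateBackend b : backends) {
            // Callers probe first instead of catching UnsupportedOperationException.
            if (b.supportsAsyncKeyedStateBackend()) {
                System.out.println("created: " + b.createAsyncKeyedStateBackend());
            } else {
                System.out.println("falling back to the sync keyed state backend");
            }
        }
    }
}
```

The trade-off: a probe turns an exceptional control path into an ordinary branch, at the cost of two methods that must stay in sync on every implementation.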
Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]
Zakelly commented on code in PR #22973: URL: https://github.com/apache/flink/pull/22973#discussion_r1565156390

## docs/layouts/shortcodes/generated/checkpointing_configuration.html:

## @@ -44,6 +44,42 @@
         String
         The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.

+    state.checkpoints.file-merging.across-checkpoint-boundary
+    false
+    Boolean
+    Only relevant if state.checkpoints.file-merging.enabled is enabled. Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only files within a checkpoint boundary will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.
+
+    state.checkpoints.file-merging.enabled

Review Comment:
   I see. Thus we leave it be.
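Read together, the two generated options above compose like this hypothetical `flink-conf.yaml` fragment; the option keys are taken from the docs hunk, the values are illustrative only:

```yaml
# Enable merging small checkpoint files into larger physical files.
state.checkpoints.file-merging.enabled: true

# Only takes effect when file merging is enabled. false (the default) merges
# files only within one checkpoint; true additionally lets logical files of
# different checkpoints share the same physical file.
state.checkpoints.file-merging.across-checkpoint-boundary: false
```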
Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]
1996fanrui commented on code in PR #24612: URL: https://github.com/apache/flink/pull/24612#discussion_r1565149231

## flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java:

## @@ -21,83 +21,79 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.TestLogger;

-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

+import java.io.File;
 import java.io.IOException;
 import java.net.URI;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;

 /** Tests for the configuration of the default file system scheme. */
-public class FilesystemSchemeConfigTest extends TestLogger {
+class FilesystemSchemeConfigTest {

-    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public File tempFolder;

-    @After
-    public void clearFsSettings() throws IOException {
+    @AfterEach
+    void clearFsSettings() throws IOException {
         FileSystem.initialize(new Configuration());
     }

     @Test
-    public void testDefaultsToLocal() throws Exception {
-        URI justPath = new URI(tempFolder.newFile().toURI().getPath());
-        assertNull(justPath.getScheme());
+    void testDefaultsToLocal() throws Exception {
+        URI justPath = new URI(File.createTempFile("junit", null, tempFolder).toURI().getPath());
+        assertThat(justPath.getScheme()).isNull();

         FileSystem fs = FileSystem.get(justPath);
-        assertEquals("file", fs.getUri().getScheme());
+        assertThat(fs.getUri().getScheme()).isEqualTo("file");
     }

     @Test
-    public void testExplicitlySetToLocal() throws Exception {
+    void testExplicitlySetToLocal() throws Exception {
         final Configuration conf = new Configuration();
         conf.set(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, LocalFileSystem.getLocalFsURI().toString());
         FileSystem.initialize(conf);

-        URI justPath = new URI(tempFolder.newFile().toURI().getPath());
-        assertNull(justPath.getScheme());
+        URI justPath = new URI(File.createTempFile("junit", null, tempFolder).toURI().getPath());
+        assertThat(justPath.getScheme()).isNull();

         FileSystem fs = FileSystem.get(justPath);
-        assertEquals("file", fs.getUri().getScheme());
+        assertThat(fs.getUri().getScheme()).isEqualTo("file");
     }

     @Test
-    public void testExplicitlySetToOther() throws Exception {
+    void testExplicitlySetToOther() throws Exception {
         final Configuration conf = new Configuration();
         conf.set(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
         FileSystem.initialize(conf);

-        URI justPath = new URI(tempFolder.newFile().toURI().getPath());
-        assertNull(justPath.getScheme());
+        URI justPath = new URI(File.createTempFile("junit", null, tempFolder).toURI().getPath());
+        assertThat(justPath.getScheme()).isNull();

         try {
             FileSystem.get(justPath);
             fail("should have failed with an exception");
         } catch (UnsupportedFileSystemSchemeException e) {
-            assertTrue(e.getMessage().contains("otherFS"));
+            assertThat(e.getMessage()).contains("otherFS");

Review Comment:
   It's better to use `assertThatThrownBy` instead.

## flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java:

## @@ -20,243 +20,232 @@
 import org.apache.flink.core.testutils.CommonTestUtils;

-import org.junit.Test;
+import org.junit.jupiter.api.Test;

 import java.io.IOException;

 import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

 /** Tests for the {@link MemorySize} class. */
-public class MemorySizeTest {
+class MemorySizeTest {

     @Test
-    public void testUnitConversion() {
+    void testUnitConversion() {
         final MemorySize zero =
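For context on the review suggestion above: AssertJ's `assertThatThrownBy` collapses the try/`fail()`/catch idiom into a single fluent call. The snippet below is a dependency-free stand-in (plain JDK, not AssertJ itself) that shows the shape of the pattern:

```java
public class ThrownByPattern {
    interface Executable {
        void run() throws Exception;
    }

    // Minimal stand-in for AssertJ's assertThatThrownBy: runs the block,
    // returns the thrown exception, and fails if nothing was thrown.
    static Throwable assertThatThrownBy(Executable block) {
        try {
            block.run();
        } catch (Throwable t) {
            return t;
        }
        throw new AssertionError("Expected an exception but none was thrown");
    }

    public static void main(String[] args) {
        // Old style: try { call(); fail("..."); } catch (Expected e) { assert on e }
        // New style: one expression wrapping the failing call.
        Throwable t = assertThatThrownBy(() -> {
            throw new IllegalStateException("unknown scheme: otherFS");
        });
        System.out.println(t.getMessage().contains("otherFS")); // true
    }
}
```

The real AssertJ version additionally offers chained checks such as `.isInstanceOf(...)` and `.hasMessageContaining(...)`, which is why reviewers prefer it over the manual idiom.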
[jira] [Updated] (FLINK-34987) Introduce Internal State Interface for Async State API
[ https://issues.apache.org/jira/browse/FLINK-34987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-34987:
-----------------------------------
    Labels: pull-request-available  (was: )

> Introduce Internal State Interface for Async State API
> ------------------------------------------------------
>
>                 Key: FLINK-34987
>                 URL: https://issues.apache.org/jira/browse/FLINK-34987
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends
>            Reporter: Hangxiang Yu
>            Assignee: Hangxiang Yu
>            Priority: Major
>              Labels: pull-request-available
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]
Zakelly commented on code in PR #24651: URL: https://github.com/apache/flink/pull/24651#discussion_r1565148947

## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:

## @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the

Review Comment:
   nit: replace all `InternalKvState` with `InternalKeyedState`.

## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:

## @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** An enumeration of the types of supported states. */
+    public enum Type {
+        VALUE,
+        LIST,
+        REDUCING,
+        FOLDING,
+        AGGREGATING,
+        MAP
+    }
+
+    /** ID that uniquely identifies state created from this StateDescriptor. */
+    @Nonnull private final String stateId;
+
+    /** The serializer for the type. */
+    @Nonnull private final TypeSerializer<T> typeSerializer;
+
+    /**
+     * The type information describing the value type. Remain this since it could provide more
+     * information which could be used internally in the future.
+     */
+    @Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   IIUC, we don't need this?

## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:

## @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed
Re: [PR] [FLINK-35064] Update Pulsar dependency to solve the conflict of com.f… [flink-connector-pulsar]
elon-X commented on PR #88: URL: https://github.com/apache/flink-connector-pulsar/pull/88#issuecomment-2054774574 @MartijnVisser @syhily Could you please help approve this PR when you have some free time? Thank you very much. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35102][doris] Fix Doris connector type mapping issues [flink-cdc]
yuxiqian commented on PR #3224: URL: https://github.com/apache/flink-cdc/pull/3224#issuecomment-2054696577 cc @PatrickRen @lvyanquan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35102) Incorrect Type mapping for Flink CDC Doris connector
[ https://issues.apache.org/jira/browse/FLINK-35102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35102:
-----------------------------------
    Labels: pull-request-available  (was: )

> Incorrect Type mapping for Flink CDC Doris connector
> ----------------------------------------------------
>
>                 Key: FLINK-35102
>                 URL: https://issues.apache.org/jira/browse/FLINK-35102
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Xiqian YU
>            Priority: Major
>              Labels: pull-request-available
>
> According to the Flink CDC Doris connector docs, CHAR and VARCHAR lengths are multiplied by 3 since Doris uses UTF-8 variable-length encoding internally.
> | CHAR(n) | CHAR(n*3) | In Doris, strings are stored in UTF-8 encoding, so English characters occupy 1 byte and Chinese characters occupy 3 bytes. The length here is multiplied by 3. The maximum length of CHAR is 255. Once exceeded, it will automatically be converted to VARCHAR type. |
> | VARCHAR(n) | VARCHAR(n*3) | Same as above. The length here is multiplied by 3. The maximum length of VARCHAR is 65533. Once exceeded, it will automatically be converted to STRING type. |
> However, the Doris connector currently maps `CHAR(n)` to `CHAR(n)` and `VARCHAR(n)` to `VARCHAR(n * 4)`, which is inconsistent with the specification in the docs.
Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]
chenyu-opensource commented on PR #24662: URL: https://github.com/apache/flink/pull/24662#issuecomment-2054684404 > Thanks for the quick fix, looks good to me. > > I just think a hotfix commit should be good enough, would you mind renaming this to `[hotfix][runtime]x`? That sounds great. I have made this change. Thank you so much. @reswqa -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35102) Incorrect Type mapping for Flink CDC Doris connector
Xiqian YU created FLINK-35102:
-------------------------------

             Summary: Incorrect Type mapping for Flink CDC Doris connector
                 Key: FLINK-35102
                 URL: https://issues.apache.org/jira/browse/FLINK-35102
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Xiqian YU

According to the Flink CDC Doris connector docs, CHAR and VARCHAR lengths are multiplied by 3 since Doris uses UTF-8 variable-length encoding internally.

| CHAR(n) | CHAR(n*3) | In Doris, strings are stored in UTF-8 encoding, so English characters occupy 1 byte and Chinese characters occupy 3 bytes. The length here is multiplied by 3. The maximum length of CHAR is 255. Once exceeded, it will automatically be converted to VARCHAR type. |
| VARCHAR(n) | VARCHAR(n*3) | Same as above. The length here is multiplied by 3. The maximum length of VARCHAR is 65533. Once exceeded, it will automatically be converted to STRING type. |

However, the Doris connector currently maps `CHAR(n)` to `CHAR(n)` and `VARCHAR(n)` to `VARCHAR(n * 4)`, which is inconsistent with the specification in the docs.
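The mapping the ticket asks for boils down to a small function: multiply the declared length by 3 (the worst-case UTF-8 width the docs assume), then fall through CHAR to VARCHAR to STRING once the product exceeds the respective limits. The sketch below is our reading of the documented rules, not the connector's actual code:

```java
public class DorisCharMapping {
    static final int MAX_CHAR_LENGTH = 255;      // Doris CHAR limit (from the docs quoted above)
    static final int MAX_VARCHAR_LENGTH = 65533; // Doris VARCHAR limit

    // CHAR(n) -> CHAR(n*3), overflowing to VARCHAR, then to STRING.
    static String mapChar(int n) {
        int bytes = n * 3;
        if (bytes <= MAX_CHAR_LENGTH) {
            return "CHAR(" + bytes + ")";
        }
        return mapVarcharBytes(bytes);
    }

    // VARCHAR(n) -> VARCHAR(n*3), overflowing to STRING.
    static String mapVarchar(int n) {
        return mapVarcharBytes(n * 3);
    }

    private static String mapVarcharBytes(int bytes) {
        return bytes <= MAX_VARCHAR_LENGTH ? "VARCHAR(" + bytes + ")" : "STRING";
    }

    public static void main(String[] args) {
        System.out.println(mapChar(10));       // CHAR(30)
        System.out.println(mapChar(100));      // VARCHAR(300): 300 exceeds the CHAR limit of 255
        System.out.println(mapVarchar(30000)); // STRING: 90000 exceeds the VARCHAR limit of 65533
    }
}
```

Compared with the current (buggy) behavior in the ticket, the key differences are the factor (3, not 1 or 4) and the overflow fallbacks at 255 and 65533 bytes.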
Re: [PR] [minor][cdc][docs] Optimize styles of the Flink CDC website docs home… [flink-cdc]
leonardBang merged PR #3208: URL: https://github.com/apache/flink-cdc/pull/3208 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [minor][docs] Optimize markdown styles in quickstart doc [flink-cdc]
xleoken commented on PR #3223: URL: https://github.com/apache/flink-cdc/pull/3223#issuecomment-2054589844 cc @GOODBOY008 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [minor][docs] Optimize markdown styles in quickstart doc [flink-cdc]
xleoken opened a new pull request, #3223: URL: https://github.com/apache/flink-cdc/pull/3223 ![image](https://github.com/apache/flink-cdc/assets/95013770/038dda76-0ede-432d-9abf-c29d1cf256b0) https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/get-started/quickstart/mysql-to-doris/#synchronize-schema-and-data-changes
Re: [PR] [FLINK-35046][state] Introduce AsyncKeyedStateBackend supporting to create StateExecutor [flink]
flinkbot commented on PR #24663: URL: https://github.com/apache/flink/pull/24663#issuecomment-2054586620 ## CI report: * b551d919ce3a63501e9ebc6f8769bd56f0c74d0c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35101][runtime] Fix the missing method parameter annotation problem [flink]
flinkbot commented on PR #24662: URL: https://github.com/apache/flink/pull/24662#issuecomment-2054585372 ## CI report: * 0260e42af4748141a97fc036dc42b87623829b36 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
[ https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837041#comment-17837041 ] Rui Fan commented on FLINK-35040: - Thanks [~slfan1989] and [~rmetzger] for the comments! I didn't find any related issue in the commons-io JIRA[1]. Also, I ran the benchmark on my Mac with JDK 11 and tried to analyze which code path causes this regression. I used async-profiler's wall mode to analyze the benchmark, and didn't find any code from the commons-io package. Do you have any ideas for troubleshooting? [1] https://issues.apache.org/jira/browse/IO-855?jql=project%20%3D%20IO%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened) > The performance of serializerHeavyString regresses since April 3 > > > Key: FLINK-35040 > URL: https://issues.apache.org/jira/browse/FLINK-35040 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > Attachments: image-2024-04-08-10-51-07-403.png, > image-2024-04-11-12-53-53-353.png, screenshot-1.png > > > The performance of serializerHeavyString regresses since April 3, and had not > yet recovered on April 8th. > It seems Java 11 regresses, and Java 8 and Java 17 are fine. > http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200 > !screenshot-1.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35046) Introduce New KeyedStateBackend related Async interfaces
[ https://issues.apache.org/jira/browse/FLINK-35046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-35046: Assignee: Hangxiang Yu > Introduce New KeyedStateBackend related Async interfaces > > > Key: FLINK-35046 > URL: https://issues.apache.org/jira/browse/FLINK-35046 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available > > Since we have introduced new State API, the async version of some classes > should be introduced to support it, e.g. AsyncKeyedStateBackend, new State > Descriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35046) Introduce New KeyedStateBackend related Async interfaces
[ https://issues.apache.org/jira/browse/FLINK-35046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35046: --- Labels: pull-request-available (was: ) > Introduce New KeyedStateBackend related Async interfaces > > > Key: FLINK-35046 > URL: https://issues.apache.org/jira/browse/FLINK-35046 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Hangxiang Yu >Priority: Major > Labels: pull-request-available > > Since we have introduced new State API, the async version of some classes > should be introduced to support it, e.g. AsyncKeyedStateBackend, new State > Descriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35046][state] Introduce AsyncKeyedStateBackend supporting to create StateExecutor [flink]
masteryhx opened a new pull request, #24663: URL: https://github.com/apache/flink/pull/24663 ## What is the purpose of the change Introduce AsyncKeyedStateBackend, which supports creating a StateExecutor ## Brief change log - Introduce a new interface called AsyncKeyedStateBackend - Introduce a new method on StateBackend to create an AsyncKeyedStateBackend ## Verifying this change This change modified AsyncExecutionControllerTest to verify the newly added interface. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
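The two changes listed above can be sketched roughly as follows. These are hypothetical placeholder interfaces written only to illustrate the described shape; the real Flink APIs introduced by this PR (FLINK-35046) differ in their signatures and packages:

```java
import java.util.concurrent.CompletableFuture;

// Placeholder for the executor that runs batched state requests asynchronously.
// The real StateExecutor contract in Flink is an assumption here.
interface StateExecutor {
    CompletableFuture<Void> executeBatch(Runnable batch);
}

// Async counterpart of a keyed state backend; its key new capability
// is creating a StateExecutor (the "new interface" in the change log).
interface AsyncKeyedStateBackend extends AutoCloseable {
    StateExecutor createStateExecutor();

    @Override
    default void close() {}
}

// Minimal in-memory implementation, only to make the contract concrete.
final class SketchAsyncKeyedStateBackend implements AsyncKeyedStateBackend {
    @Override
    public StateExecutor createStateExecutor() {
        // A real executor would queue and batch state requests off the task
        // thread; this sketch just runs the batch inline and completes.
        return batch -> {
            batch.run();
            return CompletableFuture.completedFuture(null);
        };
    }
}
```

In the actual change, StateBackend would additionally gain a factory method that returns an AsyncKeyedStateBackend alongside the existing keyed-backend factory.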
[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837040#comment-17837040 ] chenyu commented on FLINK-35101: [~Weijie Guo] Thank you so much for your review. > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: chenyu >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35101: --- Labels: pull-request-available (was: ) > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: chenyu >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35101][runtime] Fix the missing method parameter annotation problem [flink]
chenyu-opensource opened a new pull request, #24662: URL: https://github.com/apache/flink/pull/24662 ## What is the purpose of the change This pull request fixes the problem of a missing method parameter annotation. ## Brief change log Add the parameter annotation of 'taskManagerResourceId' for the method 'ResourceManagerGateway.sendSlotReport'. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no)
[jira] [Assigned] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existed replication slot.
[ https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-35093: -- Assignee: Hongshun Wang > Postgres source connector support SPECIFIC_OFFSETS start up mode from an > existed replication slot. > -- > > Key: FLINK-35093 > URL: https://issues.apache.org/jira/browse/FLINK-35093 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Major > > Currently, the Postgres source connector only supports INITIAL and LATEST modes. > However, sometimes users want to restart from an existing replication slot's > confirmed_lsn. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837038#comment-17837038 ] Weijie Guo commented on FLINK-35101: Just feel free to open a hotfix PR. > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.18.0, 1.19.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenyu updated FLINK-35101: --- Component/s: Table SQL / Runtime > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.18.0, 1.19.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenyu updated FLINK-35101: --- Fix Version/s: (was: 1.18.0) > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement >Reporter: chenyu >Priority: Minor > Fix For: 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837036#comment-17837036 ] chenyu commented on FLINK-35101: Please, assign this issue to me. I can fix it. > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement >Reporter: chenyu >Priority: Minor > Fix For: 1.18.0, 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenyu updated FLINK-35101: --- Affects Version/s: 1.19.0 1.18.0 > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.18.0, 1.19.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35101: --- Affects Version/s: 1.20.0 > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-35101. -- Resolution: Fixed > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.20.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35101: --- Component/s: Runtime / Coordination (was: Table SQL / Runtime) > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35101: --- Fix Version/s: 1.20.0 (was: 1.19.0) > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.20.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
[ https://issues.apache.org/jira/browse/FLINK-35101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837037#comment-17837037 ] Weijie Guo commented on FLINK-35101: Yeap, this should be aligned. A hot-fix is good enough, I will close this then. > Missing method parameter annotation for ResourceManagerGateway.sendSlotReport > - > > Key: FLINK-35101 > URL: https://issues.apache.org/jira/browse/FLINK-35101 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.18.0, 1.19.0 >Reporter: chenyu >Priority: Minor > Fix For: 1.19.0 > > Attachments: image-2024-04-15-10-17-53-803.png > > > The parameter of method 'ResourceManagerGateway.sendSlotReport' has missed. > !image-2024-04-15-10-17-53-803.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34273) git fetch fails
[ https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837034#comment-17837034 ] Weijie Guo commented on FLINK-34273: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=2cec4644-024f-529b-9379-b711878ccf41=262 > git fetch fails > --- > > Key: FLINK-34273 > URL: https://issues.apache.org/jira/browse/FLINK-34273 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Test Infrastructure >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > We've seen multiple {{git fetch}} failures. I assume this to be an > infrastructure issue. This Jira issue is for documentation purposes. > {code:java} > error: RPC failed; curl 18 transfer closed with outstanding read data > remaining > error: 5211 bytes of body are still expected > fetch-pack: unexpected disconnect while reading sideband packet > fatal: early EOF > fatal: fetch-pack: invalid index-pack output {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=5d6dc3d3-393d-5111-3a40-c6a5a36202e6=667 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35101) Missing method parameter annotation for ResourceManagerGateway.sendSlotReport
chenyu created FLINK-35101: -- Summary: Missing method parameter annotation for ResourceManagerGateway.sendSlotReport Key: FLINK-35101 URL: https://issues.apache.org/jira/browse/FLINK-35101 Project: Flink Issue Type: Improvement Reporter: chenyu Fix For: 1.19.0, 1.18.0 Attachments: image-2024-04-15-10-17-53-803.png The parameter annotation of the method 'ResourceManagerGateway.sendSlotReport' is missing. !image-2024-04-15-10-17-53-803.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails
[ https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837035#comment-17837035 ] Weijie Guo commented on FLINK-18476: 1.20 jdk17 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=d871f0ce-7328-5d00-023b-e7391f5801c8=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6=23253 > PythonEnvUtilsTest#testStartPythonProcess fails > --- > > Key: FLINK-18476 > URL: https://issues.apache.org/jira/browse/FLINK-18476 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0 >Reporter: Dawid Wysakowicz >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > test-stability > > The > {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} > failed in my local environment as it assumes the environment has > {{/usr/bin/python}}. > I don't know exactly how did I get python in Ubuntu 20.04, but I have only > alias for {{python = python3}}. Therefore the tests fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35100) Quickstarts Java nightly end-to-end test failed on Azure
[ https://issues.apache.org/jira/browse/FLINK-35100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35100: --- Description: {code:java} java.lang.IllegalStateException: Trying to open gateway for unseen checkpoint: latest known checkpoint = 1, incoming checkpoint = 2 Apr 13 01:11:42 at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.openGatewayAndUnmarkCheckpoint(SubtaskGatewayImpl.java:223) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$null$3(OperatorCoordinatorHolder.java:276) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at java.util.HashMap$Values.forEach(HashMap.java:982) ~[?:1.8.0_402] Apr 13 01:11:42 at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$notifyCheckpointAborted$4(OperatorCoordinatorHolder.java:276) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460) ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460) ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225) ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) ~[flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] Apr 13 01:11:42 at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkae9795546-8599-4711-866f-44d3f5c3d377.jar:1.20-SNAPSHOT] {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=5714 > Quickstarts Java nightly end-to-end test failed on Azure > > > Key: FLINK-35100 > URL: https://issues.apache.org/jira/browse/FLINK-35100 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: 
Weijie Guo >Priority: Major > > {code:java} > java.lang.IllegalStateException: Trying to open gateway for unseen > checkpoint: latest known checkpoint = 1, incoming checkpoint = 2 > Apr 13 01:11:42 at > org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.openGatewayAndUnmarkCheckpoint(SubtaskGatewayImpl.java:223) > ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] > Apr 13 01:11:42 at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$null$3(OperatorCoordinatorHolder.java:276) > ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] > Apr 13 01:11:42 at java.util.HashMap$Values.forEach(HashMap.java:982) > ~[?:1.8.0_402] > Apr 13
[jira] [Comment Edited] (FLINK-34273) git fetch fails
[ https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837034#comment-17837034 ] Weijie Guo edited comment on FLINK-34273 at 4/15/24 2:15 AM: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=2cec4644-024f-529b-9379-b711878ccf41=262 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=c853d405-a51b-5dcf-f438-2de530b016d4=269 was (Author: weijie guo): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58895=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=2cec4644-024f-529b-9379-b711878ccf41=262 > git fetch fails > --- > > Key: FLINK-34273 > URL: https://issues.apache.org/jira/browse/FLINK-34273 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Test Infrastructure >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > We've seen multiple {{git fetch}} failures. I assume this to be an > infrastructure issue. This Jira issue is for documentation purposes. > {code:java} > error: RPC failed; curl 18 transfer closed with outstanding read data > remaining > error: 5211 bytes of body are still expected > fetch-pack: unexpected disconnect while reading sideband packet > fatal: early EOF > fatal: fetch-pack: invalid index-pack output {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=5d6dc3d3-393d-5111-3a40-c6a5a36202e6=667 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35100) Quickstarts Java nightly end-to-end test failed on Azure
Weijie Guo created FLINK-35100: -- Summary: Quickstarts Java nightly end-to-end test failed on Azure Key: FLINK-35100 URL: https://issues.apache.org/jira/browse/FLINK-35100 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35097][table] Fix 'raw' format deserialization [flink]
kumar-mallikarjuna commented on PR #24661: URL: https://github.com/apache/flink/pull/24661#issuecomment-2054358863 @flinkbot run azure
[jira] [Updated] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existed replication slot.
[ https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hongshun Wang updated FLINK-35093: -- Description: Currently, the Postgres source connector only supports INITIAL and LATEST modes. However, sometimes users want to restart from an existing replication slot's confirmed_lsn. was: Current, Postgres source connector only support INITIAL and LATEST mode. However, sometimes, user want to restart from an existed replication slot. > Postgres source connector support SPECIFIC_OFFSETS start up mode from an > existed replication slot. > -- > > Key: FLINK-35093 > URL: https://issues.apache.org/jira/browse/FLINK-35093 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Hongshun Wang >Priority: Major > > Currently, the Postgres source connector only supports INITIAL and LATEST modes. > However, sometimes users want to restart from an existing replication slot's > confirmed_lsn. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#issuecomment-2054304285 Thanks @yuxiqian for those comments, I've addressed them and resubmitted.
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565075579 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.api.connector.sink2.Committer; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.StoreMultiCommitter; +import org.apache.paimon.manifest.WrappedManifestCommittable; +import org.apache.paimon.options.Options; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** A {@link Committer} to commit write results for multiple tables. */ +public class PaimonCommitter implements Committer<MultiTableCommittable> { + +private final StoreMultiCommitter storeMultiCommitter; + +public PaimonCommitter(Options catalogOptions, String commitUser) { +Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); +// flinkMetricGroup could be passed after FLIP-371. +storeMultiCommitter = new StoreMultiCommitter(() -> catalog, commitUser, null); +} + +@Override +public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests) +throws IOException, InterruptedException { +if (commitRequests.isEmpty()) { +return; +} +List<MultiTableCommittable> committables = +commitRequests.stream() +.map(CommitRequest::getCommittable) +.collect(Collectors.toList()); +long checkpointId = committables.get(0).checkpointId(); +WrappedManifestCommittable wrappedManifestCommittable = +storeMultiCommitter.combine(checkpointId, 1L, committables); + storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable)); Review Comment: Yes, you can refer to [here](https://github.com/apache/flink-cdc/pull/2916/files#diff-c39b348dcbc97b5eea6e288f55b9884b7d7cdd83cdc3bc8a81d3d78179c0f9d4R66).
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565069967 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaChange; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@code MetadataApplier} that applies metadata changes to Paimon. Support primary key table + * only. + */ +public class PaimonMetadataApplier implements MetadataApplier { + +// Catalog is unSerializable. +private transient Catalog catalog; + +// currently, we set table options for all tables using the same options. 
+private final Map tableOptions; + +private final Options catalogOptions; + +private final Map> partitionMaps; + +public PaimonMetadataApplier(Options catalogOptions) { +this.catalogOptions = catalogOptions; +this.tableOptions = new HashMap<>(); +this.partitionMaps = new HashMap<>(); +} + +public PaimonMetadataApplier( +Options catalogOptions, +Map tableOptions, +Map> partitionMaps) { +this.catalogOptions = catalogOptions; +this.tableOptions = tableOptions; +this.partitionMaps = partitionMaps; +} + +@Override +public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { +if (catalog == null) { +catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); +} +try { +if (schemaChangeEvent instanceof CreateTableEvent) { +applyCreateTable((CreateTableEvent) schemaChangeEvent); +} else if (schemaChangeEvent instanceof AddColumnEvent) { +applyAddColumn((AddColumnEvent) schemaChangeEvent); +} else if (schemaChangeEvent instanceof DropColumnEvent) { +applyDropColumn((DropColumnEvent) schemaChangeEvent); +} else if (schemaChangeEvent instanceof RenameColumnEvent) { +applyRenameColumn((RenameColumnEvent) schemaChangeEvent); +} else if (schemaChangeEvent instanceof AlterColumnTypeEvent) { +applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent); +} else { +throw new UnsupportedOperationException( +"PaimonDataSink doesn't support schema change event " + schemaChangeEvent); +} +} catch (Exception e) { +throw new RuntimeException(e); +} +} + +/** TODO support partition column. */ Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565072387 ## docs/content/pipelines/paimon-pipeline(ZH).md: ## @@ -0,0 +1,209 @@ +# Paimon Pipeline 连接器 + +Paimon Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。 + +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 + +如何创建 Pipeline + + +从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` Review Comment: addressed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
skymilong commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1565044902 ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java: ## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.utils; + +import org.apache.flink.annotation.PublicEvolving; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * MemorySize is a representation of a number of bytes, viewable in different units. + * + * Parsing + * + * The size can be parsed from a text expression. If the expression is a pure number, the value + * will be interpreted as bytes. + */ +@PublicEvolving +public class MemorySize implements java.io.Serializable, Comparable { Review Comment: > I think for `MemorySize` we should use the one the Flink instead of creating our own. 
It is a public API (marked as `@PublicEvolving`), and Flink CDC doesn't use this one in our production code. IIUC it is just used for parsing memory size expressions in configuration. I apologize for this oversight, and thank you for your patience.
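The parsing rule mentioned above ("a pure number is interpreted as bytes, a unit suffix scales the value") can be illustrated with a minimal sketch. This `parseBytes` helper is a hypothetical stand-in, not Flink's actual `MemorySize` implementation, and only handles a couple of suffixes:

```java
// Hypothetical sketch of memory-size expression parsing; not the real
// org.apache.flink.configuration.MemorySize.
public class MemorySizeSketch {

    static long parseBytes(String text) {
        String s = text.trim().toLowerCase();
        long multiplier = 1;
        if (s.endsWith("kb")) {
            multiplier = 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("b")) {
            s = s.substring(0, s.length() - 1);
        }
        // A pure number (no suffix) falls through and is interpreted as bytes.
        return Long.parseLong(s.trim()) * multiplier;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("1024"));  // pure number -> 1024 bytes
        System.out.println(parseBytes("64 mb")); // 67108864
    }
}
```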
[jira] [Created] (FLINK-35099) Support for multiple datasources
melin created FLINK-35099: - Summary: Support for multiple datasources Key: FLINK-35099 URL: https://issues.apache.org/jira/browse/FLINK-35099 Project: Flink Issue Type: New Feature Components: Flink CDC Reporter: melin In a hyperscale scenario, databases are distributed across multiple MySQL database servers instead of one database server. In our case, there are up to 16 servers. Support for multiple datasources in a single Flink CDC task. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027 ] dongwoo.kim edited comment on FLINK-34470 at 4/15/24 1:15 AM: -- Hello [~m.orazow], I'm glad that you're interested in collaborating, I’ll send a pr soon and keep you updated. Thanks for reaching out. Best, Dongwoo was (Author: JIRAUSER300481): Hello [~m.orazow], I'm glad that you're interested in collaborating, I’ll send a pr soon and keep you updated. Thanks for reaching out. Best regards, Dongwoo > Transactional message + Table api kafka source with 'latest-offset' scan > bound mode causes indefinitely hanging > --- > > Key: FLINK-34470 > URL: https://issues.apache.org/jira/browse/FLINK-34470 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > > h2. Summary > Hi we have faced issue with transactional message and table api kafka source. > If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's > request timeouts after hanging. We can always reproduce this unexpected > behavior by following below steps. > This is related to this > [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. > h2. How to reproduce > 1. Deploy transactional producer and stop after producing certain amount of > messages > 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple > query such as getting count of the produced messages > 3. Flink sql job gets stucked and timeouts. > h2. Cause > Transaction producer always produces [control > records|https://kafka.apache.org/documentation/#controlbatch] at the end of > the transaction. And these control messages are not polled by > {*}consumer.poll(){*}. (It is filtered internally). In > *KafkaPartitionSplitReader* code, split is finished only when > *lastRecord.offset() >= stoppingOffset - 1* condition is met. 
This might work > well with non transactional messages or streaming environment but in some > batch use cases it causes unexpected hanging. > h2. Proposed solution > {code:java} > if (consumer.position(tp) >= stoppingOffset) { > recordsBySplits.setPartitionStoppingOffset(tp, > stoppingOffset); > finishSplitAtRecord( > tp, > stoppingOffset, > lastRecord.offset(), > finishedPartitions, > recordsBySplits); > } > {code} > Replacing if condition to *consumer.position(tp) >= stoppingOffset* in > [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] > can solve the problem. > *consumer.position(tp)* gets next record's offset if it exist and the last > record's offset if the next record doesn't exist. > By this KafkaPartitionSplitReader is available to finish the split even when > the stopping offset is configured to control record's offset. > I would be happy to implement about this fix if we can reach on agreement. > Thanks -- This message was sent by Atlassian Jira (v8.20.10#820010)
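The effect of the condition change proposed above can be sketched with a small, self-contained example. The offsets below are hypothetical and the real `KafkaPartitionSplitReader`/consumer APIs are not used; the sketch only shows why the old check never fires when the stopping offset lands on a transaction control record:

```java
// Minimal sketch of the split-finishing condition for a transactional topic.
// Data records sit at offsets 0..4, a control record (filtered out by poll())
// sits at offset 5, and the "latest-offset" bound is captured as 6.
public class SplitStopSimulation {

    // Old check: finish when the last polled record reaches stoppingOffset - 1.
    static boolean oldConditionFinishes(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    // Proposed check: finish when the consumer position reaches stoppingOffset.
    static boolean newConditionFinishes(long consumerPosition, long stoppingOffset) {
        return consumerPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        long lastVisibleRecordOffset = 4; // control record at 5 is never returned
        long consumerPosition = 6;        // position moves past the control record
        long stoppingOffset = 6;

        // Old condition never becomes true, so the split hangs indefinitely.
        System.out.println(oldConditionFinishes(lastVisibleRecordOffset, stoppingOffset)); // false
        // Proposed condition fires once the consumer has drained the partition.
        System.out.println(newConditionFinishes(consumerPosition, stoppingOffset));        // true
    }
}
```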
[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027 ] dongwoo.kim commented on FLINK-34470: - Hello [~m.orazow], I'm glad that you're interested in collaborating, I’ll send a pr soon and keep you updated. Thanks for reaching out. Best regards, Dongwoo > Transactional message + Table api kafka source with 'latest-offset' scan > bound mode causes indefinitely hanging > --- > > Key: FLINK-34470 > URL: https://issues.apache.org/jira/browse/FLINK-34470 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > > h2. Summary > Hi we have faced issue with transactional message and table api kafka source. > If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's > request timeouts after hanging. We can always reproduce this unexpected > behavior by following below steps. > This is related to this > [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. > h2. How to reproduce > 1. Deploy transactional producer and stop after producing certain amount of > messages > 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple > query such as getting count of the produced messages > 3. Flink sql job gets stucked and timeouts. > h2. Cause > Transaction producer always produces [control > records|https://kafka.apache.org/documentation/#controlbatch] at the end of > the transaction. And these control messages are not polled by > {*}consumer.poll(){*}. (It is filtered internally). In > *KafkaPartitionSplitReader* code, split is finished only when > *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work > well with non transactional messages or streaming environment but in some > batch use cases it causes unexpected hanging. > h2. 
Proposed solution > {code:java} > if (consumer.position(tp) >= stoppingOffset) { > recordsBySplits.setPartitionStoppingOffset(tp, > stoppingOffset); > finishSplitAtRecord( > tp, > stoppingOffset, > lastRecord.offset(), > finishedPartitions, > recordsBySplits); > } > {code} > Replacing if condition to *consumer.position(tp) >= stoppingOffset* in > [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] > can solve the problem. > *consumer.position(tp)* gets next record's offset if it exist and the last > record's offset if the next record doesn't exist. > By this KafkaPartitionSplitReader is available to finish the split even when > the stopping offset is configured to control record's offset. > I would be happy to implement about this fix if we can reach on agreement. > Thanks -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
yuxiqian commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1565028345 ## docs/content/pipelines/paimon-pipeline(ZH).md: ## @@ -0,0 +1,209 @@ +# Paimon Pipeline 连接器 + +Paimon Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。 + +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 + +如何创建 Pipeline + + +从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` Review Comment: > This indentation is to maintain the definition of yaml. `source` and `pipeline` phase uses 3-space indentation while `sink` uses 2. Any reason for this inconsistency? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35098][orc] Fix incorrect results in thanEquals filters [flink]
snuyanzin commented on PR #24658: URL: https://github.com/apache/flink/pull/24658#issuecomment-2054184159 Yep, you're right; probably I was not aware of the context and was judging only by the timestamps. I'm really sorry in case I offended you in any way. In case you are interested in something, I would suggest letting others know about it with a small comment in Jira, for instance; this should help avoid this in the future, IMHO...
Re: [PR] [FLINK-34942][connectors/opensearch] Add support for Flink 1.19+ [flink-connector-opensearch]
snuyanzin merged PR #42: URL: https://github.com/apache/flink-connector-opensearch/pull/42
Re: [PR] [FLINK-34942][connectors/opensearch] Add support for Flink 1.19+ [flink-connector-opensearch]
snuyanzin commented on PR #42: URL: https://github.com/apache/flink-connector-opensearch/pull/42#issuecomment-2054124759 Thanks for taking a look
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564775105 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.schema.Schema; + +import java.time.ZoneId; +import java.util.List; + +/** Keep a list of {@link RecordData.FieldGetter} for a specific {@link Schema}. */ +public class TableSchemaInfo { Review Comment: If we use the complete `Schema`, we can directly reuse the `SchemaUtils.applySchemaChangeEvent` method. However, if we use `SchemaChangeEvent`, we need to handle various SchemaChangeEvents, so when adding new `SchemaChangeEvent` types, we need to modify the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
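The trade-off described in the comment above can be sketched as follows. All class names here are illustrative stand-ins, not the actual Flink CDC API: keeping the complete schema lets one utility absorb each change event in a single place, whereas dispatching per event type in the sink would need a new branch for every new event type:

```java
import java.util.*;

// Illustrative stand-ins, not the real Flink CDC event classes.
interface SchemaChangeEvent {}

class AddColumnEvent implements SchemaChangeEvent {
    final String name;
    AddColumnEvent(String name) { this.name = name; }
}

class DropColumnEvent implements SchemaChangeEvent {
    final String name;
    DropColumnEvent(String name) { this.name = name; }
}

public class SchemaEvolutionSketch {

    // One utility evolves a full schema (modeled here as a column-name list);
    // handling for any new event type lives in this single place.
    static List<String> applySchemaChangeEvent(List<String> columns, SchemaChangeEvent event) {
        List<String> result = new ArrayList<>(columns);
        if (event instanceof AddColumnEvent) {
            result.add(((AddColumnEvent) event).name);
        } else if (event instanceof DropColumnEvent) {
            result.remove(((DropColumnEvent) event).name);
        } else {
            throw new UnsupportedOperationException("Unsupported event: " + event);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name");
        columns = applySchemaChangeEvent(columns, new AddColumnEvent("age"));
        columns = applySchemaChangeEvent(columns, new DropColumnEvent("name"));
        System.out.println(columns); // [id, age]
    }
}
```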
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564773416 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. 
*/ +public class PaimonWriterHelper { + +/** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */ +public static List createFieldGetters(Schema schema, ZoneId zoneId) { +List fieldGetters = new ArrayList<>(schema.getColumns().size()); +for (int i = 0; i < schema.getColumns().size(); i++) { + fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId)); +} +return fieldGetters; +} + +private static RecordData.FieldGetter createFieldGetter( +DataType fieldType, int fieldPos, ZoneId zoneId) { +final RecordData.FieldGetter fieldGetter; +// ordered by type root definition +switch (fieldType.getTypeRoot()) { +case CHAR: +case VARCHAR: +fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString()); +break; +case BOOLEAN: +fieldGetter = row -> row.getBoolean(fieldPos); +break; +case BINARY: +case VARBINARY: +fieldGetter = row -> row.getBinary(fieldPos); +break; +case DECIMAL: +final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); +final int decimalScale = DataTypeChecks.getScale(fieldType); +fieldGetter = +row -> { +DecimalData decimalData = +row.getDecimal(fieldPos, decimalPrecision, decimalScale); +return Decimal.fromBigDecimal( +decimalData.toBigDecimal(), decimalPrecision, decimalScale); +}; +break; +case TINYINT: +fieldGetter = row -> row.getByte(fieldPos); +break; +case SMALLINT: +fieldGetter = row -> row.getShort(fieldPos); +break; +case INTEGER: +case DATE: +case TIME_WITHOUT_TIME_ZONE: +fieldGetter = row -> row.getInt(fieldPos); +break; +case BIGINT: +fieldGetter = row -> row.getLong(fieldPos); +break; +case FLOAT: +fieldGetter = row -> row.getFloat(fieldPos); +break; +case DOUBLE: +fieldGetter = row -> row.getDouble(fieldPos); +break; +case TIMESTAMP_WITHOUT_TIME_ZONE: +fieldGetter = +row -> +Timestamp.fromSQLTimestamp( +row.getTimestamp( +fieldPos, + DataTypeChecks.getPrecision(fieldType)) +.toTimestamp()); +break; +case
Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]
snuyanzin commented on PR #38: URL: https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-2054094757 First, https://github.com/apache/flink-connector-opensearch/pull/42 should be reviewed/approved/merged; then we can continue with this PR.
[jira] [Comment Edited] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line
[ https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836959#comment-17836959 ] Kumar Mallikarjuna edited comment on FLINK-35097 at 4/14/24 3:05 PM: - [~david.perkins] , I've raised a fix here: [https://github.com/apache/flink/pull/24661] I just realised, I need to look for a committer and get the issue assigned first! was (Author: JIRAUSER303984): [~david.perkins] , I've raised a fix here: https://github.com/apache/flink/pull/24661 > Table API Filesystem connector with 'raw' format repeats last line > -- > > Key: FLINK-35097 > URL: https://issues.apache.org/jira/browse/FLINK-35097 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.17.1 > Environment: I ran the above test with 1.17.1. I checked for existing > bug tickets and release notes, but did not find anything, so assuming this > effects 1.18 and 1.19. >Reporter: David Perkins >Priority: Major > Labels: pull-request-available > > When using the Filesystem connector with 'raw' format to read text data that > contains new lines, a row is returned for every line, but always contains the > contents of the last line. > For example, with the following file. > {quote} > line 1 > line 2 > line 3 > {quote} > And table definition > {quote} > create TABLE MyRawTable ( > `doc` string, > ) WITH ( > 'path' = 'file:///path/to/data', > 'format' = 'raw', >'connector' = 'filesystem' > ); > {quote} > Selecting `*` from the table produces three rows all with "line 3" for `doc`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]
empathy87 commented on PR #24659: URL: https://github.com/apache/flink/pull/24659#issuecomment-2054089997 > Hi @empathy87 thanks a lot for driving this PR. It looks good to me. Could you please add tests for each changelog in 'OrcFilters.java'? Thanks! Sure! I reworked the tests to cover all changes in `OrcFilters.java`. Please note that expressions like **10 > y** cannot be tested by executing SQL queries. This is because the Filesystem connector's predicate push-down works in a best-effort manner, and if extra rows are returned from the ORC file, they will be further filtered out. Expressions like **10 > y** are tested in `OrcFileSystemFilterTest::testApplyPredicateReverse` to assert that they are converted to the **y < 10** ORC filter predicate.
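The reversal being tested can be sketched with a hypothetical helper (not the actual `OrcFilters` implementation): a comparison with the literal on the left is rewritten so the field ends up on the left before the predicate is handed to the ORC reader:

```java
import java.util.*;

// Hypothetical sketch of comparison reversal; not the real OrcFilters code.
public class PredicateReverseSketch {

    // ">" with the operands swapped becomes "<", and so on.
    static final Map<String, String> REVERSED = new HashMap<>();
    static {
        REVERSED.put(">", "<");
        REVERSED.put("<", ">");
        REVERSED.put(">=", "<=");
        REVERSED.put("<=", ">=");
        REVERSED.put("=", "=");
    }

    // Rewrite "literal op field" as "field reversedOp literal".
    static String reverse(String literal, String op, String field) {
        return field + " " + REVERSED.get(op) + " " + literal;
    }

    public static void main(String[] args) {
        System.out.println(reverse("10", ">", "y")); // prints "y < 10"
    }
}
```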
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
yuxiqian commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564742486 ## docs/content/pipelines/paimon-pipeline.md: ## @@ -0,0 +1,209 @@ +# Paimon Pipeline Connector + +The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, and write data to [Paimon](https://paimon.apache.org). This document describes how to set up the Paimon Pipeline connector. + +## What can the connector do? +* Create table automatically if not exist +* Schema change synchronization +* Data synchronization + +How to create Pipeline + + +The pipeline for reading data from MySQL and sink to Paimon can be defined as follows: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` Review Comment: Sink & Pipeline phase uses 2 spaces and source phase uses 3 spaces. Any reasons for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
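For illustration only, and assuming the reviewer's reading of the mixed indentation is correct, a version of the example normalized to 2-space indentation throughout would look like:

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306

sink:
  type: paimon
  name: Paimon Sink
  metastore: filesystem
  warehouse: /path/warehouse

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 2
```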
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564741797 ## docs/content/pipelines/paimon-pipeline.md: ## @@ -0,0 +1,209 @@ +# Paimon Pipeline Connector + +The Paimon Pipeline connector can be used as the *Data Sink* of the pipeline, and write data to [Paimon](https://paimon.apache.org). This document describes how to set up the Paimon Pipeline connector. + +## What can the connector do? +* Create table automatically if not exist +* Schema change synchronization +* Data synchronization + +How to create Pipeline + + +The pipeline for reading data from MySQL and sink to Paimon can be defined as follows: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` Review Comment: This indentation is to maintain the definition of yaml. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [pipeline-connector][paimon] add paimon pipeline data sink connector. [flink-cdc]
lvyanquan commented on code in PR #2916: URL: https://github.com/apache/flink-cdc/pull/2916#discussion_r1564741417 ## docs/content/pipelines/paimon-pipeline(ZH).md: ## @@ -0,0 +1,209 @@ +# Paimon Pipeline 连接器 + +Paimon Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Paimon](https://paimon.apache.org)。 本文档介绍如何设置 Paimon Pipeline 连接器。 + +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 + +如何创建 Pipeline + + +从 MySQL 读取数据同步到 Paimon 的 Pipeline 可以定义如下: + +```yaml +source: + type: mysql + name: MySQL Source + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5401-5404 + +sink: + type: paimon + name: Paimon Sink + metastore: filesystem + warehouse: /path/warehouse + +pipeline: + name: MySQL to Paimon Pipeline + parallelism: 2 +``` Review Comment: This indentation is to maintain the definition of yaml. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
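On the indentation question itself: YAML only requires that sibling keys within the same mapping share one indentation width, so a document mixing 3-space and 2-space sections still parses — it is just harder to scan. A uniform two-space sketch of the same pipeline definition (shown purely for illustration, not as a change to the connector docs):

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306

sink:
  type: paimon
  name: Paimon Sink
  metastore: filesystem
  warehouse: /path/warehouse

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 2
```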
Re: [PR] [FLINK-35097][table] Fix 'raw' format deserialization [flink]
flinkbot commented on PR #24661: URL: https://github.com/apache/flink/pull/24661#issuecomment-2054068968 ## CI report: * b3c8502a9ca370eab1c023588571f6f3fc18addd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line
[ https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836959#comment-17836959 ] Kumar Mallikarjuna commented on FLINK-35097: [~david.perkins] , I've raised a fix here: https://github.com/apache/flink/pull/24661 > Table API Filesystem connector with 'raw' format repeats last line > -- > > Key: FLINK-35097 > URL: https://issues.apache.org/jira/browse/FLINK-35097 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.17.1 > Environment: I ran the above test with 1.17.1. I checked for existing > bug tickets and release notes, but did not find anything, so assuming this > effects 1.18 and 1.19. >Reporter: David Perkins >Priority: Major > Labels: pull-request-available > > When using the Filesystem connector with 'raw' format to read text data that > contains new lines, a row is returned for every line, but always contains the > contents of the last line. > For example, with the following file. > {quote} > line 1 > line 2 > line 3 > {quote} > And table definition > {quote} > create TABLE MyRawTable ( > `doc` string, > ) WITH ( > 'path' = 'file:///path/to/data', > 'format' = 'raw', >'connector' = 'filesystem' > ); > {quote} > Selecting `*` from the table produces three rows all with "line 3" for `doc`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35097][table] Fix 'raw' format deserialization [flink]
kumar-mallikarjuna opened a new pull request, #24661: URL: https://github.com/apache/flink/pull/24661 ## What is the purpose of the change Fixes broken deserialization of the `raw` format. ## Brief change log The existing `reuse` attribute is returned by the `deserialize()` method in `RawFormatDeserializationSchema`. The attribute is successively modified during deserialization, but since it is returned by reference, all of the returned results refer to a single value. This change removes the attribute and instead constructs the returned value inside the `deserialize()` method itself, thus avoiding overwrites. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. **Existing Behavior** 1. Create a test file with the content: ``` line 1 line 2 line 3 ``` 2. Run ```SQL CREATE TABLE MyRawTable ( `doc` string, ) WITH ( 'path' = 'file:///path/to/data', 'format' = 'raw', 'connector' = 'filesystem' ); ``` 3. Check the returned values ```SQL SELECT * FROM MyRawTable; ``` ``` doc - line 3 line 3 line 3 ``` **After the Change** ```SQL SELECT * FROM MyRawTable; ``` ``` doc - line 1 line 2 line 3 ``` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (yes) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
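The object-reuse bug described in the PR above — a single mutable `reuse` instance returned by reference from `deserialize()` — can be illustrated with a minimal, self-contained Java sketch. The class and method names here are illustrative, not the actual `RawFormatDeserializationSchema` code:

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseBugSketch {
    // Shared mutable buffer, analogous to the `reuse` attribute in the PR.
    private static final StringBuilder reuse = new StringBuilder();

    // Buggy variant: mutates and returns the single shared instance, so every
    // previously returned "row" aliases whatever was deserialized last.
    static CharSequence deserializeShared(String line) {
        reuse.setLength(0);
        reuse.append(line);
        return reuse;
    }

    // Fixed variant: constructs a fresh value inside the method on every call.
    static CharSequence deserializeFresh(String line) {
        return new StringBuilder(line);
    }

    public static void main(String[] args) {
        List<CharSequence> buggy = new ArrayList<>();
        List<CharSequence> fixed = new ArrayList<>();
        for (String line : new String[] {"line 1", "line 2", "line 3"}) {
            buggy.add(deserializeShared(line));
            fixed.add(deserializeFresh(line));
        }
        // All three "buggy" entries alias one object, mirroring the reported
        // behavior where every row shows the content of the last line.
        System.out.println("buggy: " + buggy); // prints: buggy: [line 3, line 3, line 3]
        System.out.println("fixed: " + fixed); // prints: fixed: [line 1, line 2, line 3]
    }
}
```

This is the same reason Flink's documentation warns about object reuse in serializers: returning a mutated shared instance is only safe if the caller consumes each record before the next call.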
[jira] [Updated] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line
[ https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35097: --- Labels: pull-request-available (was: ) > Table API Filesystem connector with 'raw' format repeats last line > -- > > Key: FLINK-35097 > URL: https://issues.apache.org/jira/browse/FLINK-35097 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.17.1 > Environment: I ran the above test with 1.17.1. I checked for existing > bug tickets and release notes, but did not find anything, so assuming this > effects 1.18 and 1.19. >Reporter: David Perkins >Priority: Major > Labels: pull-request-available > > When using the Filesystem connector with 'raw' format to read text data that > contains new lines, a row is returned for every line, but always contains the > contents of the last line. > For example, with the following file. > {quote} > line 1 > line 2 > line 3 > {quote} > And table definition > {quote} > create TABLE MyRawTable ( > `doc` string, > ) WITH ( > 'path' = 'file:///path/to/data', > 'format' = 'raw', >'connector' = 'filesystem' > ); > {quote} > Selecting `*` from the table produces three rows all with "line 3" for `doc`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35097) Table API Filesystem connector with 'raw' format repeats last line
[ https://issues.apache.org/jira/browse/FLINK-35097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836952#comment-17836952 ] Kumar Mallikarjuna commented on FLINK-35097: > assuming this effects 1.18 and 1.19. This is reproducible on `master`. I'll look into a fix. > Table API Filesystem connector with 'raw' format repeats last line > -- > > Key: FLINK-35097 > URL: https://issues.apache.org/jira/browse/FLINK-35097 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.17.1 > Environment: I ran the above test with 1.17.1. I checked for existing > bug tickets and release notes, but did not find anything, so assuming this > effects 1.18 and 1.19. >Reporter: David Perkins >Priority: Major > > When using the Filesystem connector with 'raw' format to read text data that > contains new lines, a row is returned for every line, but always contains the > contents of the last line. > For example, with the following file. > {quote} > line 1 > line 2 > line 3 > {quote} > And table definition > {quote} > create TABLE MyRawTable ( > `doc` string, > ) WITH ( > 'path' = 'file:///path/to/data', > 'format' = 'raw', >'connector' = 'filesystem' > ); > {quote} > Selecting `*` from the table produces three rows all with "line 3" for `doc`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34903) Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables
[ https://issues.apache.org/jira/browse/FLINK-34903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thorne updated FLINK-34903: --- Summary: Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables (was: Add mysql-pipeline-connector with table.exclude.list option to exclude unnecessary tables ) > Add mysql-pipeline-connector with tables.exclude option to exclude > unnecessary tables > - > > Key: FLINK-34903 > URL: https://issues.apache.org/jira/browse/FLINK-34903 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Thorne >Priority: Major > Labels: cdc, pull-request-available > Fix For: cdc-3.1.0 > > Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png > > Original Estimate: 72h > Remaining Estimate: 72h > > When using the MySQL Pipeline connector for whole-database > synchronization, users currently cannot exclude unnecessary tables. Taking > reference from Debezium's parameters, specifically the > {*}table.exclude.list{*}, if the *table.include.list* is declared, then the > *table.exclude.list* parameter will not take effect. However, the tables > specified in the tables parameter of the MySQL Pipeline connector are > effectively added to the *table.include.list* in Debezium's context. > !screenshot-1.png! > !screenshot-2.png|width=834,height=86! > debezium opthion desc > !screenshot-3.png|width=831,height=217! > In summary, it is necessary to introduce an externally-exposed > *table.exclude.list* parameter within the MySQL Pipeline connector to > facilitate the exclusion of tables. This is because the current setup does > not allow for excluding unnecessary tables when including others through the > tables parameter. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34903) Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables
[ https://issues.apache.org/jira/browse/FLINK-34903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thorne updated FLINK-34903: --- Release Note: [features] Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables (was: [features] Add mysql-pipeline-connector with table.exclude.list option to exclude unnecessary tables ) > Add mysql-pipeline-connector with tables.exclude option to exclude > unnecessary tables > - > > Key: FLINK-34903 > URL: https://issues.apache.org/jira/browse/FLINK-34903 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Thorne >Priority: Major > Labels: cdc, pull-request-available > Fix For: cdc-3.1.0 > > Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png > > Original Estimate: 72h > Remaining Estimate: 72h > > When using the MySQL Pipeline connector for whole-database > synchronization, users currently cannot exclude unnecessary tables. Taking > reference from Debezium's parameters, specifically the > {*}table.exclude.list{*}, if the *table.include.list* is declared, then the > *table.exclude.list* parameter will not take effect. However, the tables > specified in the tables parameter of the MySQL Pipeline connector are > effectively added to the *table.include.list* in Debezium's context. > !screenshot-1.png! > !screenshot-2.png|width=834,height=86! > debezium opthion desc > !screenshot-3.png|width=831,height=217! > In summary, it is necessary to introduce an externally-exposed > *table.exclude.list* parameter within the MySQL Pipeline connector to > facilitate the exclusion of tables. This is because the current setup does > not allow for excluding unnecessary tables when including others through the > tables parameter. -- This message was sent by Atlassian Jira (v8.20.10#820010)
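The precedence question above (in Debezium, `table.exclude.list` is ignored once `table.include.list` is set, whereas the pipeline connector needs exclusion applied on top of the `tables` include patterns) boils down to filtering with the exclude patterns after the include match. A minimal sketch of that semantics — the class, method, and option handling here are assumptions for illustration, not the connector's actual API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TableFilterSketch {
    /**
     * Keeps tables that match at least one include pattern and no exclude
     * pattern. With no include patterns, nothing is captured.
     */
    static List<String> capturedTables(
            List<String> allTables, String includeCsv, String excludeCsv) {
        List<Pattern> includes = compile(includeCsv);
        List<Pattern> excludes = compile(excludeCsv);
        return allTables.stream()
                .filter(t -> includes.stream().anyMatch(p -> p.matcher(t).matches()))
                .filter(t -> excludes.stream().noneMatch(p -> p.matcher(t).matches()))
                .collect(Collectors.toList());
    }

    private static List<Pattern> compile(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
            return Arrays.asList();
        }
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("adb.orders", "adb.orders_tmp", "bdb.users");
        // Include everything under adb, then exclude the temporary table.
        System.out.println(capturedTables(tables, "adb\\..*", ".*_tmp"));
        // prints: [adb.orders]
    }
}
```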
Re: [PR] [FLINK-30687][table] Support pushdown for aggregate filters [flink]
flinkbot commented on PR #24660: URL: https://github.com/apache/flink/pull/24660#issuecomment-2054018936 ## CI report: * e5e78f553679c92f336b4f4fdccb31b95dd7bad9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34903][MySQL][Feature] Add mysql-pipeline-connector with table.exclude.list option to exclude unnecessary tables [flink-cdc]
shiyiky commented on PR #3186: URL: https://github.com/apache/flink-cdc/pull/3186#issuecomment-2054018232 @PatrickRen I have done it. Please review it when you have free time, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-30687][table] Support pushdown for aggregate filters [flink]
jeyhunkarimov opened a new pull request, #24660: URL: https://github.com/apache/flink/pull/24660 ## What is the purpose of the change This PR supports pushing down filters for aggregate expressions. Note that this optimization only works when there is a single aggregate expression with a filter; otherwise, pushing down filters leads to incorrect results. ## Brief change log - Introduce `AggregateFilterPushdownRule` - Implement tests ## Verifying this change Via tests in `org.apache.flink.table.planner.plan.rules.logical.AggregateFilterPushdownRuleTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30687: --- Labels: pull-request-available (was: ) > FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > Attachments: image-2023-01-16-10-54-04-673.png > > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in > https://issues.apache.org/jira/browse/FLINK-29558. Maybe you try this demo > should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
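The single-aggregate restriction mentioned in the PR for this issue can be checked with plain Java streams: pushing one aggregate's FILTER predicate into the WHERE clause preserves that aggregate, but corrupts any second aggregate whose filter differs. This is an illustrative sketch of the semantics, not planner code:

```java
import java.util.Arrays;
import java.util.List;

public class FilterPushdownSketch {
    // COUNT(*) FILTER (WHERE class_id = cls) over the input rows.
    static long countWhere(List<Integer> classIds, int cls) {
        return classIds.stream().filter(c -> c == cls).count();
    }

    // COUNT(*) FILTER (WHERE class_id = cls) evaluated AFTER another
    // predicate (class_id = pushed) has been pushed into the WHERE clause.
    static long countWhereAfterPushdown(List<Integer> classIds, int pushed, int cls) {
        return classIds.stream().filter(c -> c == pushed).filter(c -> c == cls).count();
    }

    public static void main(String[] args) {
        List<Integer> classIds = Arrays.asList(1, 1, 2, 3);

        // One filtered aggregate: pushdown is safe, both forms count 2 rows.
        System.out.println(countWhere(classIds, 1));                  // prints: 2
        System.out.println(countWhereAfterPushdown(classIds, 1, 1));  // prints: 2

        // Two aggregates with different filters: pushing class_id = 1 into
        // WHERE removes every row the second aggregate needed.
        System.out.println(countWhere(classIds, 2));                  // prints: 1
        System.out.println(countWhereAfterPushdown(classIds, 1, 2));  // prints: 0
    }
}
```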
Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]
anupamaggarwal commented on code in PR #24482: URL: https://github.com/apache/flink/pull/24482#discussion_r1564640169 ## flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java: ## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.protobuf.registry.confluent; + +import org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingRuntimeException; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import org.apache.kafka.common.utils.ByteUtils; + +import javax.annotation.Nullable; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static java.lang.String.format; + +/** Factory for {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder}. */ +public class SchemaCoderProviders { + +/** + * Creates a {@link org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder} in cases + * where the schema has already been setup before-hand/exists in Confluent Schema Registry. + * + * Useful in scenarios where users want to be more explicit with schemas used. In these cases + * the external schema specified through schemaId will take precedence for encoding/decoding + * data. Also, the step of registering with schemaRegistry during serialization, will be + * skipped. + * + * A single Schema Registry Protobuf entry may contain multiple Protobuf messages, some of + * which may have nested messages. The messageName identifies the exact message/schema to use + * for serialization/deserialization. 
Consider the following protobuf message + * + * + * package test.package; + * message MessageA { + * message MessageB { + * message MessageC { + * ... + * } + * } + * message MessageD { + * ... + * } + * message MessageE { + * message MessageF { + * ... + * } + * message MessageG { + * ... + * } + * ... + * } + * ... + * } + * + * + * In order to use messageD the messageName should contain the value of + * test.package.messageD. Similarly, for messageF to be used messageName should contain + * test.package.MessageE.MessageF. + * + * @param schemaId SchemaId for external schema referenced for encoding/decoding of payload. + * @param messageName Optional message name to be used to select the right {@link + * com.google.protobuf.Message} for Serialialization/Deserialization. In absence of + * messageName the outermost message will be used. + * @param schemaRegistryClient client handle to Schema Registry {@link + * io.confluent.kafka.schemaregistry.client.SchemaRegistryClient} + * @return + */ +public static SchemaCoder createForPreRegisteredSchema( +int schemaId, @Nullable String messageName, SchemaRegistryClient schemaRegistryClient) { +return new PreRegisteredSchemaCoder(schemaId, messageName, schemaRegistryClient); +} + +/** + * Creates a default schema coder. + * + * For serialization schema coder will infer the schema from Flink {@link + * org.apache.flink.table.types.logical.RowType}. Schema obtained from rowType will also be + * registered to Schema Registry using the subject passed in by invoking {@link + * io.confluent.kafka.schemaregistry.client.SchemaRegistryClient#register(String, + *
Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]
anupamaggarwal commented on code in PR #24482: URL: https://github.com/apache/flink/pull/24482#discussion_r1564639354 ## flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/utils/FlinkToProtoSchemaConverterTest.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf.registry.confluent.utils; + +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.util.TestLoggerExtension; + +import com.google.protobuf.Descriptors.Descriptor; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FlinkToProtoSchemaConverter}. 
*/ +@ExtendWith(TestLoggerExtension.class) +class FlinkToProtoSchemaConverterTest { Review Comment: Hi @klam-shop, I don't think I understood your comment completely, if we are converting from the rowType the protobuf schema will only have 1 (root) element. IIUC, we won't have a case which is similar to what is outlined in [wire-format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) example. I added a test testMessageIndexHandlingInferredFromRowTypeWithConnectDecoder to verify if connect's deserializer is happy with empty message index. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
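For context on the wire-format discussion above: Confluent's framing prefixes each payload with a magic byte and a 4-byte big-endian schema id, followed (for Protobuf) by a varint-encoded list of message indexes, where an empty list — a single 0 byte — selects the outermost message. A hedged sketch of reading just the fixed header; the real parsing lives in Confluent's serializers, and this class name is invented for illustration:

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    /** Reads the magic byte and the 4-byte big-endian schema id. */
    static int readSchemaId(ByteBuffer payload) {
        byte magic = payload.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return payload.getInt();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.put((byte) 0); // magic byte
        buf.putInt(42);    // schema id
        buf.put((byte) 0); // Protobuf: empty message-index list -> outermost message
        buf.flip();
        System.out.println("schema id = " + readSchemaId(buf)); // prints: schema id = 42
    }
}
```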
[jira] [Comment Edited] (FLINK-31992) FlinkKafkaConsumer API is suggested to use as part of documentation, when that API is deprecated for flink version 1.14
[ https://issues.apache.org/jira/browse/FLINK-31992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756352#comment-17756352 ] Sandesh Mendan edited comment on FLINK-31992 at 4/14/24 11:07 AM: -- Since this issue has been corrected in higher flink version, I am proceeding to close this ticket was (Author: JIRAUSER300172): Since this issue has been corrected in higher flink version, I am proceeding to this ticket > FlinkKafkaConsumer API is suggested to use as part of documentation, when > that API is deprecated for flink version 1.14 > --- > > Key: FLINK-31992 > URL: https://issues.apache.org/jira/browse/FLINK-31992 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.14.2 >Reporter: Sandesh Mendan >Priority: Minor > Labels: auto-deprioritized-major, documentation, > documentation-update, good-first-issue, newbie > > In Flink version 1.14, even though the API class FlinkKafkaConsumer had been > [deprecated|https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/], > the official > [documentation|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector] > suggests that API to use. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]
joshbsemperis commented on PR #38: URL: https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-2054001884 Can this be resolved and merged? Is there a reason why it hasn't been? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org