[jira] [Created] (FLINK-35537) Error parsing list of enum in legacy yaml configuration
Zakelly Lan created FLINK-35537:
-----------------------------------

Summary: Error parsing list of enum in legacy yaml configuration
Key: FLINK-35537
URL: https://issues.apache.org/jira/browse/FLINK-35537
Project: Flink
Issue Type: Bug
Components: Runtime / Configuration
Affects Versions: 1.19.0
Reporter: Zakelly Lan

In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
{code:java}
Caused by: java.lang.IllegalArgumentException: Could not parse value '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
    at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
    at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
    at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
    ... 19 more
Caused by: java.lang.IllegalArgumentException: Could not parse value for enum class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, DISABLE_COMPRESSION_OPTION]]
    at org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
    at org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
    at org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
    at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
    at org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
    ... 28 more
{code}
I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in flink-conf.yaml. I also tried Flink 1.18.1, and it runs well.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35537) Error parsing list of enum in legacy yaml configuration
[ https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852663#comment-17852663 ]

Zakelly Lan commented on FLINK-35537:
-------------------------------------

My guess is that the option is parsed twice, since the error message gives the raw value as '[NO_COMPRESSION]' instead of 'NO_COMPRESSION'.

> Error parsing list of enum in legacy yaml configuration
> --------------------------------------------------------

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628898055 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Map; + +/** + * The Bunch Put access request for ForStDB. + * + * @param The type of key in original state access request. + */ +public class ForStDBBunchPutRequest extends ForStDBPutRequest, Map> { Review Comment: I moved the logic of interacting with the `db` to the `ForStDBPutRequest` and `ForStDBBunchPutRequest`, to avoid the classifications in `ForStWriteBatchOperation`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35525] Add all hdfs delegation tokens to Yarn AM [flink]
wForget commented on code in PR #24891: URL: https://github.com/apache/flink/pull/24891#discussion_r1628934380 ## flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java: ## @@ -1374,6 +1374,13 @@ private void setTokensFor(ContainerLaunchContext containerLaunchContext, boolean for (Map.Entry e : container.getTokens().entrySet()) { if (e.getKey().equals("hadoopfs")) { credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue())); +} else { +// Add hdfs tokens, maybe fetched by custom DelegationTokenProvider +Credentials fetchedCredentials = + HadoopDelegationTokenConverter.deserialize(e.getValue()); Review Comment: Thanks @gaborgsomogyi , I added a `yarn.security.appmaster.delegation.token.services` configuration to set token services we need for yarn am, and added unit test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
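As a rough sketch of the approach described above, assuming the new `yarn.security.appmaster.delegation.token.services` option keeps allow-list semantics; the class and method names below are invented for illustration and are not taken from PR #24891:

```java
// Illustrative sketch only: AmTokenFilter and addAllowedTokens are invented names,
// and the allow-list semantics are an assumption about the new
// 'yarn.security.appmaster.delegation.token.services' option.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

import java.util.Set;

public final class AmTokenFilter {

    /** Copies only the tokens whose service name is in the configured allow-list. */
    public static void addAllowedTokens(
            Credentials fetched, Credentials amCredentials, Set<String> allowedServices) {
        for (Token<? extends TokenIdentifier> token : fetched.getAllTokens()) {
            Text service = token.getService();
            if (allowedServices.contains(service.toString())) {
                amCredentials.addToken(service, token);
            }
        }
    }
}
```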
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628938515 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Map; + +/** + * The Bunch Put access request for ForStDB. + * + * @param The type of key in original state access request. + */ +public class ForStDBBunchPutRequest extends ForStDBPutRequest, Map> { + +/** Serializer for the user values. */ +final TypeSerializer userValueSerializer; + +/** The data outputStream used for value serializer, which should be thread-safe. */ +final ThreadLocal valueSerializerView; + +/** The data inputStream used for value deserializer, which should be thread-safe. */ +final ThreadLocal valueDeserializerView; + +public ForStDBBunchPutRequest( +ContextKey key, Map value, ForStMapState table, InternalStateFuture future) { +super(key, value, table, future); +Preconditions.checkArgument(table instanceof ForStMapState); +this.userValueSerializer = table.userValueSerializer; +this.valueSerializerView = table.valueSerializerView; +this.valueDeserializerView = table.valueDeserializerView; +} + +public Map getBunchValue() { +return value; +} + +@Override +public byte[] buildSerializedKey() throws IOException { +key.resetExtra(); +return table.serializeKey(key); +} + +public byte[] buildSerializedKey(Object userKey) throws IOException { +key.setUserKey(userKey); +return table.serializeKey(key); +} + +public byte[] buildSerializedValue(Object singleValue) throws IOException { +DataOutputSerializer outputView = valueSerializerView.get(); +outputView.clear(); +userValueSerializer.serialize(singleValue, outputView); +return outputView.getCopyOfBuffer(); +} + +/** + * Find the next byte array that is lexicographically larger than input byte array. + * + * @param bytes the input byte array. + * @return the next byte array. 
+ */
+public static byte[] nextBytes(byte[] bytes) {
+    Preconditions.checkState(bytes != null && bytes.length > 0);
+    int len = bytes.length;
+    byte[] nextBytes = new byte[len];
+    System.arraycopy(bytes, 0, nextBytes, 0, len);
+    boolean find = false;
+    for (int i = len - 1; i >= 0; i--) {
+        byte currentByte = ++nextBytes[i];
+        if (currentByte != Byte.MIN_VALUE) {
+            find = true;
+            break;
+        }
+    }
+    if (!find) {
+        byte[] newBytes = new byte[len + 1];
+        System.arraycopy(bytes, 0, newBytes, 0, len);
+        newBytes[len] = 1;
+        return newBytes;
+    }

Review Comment:
   Thanks for the suggestion. After reading the code in RocksDB, I think `newBytes[len] = Byte.MIN_VALUE` is enough.
   ```C++
   inline int Slice::compare(const Slice& b) const {
     assert(data_ != nullptr && b.data_ != nullptr);
     const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
     int r = memcmp(data_, b.data_, min_len);
     if (r == 0) {
       if (size_ < b.size_)
         r = -1;
       else if (size_ > b.size_)
         r = +1;
     }
     return r;
   }
   ```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
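For readers puzzling over the byte-value choice: the quoted `Slice::compare` is a `memcmp`, i.e. RocksDB orders keys by unsigned byte values, while Java's `byte` is signed. A small illustration (not code from the PR) of the equivalent comparison on the Java side:

```java
// Illustration only: mirrors the memcmp semantics of Slice::compare above.
public final class UnsignedByteOrder {

    public static int compareUnsigned(byte[] a, byte[] b) {
        int minLen = Math.min(a.length, b.length);
        for (int i = 0; i < minLen; i++) {
            // Byte.toUnsignedInt maps 0x80..0xFF to 128..255, matching memcmp order.
            int cmp = Integer.compare(Byte.toUnsignedInt(a[i]), Byte.toUnsignedInt(b[i]));
            if (cmp != 0) {
                return cmp;
            }
        }
        // Like the C++ code: when one array is a prefix of the other, the shorter sorts first.
        return Integer.compare(a.length, b.length);
    }
}
```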
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628941486 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java: ## @@ -34,12 +34,35 @@ public class ForStDBGetRequest { private final K key; private final ForStInnerTable table; -private final InternalStateFuture future; +private final InternalStateFuture future; -private ForStDBGetRequest(K key, ForStInnerTable table, InternalStateFuture future) { +private final boolean toBoolean; +private final boolean checkMapEmpty; + +private int keyGroupPrefixBytes = 1; + +private ForStDBGetRequest( +K key, +ForStInnerTable table, +InternalStateFuture future, +boolean toBoolean, +boolean checkMapEmpty) { this.key = key; this.table = table; this.future = future; +this.toBoolean = toBoolean; Review Comment: 👍,I added `ForStDBMapCheckRequest` for this kind of operation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852668#comment-17852668 ] David Morávek commented on FLINK-35035: --- Yes, we have this on our radar. [~mapohl] is currently looking into this direction. The main idea is to move resource stabilization timeout from WaitingForResources into Executing state. > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Priority: Minor > > When 'jobmanager.scheduler = adaptive' , job graph changes triggered by > cluster expansion will cause long-term task stagnation. We should reduce this > impact. > As an example: > I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5] > When I add slots the task will trigger jobgraph changes,by > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable, > However, the five new slots I added were not discovered at the same time (for > convenience, I assume that a taskmanager has one slot), because no matter > what environment we add, we cannot guarantee that the new slots will be added > at once, so this will cause onNewResourcesAvailable triggers repeatedly > ,If each new slot action has a certain interval, then the jobgraph will > continue to change during this period. What I hope is that there will be a > stable time to configure the cluster resources and then go to it after the > number of cluster slots has been stable for a certain period of time. Trigger > jobgraph changes to avoid this situation -- This message was sent by Atlassian Jira (v8.20.10#820010)
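For context, a stabilization knob already exists for the resource-waiting phase; a minimal sketch of enabling it, assuming the option constants in `JobManagerOptions` are present under these names in the Flink version at hand:

{code:java}
// Sketch only: assumes JobManagerOptions.SCHEDULER and
// JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT exist in your Flink version.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

import java.time.Duration;

public class AdaptiveSchedulerConfig {

    public static Configuration adaptiveWithStabilization() {
        Configuration config = new Configuration();
        // Enable the adaptive scheduler ('jobmanager.scheduler: adaptive').
        config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        // Today this timeout applies while waiting for resources; the comment above
        // proposes applying the same stabilization in the Executing state, so that
        // slots arriving one by one do not each trigger a rescale.
        config.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofSeconds(10));
        return config;
    }
}
{code}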
Re: [PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]
leonardBang merged PR #3348: URL: https://github.com/apache/flink-cdc/pull/3348 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35092) Add integrated test for Doris / Starrocks sink pipeline connector
[ https://issues.apache.org/jira/browse/FLINK-35092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852669#comment-17852669 ] Leonard Xu commented on FLINK-35092: master: 1a360635c7ac936106e78d87867f96ed3f64e04b > Add integrated test for Doris / Starrocks sink pipeline connector > - > > Key: FLINK-35092 > URL: https://issues.apache.org/jira/browse/FLINK-35092 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > Currently, no integrated test are being applied to Doris pipeline connector > (there's only one DorisRowConverterTest case for now). Adding ITcases would > improving Doris connector's code quality and reliability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35415) CDC Fails to create sink with Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852672#comment-17852672 ] Leonard Xu commented on FLINK-35415: master: 9a9603413c7c6d7094a30f8c8e676b65b0d3b9ba > CDC Fails to create sink with Flink 1.19 > > > Key: FLINK-35415 > URL: https://issues.apache.org/jira/browse/FLINK-35415 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: yux >Assignee: yux >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > Currently, Flink CDC doesn't work with Flink 1.19 with the following > exception: > Exception in thread "main" java.lang.NoSuchMethodError: 'void > org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.(org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink, > boolean, boolean)' > The reason is Flink CDC uses Flink @Internal API and it was changed in 1.19 > update. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35316) Add CDC e2e test case for on Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852671#comment-17852671 ] Leonard Xu commented on FLINK-35316: master: d8a9c8c63ef5fb958a72c3a15e47ee29d72a481e > Add CDC e2e test case for on Flink 1.19 > --- > > Key: FLINK-35316 > URL: https://issues.apache.org/jira/browse/FLINK-35316 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Minor > Labels: pull-request-available > > Since Flink 1.19 has been generally available, Flink CDC is expected to be > used with it. E2e test cases should cover this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35538) pipeline sink support caseSensitive
melin created FLINK-35538:
-----------------------------

Summary: pipeline sink support caseSensitive
Key: FLINK-35538
URL: https://issues.apache.org/jira/browse/FLINK-35538
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.2.0, cdc-3.1.1
Reporter: melin
Attachments: image-2024-06-06-15-51-02-428.png

The source is case-sensitive, but the sink is case-insensitive. Even Paimon doesn't allow capital letters.

!image-2024-06-06-15-51-02-428.png!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35120) Add Doris Pipeline connector integration test cases
[ https://issues.apache.org/jira/browse/FLINK-35120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852670#comment-17852670 ] Leonard Xu commented on FLINK-35120: master: 8eb7e53965e59c2f09c7d86461c095133e1d8998 > Add Doris Pipeline connector integration test cases > --- > > Key: FLINK-35120 > URL: https://issues.apache.org/jira/browse/FLINK-35120 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > Currently, Flink CDC Doris pipeline connector has very limited test cases > (which only covers row convertion). Adding an ITCase testing its data > pipeline and metadata applier should help improving connector's reliability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35464) Flink CDC 3.1 breaks operator state compatiblity
[ https://issues.apache.org/jira/browse/FLINK-35464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852673#comment-17852673 ] Leonard Xu commented on FLINK-35464: master: 414720d1c5e9126ca557bf59389e9fb1b460c998 > Flink CDC 3.1 breaks operator state compatiblity > > > Key: FLINK-35464 > URL: https://issues.apache.org/jira/browse/FLINK-35464 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: yux >Assignee: yux >Priority: Blocker > Labels: pull-request-available > Fix For: cdc-3.1.1 > > > Flink CDC 3.1 changed how SchemaRegistry [de]serializes state data, which > causes any checkpoint states saved with earlier version could not be restored > in version 3.1.0. > This could be resolved by adding serialization versioning control logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-3.1.1][FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink (#3233) [flink-cdc]
loserwang1024 commented on code in PR #3387: URL: https://github.com/apache/flink-cdc/pull/3387#discussion_r1628958303

## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
## @@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;

Review Comment:
   Thanks, I will also modify it in master.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
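The flagged `org.junit.Assert` import is the JUnit 4 entry point; the follow-up PR referenced later in this digest (#3394) moves these tests to JUnit 5. A trivial illustration of the swap, with an invented test name and assertion:

```java
// Illustration of the JUnit 4 -> JUnit 5 swap being requested; the test class and
// assertion below are invented for the example, not taken from the PR.
// JUnit 4 style (flagged): import org.junit.Assert; Assert.assertEquals(expected, actual);
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class DataSinkTranslatorStyleTest {

    @Test
    void assertionStyle() {
        assertEquals(4, 2 + 2); // JUnit 5 equivalent of Assert.assertEquals
    }
}
```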
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628938515 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Map; + +/** + * The Bunch Put access request for ForStDB. + * + * @param The type of key in original state access request. + */ +public class ForStDBBunchPutRequest extends ForStDBPutRequest, Map> { + +/** Serializer for the user values. */ +final TypeSerializer userValueSerializer; + +/** The data outputStream used for value serializer, which should be thread-safe. */ +final ThreadLocal valueSerializerView; + +/** The data inputStream used for value deserializer, which should be thread-safe. */ +final ThreadLocal valueDeserializerView; + +public ForStDBBunchPutRequest( +ContextKey key, Map value, ForStMapState table, InternalStateFuture future) { +super(key, value, table, future); +Preconditions.checkArgument(table instanceof ForStMapState); +this.userValueSerializer = table.userValueSerializer; +this.valueSerializerView = table.valueSerializerView; +this.valueDeserializerView = table.valueDeserializerView; +} + +public Map getBunchValue() { +return value; +} + +@Override +public byte[] buildSerializedKey() throws IOException { +key.resetExtra(); +return table.serializeKey(key); +} + +public byte[] buildSerializedKey(Object userKey) throws IOException { +key.setUserKey(userKey); +return table.serializeKey(key); +} + +public byte[] buildSerializedValue(Object singleValue) throws IOException { +DataOutputSerializer outputView = valueSerializerView.get(); +outputView.clear(); +userValueSerializer.serialize(singleValue, outputView); +return outputView.getCopyOfBuffer(); +} + +/** + * Find the next byte array that is lexicographically larger than input byte array. + * + * @param bytes the input byte array. + * @return the next byte array. 
+ */
+public static byte[] nextBytes(byte[] bytes) {
+    Preconditions.checkState(bytes != null && bytes.length > 0);
+    int len = bytes.length;
+    byte[] nextBytes = new byte[len];
+    System.arraycopy(bytes, 0, nextBytes, 0, len);
+    boolean find = false;
+    for (int i = len - 1; i >= 0; i--) {
+        byte currentByte = ++nextBytes[i];
+        if (currentByte != Byte.MIN_VALUE) {
+            find = true;
+            break;
+        }
+    }
+    if (!find) {
+        byte[] newBytes = new byte[len + 1];
+        System.arraycopy(bytes, 0, newBytes, 0, len);
+        newBytes[len] = 1;
+        return newBytes;
+    }

Review Comment:
   Thanks for the suggestion. After reading the code in RocksDB, I think `newBytes[len] = Byte.MAX_VALUE` is enough.
   ```C++
   inline int Slice::compare(const Slice& b) const {
     assert(data_ != nullptr && b.data_ != nullptr);
     const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
     int r = memcmp(data_, b.data_, min_len);
     if (r == 0) {
       if (size_ < b.size_)
         r = -1;
       else if (size_ > b.size_)
         r = +1;
     }
     return r;
   }
   ```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35539) The artifactId of flink-migration-test-utils module has a typo
Zhen Wang created FLINK-35539: - Summary: The artifactId of flink-migration-test-utils module has a typo Key: FLINK-35539 URL: https://issues.apache.org/jira/browse/FLINK-35539 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.20.0 Reporter: Zhen Wang The artifactId of flink-migration-test-utils module has a typo. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628979026

## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
## @@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements ForStDBOperation {
     public CompletableFuture process() {
         return CompletableFuture.runAsync(
                 () -> {
-                    try (WriteBatch writeBatch =
-                            new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
+                    try (ForStDBWriteBatchWrapper writeBatch =
+                            new ForStDBWriteBatchWrapper(db, writeOptions, batchRequest.size())) {
                         for (ForStDBPutRequest request : batchRequest) {
+                            ColumnFamilyHandle cf = request.getColumnFamilyHandle();
                             if (request.valueIsNull()) {
-                                // put(key, null) == delete(key)
-                                writeBatch.delete(
-                                        request.getColumnFamilyHandle(),
-                                        request.buildSerializedKey());
+                                if (request instanceof ForStDBBunchPutRequest) {
+                                    ForStDBBunchPutRequest bunchPutRequest =
+                                            (ForStDBBunchPutRequest) request;
+                                    byte[] primaryKey = bunchPutRequest.buildSerializedKey(null);

Review Comment:
   For map state, there are two types of key: the primary key and the user key. Logically it is organized like this: `<primary key, <user key, user value>>`. We concatenate the primary key and the user key together as the key of RocksDB, so it is physically organized like this: `<<primary key, user key>, user value>`.
   For a `Bunch remove`, we will remove all user keys under one primary key, so we use the prefix to match all keys. For a `Single remove`, we use the bytes of `<primary key, user key>` to match one specific key.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
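A simplified sketch of the layout described above; this is an illustration, not the actual ForSt serialization format, which also encodes key groups and variable-length fields:

```java
// Simplified illustration only: flattening a map-state entry into a single RocksDB
// key by concatenating the primary-key bytes with the user-key bytes, so that all
// entries of one map share a common prefix and can be matched with prefix scans.
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public final class MapStateKeyLayout {

    /** Builds the physical key <primary key, user key> for one map-state entry. */
    public static byte[] concatKey(byte[] primaryKey, byte[] userKey) throws IOException {
        ByteArrayOutputStream out =
                new ByteArrayOutputStream(primaryKey.length + userKey.length);
        out.write(primaryKey); // shared prefix: identifies the map instance
        out.write(userKey);    // suffix: identifies the entry inside the map
        return out.toByteArray();
    }
}
```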
[PR] [FLINK-35539] Fix artifactId of flink-migration-test-utils module [flink]
wForget opened a new pull request, #24901: URL: https://github.com/apache/flink/pull/24901

## What is the purpose of the change

fix typo

## Brief change log

## Verifying this change

Flink CI

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35539) The artifactId of flink-migration-test-utils module has a typo
[ https://issues.apache.org/jira/browse/FLINK-35539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35539: --- Labels: pull-request-available (was: ) > The artifactId of flink-migration-test-utils module has a typo > --- > > Key: FLINK-35539 > URL: https://issues.apache.org/jira/browse/FLINK-35539 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.20.0 >Reporter: Zhen Wang >Priority: Major > Labels: pull-request-available > > The artifactId of flink-migration-test-utils module has a typo. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35522][runtime] Fix the issue that the source task may get stuck in speculative execution mode. [flink]
zhuzhurk commented on PR #24899: URL: https://github.com/apache/flink/pull/24899#issuecomment-2151672311 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628988650

## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
## @@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements ForStDBOperation {
     public CompletableFuture process() {
         return CompletableFuture.runAsync(
                 () -> {
-                    try (WriteBatch writeBatch =
-                            new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
+                    try (ForStDBWriteBatchWrapper writeBatch =
+                            new ForStDBWriteBatchWrapper(db, writeOptions, batchRequest.size())) {
                         for (ForStDBPutRequest request : batchRequest) {
+                            ColumnFamilyHandle cf = request.getColumnFamilyHandle();
                             if (request.valueIsNull()) {
-                                // put(key, null) == delete(key)
-                                writeBatch.delete(
-                                        request.getColumnFamilyHandle(),
-                                        request.buildSerializedKey());
+                                if (request instanceof ForStDBBunchPutRequest) {
+                                    ForStDBBunchPutRequest bunchPutRequest =
+                                            (ForStDBBunchPutRequest) request;
+                                    byte[] primaryKey = bunchPutRequest.buildSerializedKey(null);
+                                    byte[] endKey = ForStDBBunchPutRequest.nextBytes(primaryKey);

Review Comment:
   We concatenate the primary key and the user key together as the key of RocksDB, physically organized like this:
   ```
   <<primary key, user key1>, user value1>
   <<primary key, user key2>, user value2>
   <<primary key, user key3>, user value3>
   <<primary key, user key4>, user value4>
   ```
   Here, we use the primary key as a prefix to calculate an interval and leverage `deleteRange`. But too many `deleteRange` calls may affect the read performance, so we will consider changing it to an iterator-based delete later.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
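A hedged sketch of the range-delete idea, written against the public RocksJava API rather than the ForSt internals; `prefixEnd` plays the role of the `nextBytes` helper from this thread, computed with unsigned-byte semantics:

```java
// Sketch using the public RocksJava API; the actual ForSt code paths differ.
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.util.Arrays;

public final class PrefixRangeDelete {

    /** Deletes every key in {@code cf} that starts with {@code primaryKey}. */
    public static void deleteByPrefix(RocksDB db, ColumnFamilyHandle cf, byte[] primaryKey)
            throws RocksDBException {
        byte[] endKey = prefixEnd(primaryKey);
        if (endKey == null) {
            // All bytes are 0xFF: no exclusive upper bound exists. This is the corner
            // case that motivates the iterator-based fallback mentioned later on.
            throw new IllegalStateException("no range end for all-0xFF prefix");
        }
        // Deletes the half-open key range [primaryKey, endKey).
        db.deleteRange(cf, primaryKey, endKey);
    }

    /**
     * Smallest byte array greater, in unsigned memcmp order, than every array that
     * starts with {@code bytes}; null if the prefix consists only of 0xFF bytes.
     */
    static byte[] prefixEnd(byte[] bytes) {
        for (int i = bytes.length - 1; i >= 0; i--) {
            if (bytes[i] != (byte) 0xFF) {
                byte[] end = Arrays.copyOf(bytes, i + 1); // drop the tail after position i
                end[i]++;
                return end;
            }
        }
        return null;
    }
}
```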
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628990950 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java: ## @@ -34,12 +34,35 @@ public class ForStDBGetRequest { private final K key; private final ForStInnerTable table; -private final InternalStateFuture future; +private final InternalStateFuture future; -private ForStDBGetRequest(K key, ForStInnerTable table, InternalStateFuture future) { +private final boolean toBoolean; Review Comment: 👍I added `ForStDBMapCheckRequest` for this kind of operation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35539] Fix artifactId of flink-migration-test-utils module [flink]
flinkbot commented on PR #24901: URL: https://github.com/apache/flink/pull/24901#issuecomment-2151681134

## CI report:

* 50ae68d1e07a382fbcd3f635270f1855b7755237 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35149][hotfix][cdc-composor] adjust test of cdc composer from junit4 to junit5. [flink-cdc]
loserwang1024 commented on PR #3394: URL: https://github.com/apache/flink-cdc/pull/3394#issuecomment-2151681364 @Jiabao-Sun , CC -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1628994002 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java: ## @@ -34,23 +35,33 @@ */ public class ForStDBPutRequest { -private final K key; -@Nullable private final V value; -private final ForStInnerTable table; -private final InternalStateFuture future; +protected final K key; -private ForStDBPutRequest( +@Nullable protected final V value; + +protected final ForStInnerTable table; + +protected final InternalStateFuture future; + +protected final boolean tableIsMap; + +protected ForStDBPutRequest( K key, V value, ForStInnerTable table, InternalStateFuture future) { this.key = key; this.value = value; this.table = table; this.future = future; +this.tableIsMap = table instanceof ForStMapState; } public boolean valueIsNull() { return value == null; } +public boolean valueIsMap() { Review Comment: I moved the logic of interacting with the db to the `ForStDBPutRequest` and `ForStDBBunchPutRequest`, `valueIsMap` is deleted now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35517) CI pipeline triggered by pull request seems unstable
[ https://issues.apache.org/jira/browse/FLINK-35517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851994#comment-17851994 ] Jing Ge edited comment on FLINK-35517 at 6/6/24 8:25 AM: - It should work again. Thanks [~chesnay] for your support! [~Weijie Guo] please let me know if you still have any issues. Thanks! was (Author: jingge): It should work again. [~Weijie Guo] please let me know if you still have any issues. Thanks! > CI pipeline triggered by pull request seems unstable > > > Key: FLINK-35517 > URL: https://issues.apache.org/jira/browse/FLINK-35517 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Jing Ge >Priority: Blocker > Fix For: 1.20.0 > > > Flink CI pipeline triggered by pull request seems sort of unstable. > For example, https://github.com/apache/flink/pull/24883 was filed 15 hours > ago, but CI report is UNKNOWN. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]
fredia commented on code in PR #24812: URL: https://github.com/apache/flink/pull/24812#discussion_r1629027426

## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
## @@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements ForStDBOperation {
     public CompletableFuture process() {
         return CompletableFuture.runAsync(
                 () -> {
-                    try (WriteBatch writeBatch =
-                            new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
+                    try (ForStDBWriteBatchWrapper writeBatch =
+                            new ForStDBWriteBatchWrapper(db, writeOptions, batchRequest.size())) {
                         for (ForStDBPutRequest request : batchRequest) {
+                            ColumnFamilyHandle cf = request.getColumnFamilyHandle();
                             if (request.valueIsNull()) {
-                                // put(key, null) == delete(key)
-                                writeBatch.delete(
-                                        request.getColumnFamilyHandle(),
-                                        request.buildSerializedKey());
+                                if (request instanceof ForStDBBunchPutRequest) {
+                                    ForStDBBunchPutRequest bunchPutRequest =
+                                            (ForStDBBunchPutRequest) request;
+                                    byte[] primaryKey = bunchPutRequest.buildSerializedKey(null);
+                                    byte[] endKey = ForStDBBunchPutRequest.nextBytes(primaryKey);

Review Comment:
   I found that `deleteRange` may be ambiguous in some cases, such as when the primary key is an array of `Byte.MAX_VALUE` bytes, so I changed it to iterate and delete the entries one by one.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
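A sketch of the iterator-based alternative, again against the public RocksJava API (the ForSt implementation differs): seek to the prefix and delete matching entries one by one, which sidesteps the upper-bound ambiguity entirely:

```java
// Sketch using the public RocksJava API; deleting while iterating is safe because
// the iterator reads from a consistent view taken at creation time.
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public final class PrefixIteratorDelete {

    public static void deleteByPrefix(RocksDB db, ColumnFamilyHandle cf, byte[] prefix)
            throws RocksDBException {
        try (RocksIterator iter = db.newIterator(cf)) {
            for (iter.seek(prefix); iter.isValid(); iter.next()) {
                byte[] key = iter.key();
                if (!startsWith(key, prefix)) {
                    break; // past the prefix range
                }
                db.delete(cf, key);
            }
        }
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
```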
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
snuyanzin commented on code in PR #24526: URL: https://github.com/apache/flink/pull/24526#discussion_r1629030390

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
## @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+    public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityAndHashcodeProvider.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null || arrayTwo == null) {
+                return null;
+            }
+
+            Set set = new HashSet<>();
+            for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                set.add(objectContainer);
+            }
+
+            Set res = new LinkedHashSet<>();

Review Comment:
   nit: do we really need `LinkedHashSet` here? I guess we could use an `ArrayList` to achieve the same result in a more space-efficient way: if a record is present in `set`, then add it to the array list; otherwise add it to `set`. WDYT?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
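One way to realize the suggestion above, sketched on plain lists; the real function wraps elements in `ObjectContainer` so that custom equality and hashCode are used, which is omitted here:

```java
// Sketch of the reviewer's suggestion on plain lists; not the PR's implementation.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class ArrayIntersectSketch {

    public static <T> List<T> intersect(List<T> arrayOne, List<T> arrayTwo) {
        Set<T> set = new HashSet<>(arrayTwo);
        List<T> result = new ArrayList<>();
        for (T element : arrayOne) {
            // remove() doubles as the dedup check: each matched value is emitted once,
            // in first-occurrence order, matching the LinkedHashSet version's output.
            if (set.remove(element)) {
                result.add(element);
            }
        }
        return result;
    }
}
```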
Re: [PR] [FLINK-35353][docs-zh]Translate "Profiler" page into Chinese [flink]
drymatini commented on PR #24822: URL: https://github.com/apache/flink/pull/24822#issuecomment-2151726109 Hi @Myasuka, thank you for the comments. I have made changes to address the problems; please check again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
snuyanzin commented on code in PR #24526: URL: https://github.com/apache/flink/pull/24526#discussion_r1629032823 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */ +@Internal +public class ArrayIntersectFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + +public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); +} + +@Override +public void open(FunctionContext context) throws Exception { +equalityAndHashcodeProvider.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null || arrayTwo == null) { +return null; +} + +Set set = new HashSet<>(); +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +final ObjectContainer objectContainer = createObjectContainer(element); +set.add(objectContainer); +} + +Set res = new LinkedHashSet<>(); +for (int pos = 0; pos < arrayOne.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayOne, pos); +final ObjectContainer objectContainer = createObjectContainer(element); +if (set.contains(objectContainer)) { +res.add(objectContainer); +} +} + +return new GenericArrayData( +res.stream() +.map(element -> element != null ? 
element.getObject() : null) +.toArray()); Review Comment: Is there anything blocking from usage of just `toArray` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
snuyanzin commented on code in PR #24526: URL: https://github.com/apache/flink/pull/24526#discussion_r1629035418

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java:
## @@ -31,21 +31,25 @@
 @Internal
 public class ObjectContainer {
 
-    private final Object o;
+    private final Object object;

Review Comment:
   I guess we do not need it if we just use an `ArrayList`, as mentioned above. Or did I miss anything?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35537) Error parsing list of enum in legacy yaml configuration
[ https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852688#comment-17852688 ] Zakelly Lan commented on FLINK-35537: - I have confirmed that the problem comes from the {{{}EmbeddedRocksDBStateBackend#mergeConfigurableOptions{}}}, which merges two configuration map into one. It uses {{toString}} of each already parsed value and insert the value string into a new raw map. The {{toString}} gives the string format of enum list, not the legacy form of the value. The new raw map get parsed later, that's when the error happens. > Error parsing list of enum in legacy yaml configuration > > > Key: FLINK-35537 > URL: https://issues.apache.org/jira/browse/FLINK-35537 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration >Affects Versions: 1.19.0 >Reporter: Zakelly Lan >Priority: Major > > In flink 1.9.0, when I submit a job to a standalone cluster, the TM throws > {code:java} > Caused by: java.lang.IllegalArgumentException: Could not parse value > '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'. > at > org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449) > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333) > ... 19 more > Caused by: java.lang.IllegalArgumentException: Could not parse value for enum > class org.rocksdb.CompressionType. 
Expected one of: [[NO_COMPRESSION, > SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, > LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, > DISABLE_COMPRESSION_OPTION]] > at > org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502) > at java.util.Optional.orElseThrow(Optional.java:290) > at > org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499) > at > org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392) > at > org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441) > at > org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432) > at > org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819) > ... 28 more > {code} > I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in > flink-conf.yaml. I also tried the flink-1.18.1, and it runs well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
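To make the failure mode concrete, here is a minimal, self-contained sketch of the round-trip described in the comment above. The merge and parse steps below are stand-ins for the Flink internals, and the assumption that each raw token ends up in the enum's valueOf is read off the stack trace, not the Flink source.

```java
import java.util.List;

public class EnumListRoundTrip {

    // Mirrors org.rocksdb.CompressionType in shape only; two constants suffice.
    enum CompressionType { NO_COMPRESSION, LZ4_COMPRESSION }

    public static void main(String[] args) {
        // A value that has already been parsed into a typed list.
        List<CompressionType> parsed = List.of(CompressionType.NO_COMPRESSION);

        // What the merge step effectively does: stringify the parsed value
        // and insert it into a new raw map.
        String raw = parsed.toString(); // "[NO_COMPRESSION]" -- brackets included

        // Re-parsing the raw value later fails, because "[NO_COMPRESSION]"
        // is not a constant of the enum.
        CompressionType.valueOf(raw); // throws IllegalArgumentException
    }
}
```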
[jira] [Created] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
Qigeng Lin created FLINK-35540: -- Summary: flink-cdc-pipeline-connector-mysql lost table which database and table with the same name Key: FLINK-35540 URL: https://issues.apache.org/jira/browse/FLINK-35540 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: Qigeng Lin h1. Description When the parameter 'tables' of a MySQL pipeline job contains a table whose database and table have the same name, like 'app.app', the job will fail and the error message is like: {code:java} java.lang.IllegalArgumentException: Cannot find any table by the option 'tables' = app.app {code} h1. How to reproduce Create a database and a table both named {{app}}, then submit a pipeline job defined by a YAML like this: {code:java}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app.app
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
[ https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qigeng Lin updated FLINK-35540: --- Description: h1. Description When the parameter 'tables' of a MySQL pipeline job contains a table whose database and table have the same name, like 'app.app', the job will fail and the error message is like: {code:java} java.lang.IllegalArgumentException: Cannot find any table by the option 'tables' = app.app {code} h1. How to reproduce Create a database and a table both named {{app}}, then submit a pipeline job defined by a YAML like this: {code:java}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app.app
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
{code} was: h1. Description When the parameter 'tables' of a MySQL pipeline job contains a table whose database and table have the same name, like 'app.app', the job will fail and the error message is like: {code:java} java.lang.IllegalArgumentException: Cannot find any table by the option 'tables' = app.app {code} h1. How to reproduce Create a database and a table both named {{app}}, then submit a pipeline job defined by a YAML like this: {code:java} source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app.app server-id: 5400-5404 server-time-zone: UTCsink: type: doris fenodes: 127.0.0.1:8030 username: root password: "" table.create.properties.light_schema_change: true table.create.properties.replication_num: 1pipeline: name: Sync MySQL Database to Doris parallelism: 2 {code} > flink-cdc-pipeline-connector-mysql lost table which database and table with > the same name > - > > Key: FLINK-35540 > URL: https://issues.apache.org/jira/browse/FLINK-35540 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Qigeng Lin >Priority: Major > > h1. Description > When the parameter 'tables' of a MySQL pipeline job contains a table whose > database and table have the same name, like 'app.app', the job will fail > and the error message is like: > {code:java} > java.lang.IllegalArgumentException: Cannot find any table by the option > 'tables' = app.app {code} > h1. How to reproduce > Create a database and a table both named {{app}}, then submit a pipeline job > defined by a YAML like this: > {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
>
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1
>
> pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2
> {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35537) Error when setting state.backend.rocksdb.compression.per.level
[ https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-35537: Summary: Error when setting state.backend.rocksdb.compression.per.level (was: Error parsing list of enum in legacy yaml configuration ) > Error when setting state.backend.rocksdb.compression.per.level > -- > > Key: FLINK-35537 > URL: https://issues.apache.org/jira/browse/FLINK-35537 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration >Affects Versions: 1.19.0 >Reporter: Zakelly Lan >Priority: Major > > In flink 1.9.0, when I submit a job to a standalone cluster, the TM throws > {code:java} > Caused by: java.lang.IllegalArgumentException: Could not parse value > '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'. > at > org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449) > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333) > ... 19 more > Caused by: java.lang.IllegalArgumentException: Could not parse value for enum > class org.rocksdb.CompressionType. 
Expected one of: [[NO_COMPRESSION, > SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, > LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, > DISABLE_COMPRESSION_OPTION]] > at > org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502) > at java.util.Optional.orElseThrow(Optional.java:290) > at > org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499) > at > org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392) > at > org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441) > at > org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432) > at > org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819) > ... 28 more > {code} > I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in > flink-conf.yaml. I also tried the flink-1.18.1, and it runs well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35522][runtime] Fix the issue that the source task may get stuck in speculative execution mode. [flink]
zhuzhurk commented on PR #24899: URL: https://github.com/apache/flink/pull/24899#issuecomment-2151759062 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35525] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]
gaborgsomogyi commented on PR #24891: URL: https://github.com/apache/flink/pull/24891#issuecomment-2151764327 Unless there are further comments I'm intended to merge this next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM
[ https://issues.apache.org/jira/browse/FLINK-35525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi reassigned FLINK-35525: - Assignee: Zhen Wang > HDFS delegation token fetched by custom DelegationTokenProvider is not > passed to Yarn AM > - > > Key: FLINK-35525 > URL: https://issues.apache.org/jira/browse/FLINK-35525 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.19.0, 1.18.1 >Reporter: Zhen Wang >Assignee: Zhen Wang >Priority: Major > Labels: pull-request-available > > I tried running flink with hadoop proxy user by disabling HadoopModuleFactory > and flink built-in token providers, and implementing a custom token provider. > However, only the hdfs token obtained by hadoopfs provider was added in > YarnClusterDescriptor, which resulted in Yarn AM submission failure. > Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compression.per.level' in yaml [flink]
Zakelly opened a new pull request, #24902: URL: https://github.com/apache/flink/pull/24902 ## What is the purpose of the change See FLINK-35537: an exception is thrown when setting 'state.backend.rocksdb.compression.per.level' in Flink 1.19.0. I have confirmed that the problem comes from `EmbeddedRocksDBStateBackend#mergeConfigurableOptions`, which merges two configuration maps into one. It uses `toString` of each already-parsed value and inserts the resulting string into a new raw map. The `toString` gives the display form of the enum list, not the legacy form of the value. The new raw map gets parsed later, which is when the error happens. This PR fixes `EmbeddedRocksDBStateBackend#mergeConfigurableOptions`. ## Brief change log - `EmbeddedRocksDBStateBackend#mergeConfigurableOptions`. - Added test `testConfigureRocksDBCompressionPerLevel` ## Verifying this change Newly added test `testConfigureRocksDBCompressionPerLevel`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
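A hedged sketch of the general idea the PR description points at: merge at the raw-string level, so values never go through a typed parse followed by a lossy toString(). This is an illustration only, not the actual change in #24902; Configuration.toMap and Configuration.fromMap are used here on the assumption that they round-trip raw string values.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;

public class RawStringMerge {

    // Overrides win; both maps stay in their raw string representation,
    // so list-of-enum values keep their original, parseable form.
    static Configuration merge(Configuration base, Configuration overrides) {
        Map<String, String> merged = new HashMap<>(base.toMap());
        merged.putAll(overrides.toMap());
        return Configuration.fromMap(merged);
    }
}
```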
[jira] [Updated] (FLINK-35537) Error when setting state.backend.rocksdb.compression.per.level
[ https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35537: --- Labels: pull-request-available (was: ) > Error when setting state.backend.rocksdb.compression.per.level > -- > > Key: FLINK-35537 > URL: https://issues.apache.org/jira/browse/FLINK-35537 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration >Affects Versions: 1.19.0 >Reporter: Zakelly Lan >Priority: Major > Labels: pull-request-available > > In flink 1.9.0, when I submit a job to a standalone cluster, the TM throws > {code:java} > Caused by: java.lang.IllegalArgumentException: Could not parse value > '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'. > at > org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361) > at > org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181) > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449) > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333) > ... 19 more > Caused by: java.lang.IllegalArgumentException: Could not parse value for enum > class org.rocksdb.CompressionType. 
Expected one of: [[NO_COMPRESSION, > SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, > LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, > DISABLE_COMPRESSION_OPTION]] > at > org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502) > at java.util.Optional.orElseThrow(Optional.java:290) > at > org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499) > at > org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392) > at > org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441) > at > org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432) > at > org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819) > ... 28 more > {code} > I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in > flink-conf.yaml. I also tried the flink-1.18.1, and it runs well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compression.per.level' in yaml [flink]
Zakelly commented on PR #24902: URL: https://github.com/apache/flink/pull/24902#issuecomment-2151772776 @fredia @masteryhx Would you please take a look? thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35371) Allow the keystore and truststore type to configured for SSL
[ https://issues.apache.org/jira/browse/FLINK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852689#comment-17852689 ] Gabor Somogyi commented on FLINK-35371: --- [~ammarm] any news on this? > Allow the keystore and truststore type to configured for SSL > > > Key: FLINK-35371 > URL: https://issues.apache.org/jira/browse/FLINK-35371 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.19.0 >Reporter: Ammar Master >Assignee: Ammar Master >Priority: Minor > Labels: SSL > > Flink always creates a keystore and trustore using the [default > type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236] > defined in the JDK, which in most cases is JKS. > {code} > KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); > {code} > We should add other configuration options to set the type explicitly to > support other custom formats, and match the options provided by other > applications by > [Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS] > and > [Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format] > already. The default would continue to be specified by the JDK. > > The SSLContext for the REST API can read the configuration option directly, > and we need to add extra logic to the > [CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java] > for Pekko. -- This message was sent by Atlassian Jira (v8.20.10#820010)
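The JDK side of this proposal is small; a sketch of what a configurable store type could look like, where the Optional stands in for whatever configuration key the ticket ends up introducing:

```java
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.util.Optional;

public class ConfigurableStoreType {

    static KeyStore createTrustStore(Optional<String> configuredType)
            throws KeyStoreException {
        // Fall back to the JDK default (usually JKS) when nothing is set,
        // preserving today's behaviour; "PKCS12" would be a typical override.
        String type = configuredType.orElse(KeyStore.getDefaultType());
        return KeyStore.getInstance(type);
    }
}
```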
Re: [PR] [BP-3.1.1][FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink (#3233) [flink-cdc]
Jiabao-Sun merged PR #3387: URL: https://github.com/apache/flink-cdc/pull/3387 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compression.per.level' in yaml [flink]
flinkbot commented on PR #24902: URL: https://github.com/apache/flink/pull/24902#issuecomment-2151784661 ## CI report: * 3e4451a8c373e99bf99b1b9c4ecd2f8448855773 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35448] Translate pod templates documentation into Chinese [flink-kubernetes-operator]
1996fanrui commented on code in PR #830: URL: https://github.com/apache/flink-kubernetes-operator/pull/830#discussion_r1629028807 ## docs/content/docs/custom-resource/pod-template.md: ## @@ -26,21 +26,18 @@ under the License. # Pod template -The operator CRD is designed to have a minimal set of direct, short-hand CRD settings to express the most -basic attributes of a deployment. For all other settings the CRD provides the `flinkConfiguration` and -`podTemplate` fields. + -Pod templates permit customization of the Flink job and task manager pods, for example to specify -volume mounts, ephemeral storage, sidecar containers etc. +Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 `flinkConfiguration` 和 `podTemplate` 字段。 Review Comment: ```suggestion Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达 deployment 的最基本属性。对于所有其他设置,CRD 提供了 `flinkConfiguration` 和 `podTemplate` 字段。 ``` We don't need to translate `deployment `. ## docs/content/docs/custom-resource/pod-template.md: ## @@ -26,21 +26,18 @@ under the License. # Pod template -The operator CRD is designed to have a minimal set of direct, short-hand CRD settings to express the most -basic attributes of a deployment. For all other settings the CRD provides the `flinkConfiguration` and -`podTemplate` fields. + -Pod templates permit customization of the Flink job and task manager pods, for example to specify -volume mounts, ephemeral storage, sidecar containers etc. +Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 `flinkConfiguration` 和 `podTemplate` 字段。 -Pod templates can be layered, as shown in the example below. -A common pod template may hold the settings that apply to both job and task manager, -like `volumeMounts`. Another template under job or task manager can define additional settings that supplement or override those -in the common template, such as a task manager sidecar. +Pod templates 保证了 Flink 作业和任务管理器 pod 的自定义,例如指定卷挂载、临时存储、sidecar 容器等。 -The operator is going to merge the common and specific templates for job and task manager respectively. +Pod template 可以被分层,如下面的示例所示。 +一个通用的 pod template 可以保存适用于作业和任务管理器的设置,比如 `volumeMounts`。作业或任务管理器下的另一个模板可以定义补充或覆盖通用模板中的其他设置,比如一个任务管理器 sidecar。 -Here the full example: +Operator 将分别合并作业和任务管理器的通用和特定模板。 Review Comment: Same comment as above. ## docs/content/docs/custom-resource/pod-template.md: ## @@ -26,21 +26,18 @@ under the License. # Pod template -The operator CRD is designed to have a minimal set of direct, short-hand CRD settings to express the most -basic attributes of a deployment. For all other settings the CRD provides the `flinkConfiguration` and -`podTemplate` fields. + -Pod templates permit customization of the Flink job and task manager pods, for example to specify -volume mounts, ephemeral storage, sidecar containers etc. +Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 `flinkConfiguration` 和 `podTemplate` 字段。 -Pod templates can be layered, as shown in the example below. -A common pod template may hold the settings that apply to both job and task manager, -like `volumeMounts`. Another template under job or task manager can define additional settings that supplement or override those -in the common template, such as a task manager sidecar. +Pod templates 保证了 Flink 作业和任务管理器 pod 的自定义,例如指定卷挂载、临时存储、sidecar 容器等。 Review Comment: ```suggestion Pod templates 允许自定义 Flink Job 和 Task Manager 的 pod,例如指定卷挂载、临时存储、sidecar 容器等。 ``` Task Manager should be translated as well. 
## docs/content/docs/custom-resource/pod-template.md: ## @@ -125,4 +124,4 @@ arr1: [{name: a, p2: v2}, {name: c, p2: v2}] merged: [{name: a, p1: v1, p2: v2}, {name: b, p1: v1}, {name: c, p2: v2}] ``` -Merging by name can we be very convenient when merging container specs or when the base and override templates are not defined together. +当合并容器规范或者当基础模板和覆盖模板没有一起定义时,按名称合并可以非常方便。 Review Comment: ```suggestion 当合并容器规格或者当基础模板和覆盖模板没有一起定义时,按名称合并可以非常方便。 ``` The original English doc doesn't need the `we`, right? If yes, we can remove it in this PR directly. ## docs/content/docs/custom-resource/pod-template.md: ## @@ -26,21 +26,18 @@ under the License. # Pod template -The operator CRD is designed to have a minimal set of direct, short-hand CRD settings to express the most -basic attributes of a deployment. For all other settings the CRD provides the `flinkConfiguration` and -`podTemplate` fields. + -Pod templates permit customization of the Flink job and task manager pods, for example to specify -volume mounts, ephemeral storage, sidecar containers etc. +Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 `flinkConfiguration` 和 `podTemplate` 字段。 -Pod templates can be layered, as shown in the example below. -A common pod template may hold the settings that apply to both job and task manager, -like `volum
[jira] [Resolved] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
[ https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun resolved FLINK-35149. Release Note: cdc release-3.1: fa9fb0b1c49848e77c211a5913d7f28c33e04ff0 Resolution: Fixed > Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not > TwoPhaseCommittingSink > --- > > Key: FLINK-35149 > URL: https://issues.apache.org/jira/browse/FLINK-35149 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > Currently, when the sink is not an instance of TwoPhaseCommittingSink, > input.transform is used rather than stream. This means that the pre-write > topology will be ignored. > {code:java} > private void sinkTo( > DataStream input, > Sink sink, > String sinkName, > OperatorID schemaOperatorID) { > DataStream stream = input; > // Pre write topology > if (sink instanceof WithPreWriteTopology) { > stream = ((WithPreWriteTopology) > sink).addPreWriteTopology(stream); > } > if (sink instanceof TwoPhaseCommittingSink) { > addCommittingTopology(sink, stream, sinkName, schemaOperatorID); > } else { > input.transform( > SINK_WRITER_PREFIX + sinkName, > CommittableMessageTypeInfo.noOutput(), > new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID)); > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
[ https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-35149: --- Release Note: (was: cdc release-3.1: fa9fb0b1c49848e77c211a5913d7f28c33e04ff0) > Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not > TwoPhaseCommittingSink > --- > > Key: FLINK-35149 > URL: https://issues.apache.org/jira/browse/FLINK-35149 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > Currently, when the sink is not an instance of TwoPhaseCommittingSink, > input.transform is used rather than stream. This means that the pre-write > topology will be ignored. > {code:java} > private void sinkTo( > DataStream input, > Sink sink, > String sinkName, > OperatorID schemaOperatorID) { > DataStream stream = input; > // Pre write topology > if (sink instanceof WithPreWriteTopology) { > stream = ((WithPreWriteTopology) > sink).addPreWriteTopology(stream); > } > if (sink instanceof TwoPhaseCommittingSink) { > addCommittingTopology(sink, stream, sinkName, schemaOperatorID); > } else { > input.transform( > SINK_WRITER_PREFIX + sinkName, > CommittableMessageTypeInfo.noOutput(), > new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID)); > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
[ https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852690#comment-17852690 ] Jiabao Sun commented on FLINK-35149: flink-cdc release-3.1: fa9fb0b1c49848e77c211a5913d7f28c33e04ff0 > Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not > TwoPhaseCommittingSink > --- > > Key: FLINK-35149 > URL: https://issues.apache.org/jira/browse/FLINK-35149 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > Currently, when the sink is not an instance of TwoPhaseCommittingSink, > input.transform is used rather than stream. This means that the pre-write > topology will be ignored. > {code:java} > private void sinkTo( > DataStream input, > Sink sink, > String sinkName, > OperatorID schemaOperatorID) { > DataStream stream = input; > // Pre write topology > if (sink instanceof WithPreWriteTopology) { > stream = ((WithPreWriteTopology) > sink).addPreWriteTopology(stream); > } > if (sink instanceof TwoPhaseCommittingSink) { > addCommittingTopology(sink, stream, sinkName, schemaOperatorID); > } else { > input.transform( > SINK_WRITER_PREFIX + sinkName, > CommittableMessageTypeInfo.noOutput(), > new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID)); > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
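Reading the quoted snippet, the fix the report implies is a one-word change in the non-committing branch: apply the writer to stream, which carries any pre-write topology, instead of the raw input. A sketch of just that branch, with the generic type parameters elided exactly as they are in the quote:

```java
} else {
    stream.transform( // was: input.transform(...)
            SINK_WRITER_PREFIX + sinkName,
            CommittableMessageTypeInfo.noOutput(),
            new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
}
```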
[jira] [Commented] (FLINK-33278) RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-33278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852691#comment-17852691 ] Pedro Mázala commented on FLINK-33278: -- Are there any updates on this? I'm also seeing it on Flink 1.18.x. > RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable > fails on AZP > -- > > Key: FLINK-33278 > URL: https://issues.apache.org/jira/browse/FLINK-33278 > Project: Flink > Issue Type: Bug > Components: Runtime / RPC >Affects Versions: 1.19.0 >Reporter: Sergey Nuyanzin >Priority: Critical > Labels: test-stability > Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png, > screenshot-4.png > > > This build > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53740&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=7c1d86e3-35bd-5fd5-3b7c-30c126a78702&l=6563] > fails as > {noformat} > Oct 15 01:02:20 Multiple Failures (1 failure) > Oct 15 01:02:20 -- failure 1 -- > Oct 15 01:02:20 [Any cause is instance of class 'class > org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException'] > Oct 15 01:02:20 Expecting any element of: > Oct 15 01:02:20 [java.util.concurrent.CompletionException: > java.util.concurrent.TimeoutException: Invocation of > [RemoteRpcInvocation(SerializedValueRespondingGateway.getSerializedValue())] > at recipient > [pekko.tcp://flink@localhost:38231/user/rpc/8c211f34-41e5-4efe-93bd-8eca6c590a7f] > timed out. This is usually caused by: 1) Pekko failed sending the message > silently, due to problems like oversized payload or serialization failures. > In that case, you should find detailed error information in the logs. 2) The > recipient needs more time for responding, due to problems like slow machines > or network jitters. In that case, you can try to increase pekko.ask.timeout. > Oct 15 01:02:20 at > java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375) > Oct 15 01:02:20 at > java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) > Oct 15 01:02:20 at > org.apache.flink.runtime.rpc.pekko.RemotePekkoRpcActorTest.lambda$failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable$1(RemotePekkoRpcActorTest.java:168) > Oct 15 01:02:20 ...(63 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > ... > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35541) Introduce retry limiting for AWS connector sinks
Aleksandr Pilipenko created FLINK-35541: --- Summary: Introduce retry limiting for AWS connector sinks Key: FLINK-35541 URL: https://issues.apache.org/jira/browse/FLINK-35541 Project: Flink Issue Type: Technical Debt Components: Connectors / AWS, Connectors / DynamoDB, Connectors / Firehose, Connectors / Kinesis Affects Versions: aws-connector-4.2.0 Reporter: Aleksandr Pilipenko Currently, if the record write operation in the sink consistently fails with a retriable error, the sink will retry indefinitely. When the cause of the error is not resolved, this may lead to a stuck operator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
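A minimal sketch of the retry-limiting idea; every name below is illustrative, and the real connector would presumably source the limit from sink configuration rather than a constant:

```java
public class BoundedRetry {

    static final int MAX_RETRIES = 10; // stand-in for a configurable limit

    interface Attempt {
        void run() throws Exception;
    }

    // Retry a retriable operation at most MAX_RETRIES times, then surface the
    // failure instead of spinning forever on an unresolvable error.
    static void withRetryLimit(Attempt attempt) throws Exception {
        for (int tries = 0; ; tries++) {
            try {
                attempt.run();
                return;
            } catch (Exception retriable) {
                if (tries >= MAX_RETRIES) {
                    throw new IllegalStateException(
                            "Giving up after " + tries + " retries", retriable);
                }
                Thread.sleep(Math.min(1000L << tries, 30_000L)); // capped backoff
            }
        }
    }
}
```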
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
liuyongvs commented on code in PR #24526: URL: https://github.com/apache/flink/pull/24526#discussion_r1629112803 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */ +@Internal +public class ArrayIntersectFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + +public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); +} + +@Override +public void open(FunctionContext context) throws Exception { +equalityAndHashcodeProvider.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null || arrayTwo == null) { +return null; +} + +Set set = new HashSet<>(); +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +final ObjectContainer objectContainer = createObjectContainer(element); +set.add(objectContainer); +} + +Set res = new LinkedHashSet<>(); Review Comment: if a record is present in the set, then add it to the array list, otherwise add it to the set? how would that deduplicate? if array1 is a, a and array2 is b, a, how do we deduplicate the a, a? @snuyanzin -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
liuyongvs commented on code in PR #24526: URL: https://github.com/apache/flink/pull/24526#discussion_r1629113958 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */ +@Internal +public class ArrayIntersectFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + +public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); +} + +@Override +public void open(FunctionContext context) throws Exception { +equalityAndHashcodeProvider.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null || arrayTwo == null) { +return null; +} + +Set set = new HashSet<>(); +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +final ObjectContainer objectContainer = createObjectContainer(element); +set.add(objectContainer); +} + +Set res = new LinkedHashSet<>(); +for (int pos = 0; pos < arrayOne.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayOne, pos); +final ObjectContainer objectContainer = createObjectContainer(element); +if (set.contains(objectContainer)) { +res.add(objectContainer); +} +} + +return new GenericArrayData( +res.stream() +.map(element -> element != null ? 
element.getObject() : null) +.toArray()); Review Comment: when an element is null, this will throw an NPE @snuyanzin -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
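For the two points raised in this review thread, deduplication semantics and null elements, a small illustrative sketch (not the Flink implementation): a lookup set over one array plus an insertion-ordered result set deduplicates for free, and both HashSet and LinkedHashSet tolerate a null element, which avoids the NPE noted above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class IntersectSketch {

    static Object[] intersect(Object[] one, Object[] two) {
        Set<Object> lookup = new HashSet<>(Arrays.asList(two)); // null-friendly
        Set<Object> result = new LinkedHashSet<>(); // keeps first-seen order
        for (Object element : one) { // element may be null
            if (lookup.contains(element)) {
                result.add(element); // duplicates collapse here
            }
        }
        return result.toArray();
    }

    public static void main(String[] args) {
        // {"a", "a", null} intersected with {"b", "a", null} -> [a, null]
        System.out.println(Arrays.toString(
                intersect(new Object[] {"a", "a", null},
                          new Object[] {"b", "a", null})));
    }
}
```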
[jira] [Commented] (FLINK-33278) RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-33278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852700#comment-17852700 ] Matthias Pohl commented on FLINK-33278: --- What are you seeing on Flink 1.18.x? Are you referring to the test instability or some error? I guess, we haven't continued working on the issue for now because that test instability was only observed once so far. > RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable > fails on AZP > -- > > Key: FLINK-33278 > URL: https://issues.apache.org/jira/browse/FLINK-33278 > Project: Flink > Issue Type: Bug > Components: Runtime / RPC >Affects Versions: 1.19.0 >Reporter: Sergey Nuyanzin >Priority: Critical > Labels: test-stability > Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png, > screenshot-4.png > > > This build > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53740&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=7c1d86e3-35bd-5fd5-3b7c-30c126a78702&l=6563] > fails as > {noformat} > Oct 15 01:02:20 Multiple Failures (1 failure) > Oct 15 01:02:20 -- failure 1 -- > Oct 15 01:02:20 [Any cause is instance of class 'class > org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException'] > Oct 15 01:02:20 Expecting any element of: > Oct 15 01:02:20 [java.util.concurrent.CompletionException: > java.util.concurrent.TimeoutException: Invocation of > [RemoteRpcInvocation(SerializedValueRespondingGateway.getSerializedValue())] > at recipient > [pekko.tcp://flink@localhost:38231/user/rpc/8c211f34-41e5-4efe-93bd-8eca6c590a7f] > timed out. This is usually caused by: 1) Pekko failed sending the message > silently, due to problems like oversized payload or serialization failures. > In that case, you should find detailed error information in the logs. 2) The > recipient needs more time for responding, due to problems like slow machines > or network jitters. In that case, you can try to increase pekko.ask.timeout. > Oct 15 01:02:20 at > java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375) > Oct 15 01:02:20 at > java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) > Oct 15 01:02:20 at > org.apache.flink.runtime.rpc.pekko.RemotePekkoRpcActorTest.lambda$failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable$1(RemotePekkoRpcActorTest.java:168) > Oct 15 01:02:20 ...(63 remaining lines not displayed - this can be > changed with Assertions.setMaxStackTraceElementsDisplayed), > ... > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35490][cdc][JUnit5 Migration] For `flink-cdc-connect/flink-cdc-source-connectors` module [flink-cdc]
morazow opened a new pull request, #3395: URL: https://github.com/apache/flink-cdc/pull/3395 https://issues.apache.org/jira/browse/FLINK-35490 JUnit 4 to JUnit 5 migration of the `flink-cdc-connect/flink-cdc-source-connectors` module. Reference document: [JUnit 5 Migration Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU). - All the unit and integration tests should work as expected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
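For reviewers unfamiliar with the guide, the migration is mostly mechanical; a generic before/after shape (not a specific flink-cdc test):

```java
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.BeforeEach; // was: org.junit.Before
import org.junit.jupiter.api.Test;       // was: org.junit.Test

import static org.junit.jupiter.api.Assertions.assertEquals;

class MigratedTest {

    private List<String> tables;

    @BeforeEach // was: @Before
    void setUp() { // JUnit 5 test classes and methods may be package-private
        tables = new ArrayList<>(List.of("app.app"));
    }

    @Test
    void discoversTables() {
        // JUnit 5 assertions take the optional message last, not first.
        assertEquals(1, tables.size(), "expected one captured table");
    }
}
```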
[jira] [Updated] (FLINK-35490) [JUnit5 Migration] Module: Flink CDC flink-cdc-connect/flink-cdc-source-connectors
[ https://issues.apache.org/jira/browse/FLINK-35490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35490: --- Labels: pull-request-available (was: ) > [JUnit5 Migration] Module: Flink CDC > flink-cdc-connect/flink-cdc-source-connectors > -- > > Key: FLINK-35490 > URL: https://issues.apache.org/jira/browse/FLINK-35490 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Muhammet Orazov >Priority: Major > Labels: pull-request-available > > Migrate Junit4 tests to Junit5 in the Flink CDC following modules: > > - flink-cdc-connect/flink-cdc-source-connectors -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35526) Remove deprecated stedolan/jq Docker image from Flink e2e tests
[ https://issues.apache.org/jira/browse/FLINK-35526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852710#comment-17852710 ] Robert Metzger commented on FLINK-35526: 1.18: https://github.com/apache/flink/commit/b51dc716ce48862269fdd409ea85036d61ee7b57 1.19: https://github.com/apache/flink/commit/b9c666e6a51702c35101af2d13277115f48be993 > Remove deprecated stedolan/jq Docker image from Flink e2e tests > --- > > Key: FLINK-35526 > URL: https://issues.apache.org/jira/browse/FLINK-35526 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Minor > Labels: pull-request-available > > Our CI logs contain this warning: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60060&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3828 > {code} > latest: Pulling from stedolan/jq > [DEPRECATION NOTICE] Docker Image Format v1, and Docker Image manifest > version 2, schema 1 support will be removed in an upcoming release. Suggest > the author of docker.io/stedolan/jq:latest to upgrade the image to the OCI > Format, or Docker Image manifest v2, schema 2. More information at > https://docs.docker.com/go/deprecated-image-specs/ > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35526) Remove deprecated stedolan/jq Docker image from Flink e2e tests
[ https://issues.apache.org/jira/browse/FLINK-35526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-35526. Fix Version/s: 1.18.2 1.19.1 Resolution: Fixed > Remove deprecated stedolan/jq Docker image from Flink e2e tests > --- > > Key: FLINK-35526 > URL: https://issues.apache.org/jira/browse/FLINK-35526 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.2, 1.19.1 > > > Our CI logs contain this warning: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60060&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3828 > {code} > latest: Pulling from stedolan/jq > [DEPRECATION NOTICE] Docker Image Format v1, and Docker Image manifest > version 2, schema 1 support will be removed in an upcoming release. Suggest > the author of docker.io/stedolan/jq:latest to upgrade the image to the OCI > Format, or Docker Image manifest v2, schema 2. More information at > https://docs.docker.com/go/deprecated-image-specs/ > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Adding MongoDB Connector v1.2.0 [flink-web]
dannycranmer merged PR #735: URL: https://github.com/apache/flink-web/pull/735 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-35139) Release flink-connector-mongodb v1.2.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-35139. --- Resolution: Fixed > Release flink-connector-mongodb v1.2.0 for Flink 1.19 > - > > Key: FLINK-35139 > URL: https://issues.apache.org/jira/browse/FLINK-35139 > Project: Flink > Issue Type: Sub-task > Components: Connectors / MongoDB >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: mongodb-1.2.0 > > > https://github.com/apache/flink-connector-mongodb -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35532][Runtime/Web Frontend] Prevent Cross-Site Authentication… [flink]
hlteoh37 merged PR #24897: URL: https://github.com/apache/flink/pull/24897 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
[ https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852716#comment-17852716 ] Hong Liang Teoh commented on FLINK-35532: - merged commit [{{ceb4db4}}|https://github.com/apache/flink/commit/ceb4db4d88a91546a179cc9fe1bc86ea1d7bb42a] into apache:master > Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard > -- > > Key: FLINK-35532 > URL: https://issues.apache.org/jira/browse/FLINK-35532 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend >Affects Versions: 1.19.0, 1.19.1 >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.2 > > > As part of FLINK-33325, we introduced a new tab on the Flink dashboard to > trigger the async profiler on the JobManager and TaskManager. > > The HTML component introduced links out to async profiler page on Github -> > [https://github.com/async-profiler/async-profiler/wiki]. > However, the anchor element introduced does not follow best practices around > preventing XSA attacks, by setting up the below: > {code:java} > target="_blank" rel="noopener noreferrer"{code} > We should add these attributes as best practice! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35532][Runtime/Web Frontend] Prevent Cross-Site Authentication… [flink]
hlteoh37 opened a new pull request, #24903: URL: https://github.com/apache/flink/pull/24903 … (XSA) attacks on Flink dashboard ## What is the purpose of the change - Backport of https://github.com/apache/flink/pull/24897 - Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard ## Brief change log - Set `target="_blank" rel="noopener noreferrer"` properties on the hyperlink reference element on Flink dashboard. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35532][Runtime/Web Frontend] Prevent Cross-Site Authentication… [flink]
hlteoh37 merged PR #24903: URL: https://github.com/apache/flink/pull/24903 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
[ https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852719#comment-17852719 ] Hong Liang Teoh commented on FLINK-35532: - merged commit [{{175ed72}}|https://github.com/apache/flink/commit/175ed72e15c6228dbc36cf42105ef9f5a03b] into apache:release-1.19 > Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard > -- > > Key: FLINK-35532 > URL: https://issues.apache.org/jira/browse/FLINK-35532 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend >Affects Versions: 1.19.0, 1.19.1 >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.2 > > > As part of FLINK-33325, we introduced a new tab on the Flink dashboard to > trigger the async profiler on the JobManager and TaskManager. > > The HTML component introduced links out to async profiler page on Github -> > [https://github.com/async-profiler/async-profiler/wiki]. > However, the anchor element introduced does not follow best practices around > preventing XSA attacks, by setting up the below: > {code:java} > target="_blank" rel="noopener noreferrer"{code} > We should add these attributes as best practice! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
[ https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-35532: Fix Version/s: 1.20.0 1.19.1 (was: 1.19.2) > Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard > -- > > Key: FLINK-35532 > URL: https://issues.apache.org/jira/browse/FLINK-35532 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend > Affects Versions: 1.19.0, 1.19.1 > Reporter: Hong Liang Teoh > Assignee: Hong Liang Teoh > Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > As part of FLINK-33325, we introduced a new tab on the Flink dashboard to > trigger the async profiler on the JobManager and TaskManager. > > The HTML component introduced links out to the async profiler page on GitHub: > [https://github.com/async-profiler/async-profiler/wiki]. > However, the anchor element introduced does not follow the best practice for > preventing XSA attacks, which is to set the attributes below: > {code:java} > target="_blank" rel="noopener noreferrer"{code} > We should add these attributes as a best practice. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
[ https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-35532. - Resolution: Fixed > Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard > -- > > Key: FLINK-35532 > URL: https://issues.apache.org/jira/browse/FLINK-35532 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend > Affects Versions: 1.19.0 > Reporter: Hong Liang Teoh > Assignee: Hong Liang Teoh > Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > As part of FLINK-33325, we introduced a new tab on the Flink dashboard to > trigger the async profiler on the JobManager and TaskManager. > > The HTML component introduced links out to the async profiler page on GitHub: > [https://github.com/async-profiler/async-profiler/wiki]. > However, the anchor element introduced does not follow the best practice for > preventing XSA attacks, which is to set the attributes below: > {code:java} > target="_blank" rel="noopener noreferrer"{code} > We should add these attributes as a best practice. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
[ https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-35532: Affects Version/s: (was: 1.19.1) > Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard > -- > > Key: FLINK-35532 > URL: https://issues.apache.org/jira/browse/FLINK-35532 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Web Frontend > Affects Versions: 1.19.0 > Reporter: Hong Liang Teoh > Assignee: Hong Liang Teoh > Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > As part of FLINK-33325, we introduced a new tab on the Flink dashboard to > trigger the async profiler on the JobManager and TaskManager. > > The HTML component introduced links out to the async profiler page on GitHub: > [https://github.com/async-profiler/async-profiler/wiki]. > However, the anchor element introduced does not follow the best practice for > preventing XSA attacks, which is to set the attributes below: > {code:java} > target="_blank" rel="noopener noreferrer"{code} > We should add these attributes as a best practice. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset
Jan Gurda created FLINK-35542: - Summary: ClassNotFoundException when deserializing CheckpointedOffset Key: FLINK-35542 URL: https://issues.apache.org/jira/browse/FLINK-35542 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: jdbc-3.1.2 Environment: Flink 1.19.0 Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca) JDK 11 (Temurin) Reporter: Jan Gurda Fix For: jdbc-3.2.0 I use the latest flink-connector-jdbc code from the main branch, which is actually 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca). When jobs get interrupted while reading data from the JDBC source (for example, by a TaskManager outage), they cannot recover due to the following exception: {code:java} java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71) at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34) at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122) at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158) at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247) at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186) at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571) at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.loadClass(Unknown Source) at 
java.base/java.lang.Class.forName0(Native Method) at java.base/java.lang.Class.forName(Unknown Source) at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92) at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.base/java.io.ObjectInputStream.readObject0(Unknown Source) at java.base/java.io.ObjectInputStream.readObject(Unknown Source) at java.base/java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109) at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
[jira] [Updated] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset
[ https://issues.apache.org/jira/browse/FLINK-35542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35542: --- Labels: pull-request-available (was: ) > ClassNotFoundException when deserializing CheckpointedOffset > > > Key: FLINK-35542 > URL: https://issues.apache.org/jira/browse/FLINK-35542 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC > Affects Versions: jdbc-3.1.2 > Environment: Flink 1.19.0 > Flink JDBC Connector 3.2-SNAPSHOT (commit > 2defbbcf4fc550a76dd9c664e1eed7d261e028ca) > JDK 11 (Temurin) > Reporter: Jan Gurda > Priority: Major > Labels: pull-request-available > Fix For: jdbc-3.2.0 > > > I use the latest flink-connector-jdbc code from the main branch, which is > actually 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca). > > When jobs get interrupted while reading data from the JDBC source (for > example, by a TaskManager outage), they cannot recover due to the following > exception: > {code:java} > java.lang.RuntimeException: java.lang.ClassNotFoundException: > org.apache.flink.connector.jdbc.source.split.CheckpointedOffset > at > org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71) > at > org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186) > at > org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571) > at > org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: 
java.lang.ClassNotFoundException: > org.apache.flink.connector.jdbc.source.split.CheckpointedOffset > at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown > Source) > at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown > Source) > at java.base/java.lang.ClassLoader.loadClass(Unknown Source) > at java.base/java.lang.Class.forName0(Native Method) > at java.base/java.lang.Class.forName(Unknown Source) > at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92) > at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.base/java.io.ObjectInputStream.readObject0(Unknown Source) > at java.base/java.io.ObjectInputStream.readObject(Unknown Source) > at java.base/java.io.ObjectInputStream.readObject(Unknown Source) >
[jira] [Updated] (FLINK-35541) Introduce retry limiting for AWS connector sinks
[ https://issues.apache.org/jira/browse/FLINK-35541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-35541: Description: Currently, if the record write operation in the sink consistently fails with a retriable error, the sink will retry indefinitely. If the cause of the error is never resolved, this can become a poison pill. The proposal here is to add a configurable retry limit for each record. Users can specify a maximum number of retries per record, and the sink will fail once the retry limit is reached. We will implement this for all AWS connectors: * DDBSink * Firehose Sink * Kinesis Sink was:Currently if the record write operation in the sink consistently fails with retriable error, sinks will retry indefinitely. In case when cause of the error is not resolved this may lead to stuck operator. > Introduce retry limiting for AWS connector sinks > > > Key: FLINK-35541 > URL: https://issues.apache.org/jira/browse/FLINK-35541 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / AWS, Connectors / DynamoDB, Connectors / > Firehose, Connectors / Kinesis > Affects Versions: aws-connector-4.2.0 > Reporter: Aleksandr Pilipenko > Priority: Major > > Currently, if the record write operation in the sink consistently fails with > a retriable error, the sink will retry indefinitely. If the cause of the > error is never resolved, this can become a poison pill. > > The proposal here is to add a configurable retry limit for each record. Users > can specify a maximum number of retries per record, and the sink will fail > once the retry limit is reached. > > We will implement this for all AWS connectors: > * DDBSink > * Firehose Sink > * Kinesis Sink > -- This message was sent by Atlassian Jira (v8.20.10#820010)
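For context, a per-record retry cap of the kind proposed above usually reduces to a small counter check in the sink's failure path. The sketch below is illustrative only; the class and parameter names (RetryableEntry, maxRetriesPerRecord) are hypothetical and not the eventual connector API.

{code:java}
// Illustrative sketch of per-record retry limiting for an async sink.
// All names here are hypothetical, not the actual AWS connector API.
public class RetryableEntry<T> {
    private final T record;
    private int attempts;

    public RetryableEntry(T record) {
        this.record = record;
    }

    /** Increments the attempt count; returns false once the configured cap is exceeded. */
    public boolean retryAllowed(int maxRetriesPerRecord) {
        attempts++;
        return attempts <= maxRetriesPerRecord;
    }

    public T record() {
        return record;
    }
}
{code}

On a failed write, the sink would re-queue the entry only while retryAllowed(...) returns true, and fail the job once the cap is hit, which is the behavior the proposal describes.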
[PR] [FLINK-35542] Set the correct class loader to lookup for the class CheckpointedOffset [flink-connector-jdbc]
jan-osc opened a new pull request, #130: URL: https://github.com/apache/flink-connector-jdbc/pull/130 - [FLINK-35542] Use the class loader of the CheckpointedOffset class when deserializing JdbcSourceSplit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35542] Set the correct class loader to lookup for the class CheckpointedOffset [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #130: URL: https://github.com/apache/flink-connector-jdbc/pull/130#issuecomment-2151980850 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
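For context, Flink's InstantiationUtil.deserializeObject accepts an explicit ClassLoader, so the fix named in the PR title amounts to passing a loader that can actually see the connector classes instead of the application class loader that failed in the stack trace above. A minimal sketch, assuming the split payload bytes are already at hand; the wrapper class is illustrative, only InstantiationUtil is a real Flink utility:

{code:java}
import java.io.IOException;

import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
import org.apache.flink.util.InstantiationUtil;

final class SplitPayloadReader {

    // Deserialize with the class loader that loaded CheckpointedOffset itself,
    // rather than the app class loader that threw ClassNotFoundException above.
    static CheckpointedOffset readOffset(byte[] bytes)
            throws IOException, ClassNotFoundException {
        return InstantiationUtil.deserializeObject(
                bytes, CheckpointedOffset.class.getClassLoader());
    }

    private SplitPayloadReader() {}
}
{code}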
[PR] [FLINK-35540][cdc-common][cdc-connector][mysql] fix lost table when database and table have the same name [flink-cdc]
qg-lin opened a new pull request, #3396: URL: https://github.com/apache/flink-cdc/pull/3396 https://issues.apache.org/jira/browse/FLINK-35540 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35540) flink-cdc-pipeline-connector-mysql loses table when database and table have the same name
[ https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35540: --- Labels: pull-request-available (was: ) > flink-cdc-pipeline-connector-mysql loses table when database and table have > the same name > - > > Key: FLINK-35540 > URL: https://issues.apache.org/jira/browse/FLINK-35540 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.1.0 > Reporter: linqigeng > Priority: Major > Labels: pull-request-available > > h1. Description > When the 'tables' parameter of a MySQL pipeline job contains a table whose > database and table have the same name, like 'app.app', the job will fail with > an error message like: > {code:java} > java.lang.IllegalArgumentException: Cannot find any table by the option > 'tables' = app.app {code} > h1. How to reproduce > Create a database and a table both named {{app}}, then submit a pipeline job > defined in YAML like this: > {code:java} > source: > type: mysql > hostname: localhost > port: 3306 > username: root > password: 123456 > tables: app.app > server-id: 5400-5404 > server-time-zone: UTC > sink: > type: doris > fenodes: 127.0.0.1:8030 > username: root > password: "" > table.create.properties.light_schema_change: true > table.create.properties.replication_num: 1 > pipeline: > name: Sync MySQL Database to Doris > parallelism: 2 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
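The failure mode is easiest to see when a selector like app.app is reduced to bare names instead of a (database, table) pair. The snippet below is illustrative only and is not the Flink CDC implementation:

{code:java}
public class TableSelectorDemo {
    public static void main(String[] args) {
        String tables = "app.app";
        // Split on the first '.' only, so the database part and the table part
        // stay distinct even when they carry the same name.
        String[] parts = tables.split("\\.", 2);
        String database = parts[0]; // "app"
        String table = parts[1];    // "app"
        // Any matching logic that collapses identifiers into a single name set
        // would merge database "app" and table "app" and lose the table.
        System.out.println("database=" + database + ", table=" + table);
    }
}
{code}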
Re: [PR] [BP-3.1][FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]
yuxiqian commented on PR #3375: URL: https://github.com/apache/flink-cdc/pull/3375#issuecomment-2152104901 Fixed the E2e package failure and rebased onto the `release-3.1` branch. Could @leonardBang please trigger the CI? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table. [flink-cdc]
leonardBang commented on PR #3376: URL: https://github.com/apache/flink-cdc/pull/3376#issuecomment-2152114473 Thanks @joyCurry30 for the contribution. I'm closing this one because it's a feature and should not be merged to release-3.1; only bugfixes should be merged there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table. [flink-cdc]
leonardBang closed pull request #3376: [BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table. URL: https://github.com/apache/flink-cdc/pull/3376 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20625][pubsub,e2e] Add PubSubSource connector using FLIP-27 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #2: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/2#discussion_r1627352870 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java: ## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.gcp.pubsub.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer; +import org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader; +import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit; +import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.auth.Credentials; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.function.Supplier; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A source implementation to pull messages from GCP Pub/Sub into Flink. 
+ * + * The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} to every {@link + * PubSubSourceReader} that joins. The split does not contain any split-specific information because + * Pub/Sub does not allow subscribers to specify a "range" of messages to pull by providing + * partitions or offsets. However, Pub/Sub will automatically load-balance messages between multiple + * readers using the same subscription. + * + * A {@link PubSubSource} can be constructed through the {@link PubSubSourceBuilder} like so: + * + * {@code + * PubSubSource.builder() + * // The deserialization schema to deserialize Pub/Sub messages + * .setDeserializationSchema(new SimpleStringSchema()) + * // The name string of your Pub/Sub project + * .setProjectName(PROJECT_NAME) + * // The name string of the subscription you would like to receive messages from + * .setSubscriptionName(SUBSCRIPTION_NAME) + * // An instance of com.google.auth.Credentials to authenticate against Google Cloud + * .setCredentials(CREDENTIALS) + * .setPubSubSubscriberFactory( + * // The maximum number of messages that should be pulled in one go + * 3, + * // The timeout after which a message pull request is deemed a failure + * Duration.ofSeconds(1), + * // The number of times the reception of a
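The javadoc example above is truncated in this excerpt. Whatever the final builder shape, a FLIP-27 source like this one is consumed through StreamExecutionEnvironment#fromSource, which is standard Flink API. A minimal sketch, with `source` standing in for the PubSubSource built as in the javadoc (the connector is still under review, so its own API may change):

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PubSubSourceUsage {

    // 'source' stands in for the PubSubSource built as shown in the javadoc above.
    static DataStream<String> attach(
            StreamExecutionEnvironment env, Source<String, ?, ?> source) {
        // Standard FLIP-27 wiring: any Source is attached with fromSource().
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "PubSubSource");
    }
}
{code}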
[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852748#comment-17852748 ] Matthias Pohl commented on FLINK-35035: --- Thanks for the pointer, [~dmvk]. We looked into this issue while working on [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] (which is kind of related) and plan to do a follow-up FLIP that will align the resource controlling mechanism of the {{AdaptiveScheduler}}'s {{WaitingForResources}} and {{Executing}} states. Currently, we have parameters intervening in the rescaling in different places ([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min], [j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max] being utilized in {{Executing}} and [j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout) being utilized in {{WaitingForResources}}). Having a {{resource-stabilization}} phase in {{Executing}} should resolve the problem described in this Jira issue here. > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Priority: Minor > > When 'jobmanager.scheduler = adaptive' , job graph changes triggered by > cluster expansion will cause long-term task stagnation. We should reduce this > impact. > As an example: > I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5] > When I add slots the task will trigger jobgraph changes,by > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable, > However, the five new slots I added were not discovered at the same time (for > convenience, I assume that a taskmanager has one slot), because no matter > what environment we add, we cannot guarantee that the new slots will be added > at once, so this will cause onNewResourcesAvailable triggers repeatedly > ,If each new slot action has a certain interval, then the jobgraph will > continue to change during this period. What I hope is that there will be a > stable time to configure the cluster resources and then go to it after the > number of cluster slots has been stable for a certain period of time. Trigger > jobgraph changes to avoid this situation -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852748#comment-17852748 ] Matthias Pohl edited comment on FLINK-35035 at 6/6/24 11:47 AM: Thanks for the pointer, [~dmvk]. We looked into this issue while working on [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] (which is kind of related) and plan to do a follow-up FLIP that will align the resource controlling mechanism of the {{{}AdaptiveScheduler{}}}'s {{WaitingForResources}} and {{Executing}} states. Currently, we have parameters intervening in the rescaling in different places ([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min], [j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max] being utilized in {{Executing}} and [j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout] being utilized in {{{}WaitingForResources){}}}. Having a {{resource-stabilization}} phase in {{Executing}} should resolve the problem described in this Jira issue here. was (Author: mapohl): Thanks for the pointer, [~dmvk]. We looked into this issue while working on [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] (which is kind of related) and plan to do a follow-up FLIP that will align the resource controlling mechanism of the {{AdaptiveScheduler}}'s {{WaitingForResources}} and {{Executing}} states. Currently, we have parameters intervening in the rescaling in different places ([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min], [j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max] being utilized in {{Executing}} and [j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout) being utilized in {{WaitingForResources}}). Having a {{resource-stabilization}} phase in {{Executing}} should resolve the problem described in this Jira issue here. > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Priority: Minor > > When 'jobmanager.scheduler = adaptive' , job graph changes triggered by > cluster expansion will cause long-term task stagnation. We should reduce this > impact. 
> As an example: > I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]. > When I add slots, the task will trigger jobgraph changes via > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable. > However, the five new slots I added were not discovered at the same time (for > convenience, I assume that a taskmanager has one slot): no matter what > environment we add resources in, we cannot guarantee that the new slots will be > added all at once, so onNewResourcesAvailable triggers repeatedly. > If the new slots arrive at intervals, the jobgraph will keep changing during > this period. What I hope for is a stabilization window: jobgraph changes should > only be triggered after the number of cluster slots has been stable for a > certain period of time, to avoid this situation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
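The configuration keys referenced in the comment above can be set like any other Flink options. A minimal sketch using the string keys from the comment; the values here are placeholders, not recommendations:

{code:java}
import org.apache.flink.configuration.Configuration;

public class AdaptiveSchedulerConfigDemo {
    public static void main(String[] args) {
        // Keys are the ones linked in the comment above; values are placeholders.
        Configuration conf = new Configuration();
        conf.setString("jobmanager.scheduler", "adaptive");
        conf.setString("jobmanager.adaptive-scheduler.scaling-interval.min", "30 s");
        conf.setString("jobmanager.adaptive-scheduler.scaling-interval.max", "1 min");
        conf.setString("jobmanager.adaptive-scheduler.resource-stabilization-timeout", "10 s");
        System.out.println(conf);
    }
}
{code}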
Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lose precision for timestamp [flink-cdc]
leonardBang merged PR #3207: URL: https://github.com/apache/flink-cdc/pull/3207 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lose precision for timestamp [flink-cdc]
leonardBang commented on PR #3207: URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2152195587 @gong would you like to open a PR for release-3.1 branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision for timestamp
[ https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34908: --- Issue Type: Bug (was: Improvement) > [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision > for timestamp > --- > > Key: FLINK-34908 > URL: https://issues.apache.org/jira/browse/FLINK-34908 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Reporter: Xin Gong > Assignee: Xin Gong > Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > The Flink CDC pipeline decides the timestamp zone from the pipeline config. I > found that mysql2doris and mysql2starrocks use the fixed datetime format > yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we > shouldn't set a fixed datetime format and should just return the LocalDateTime > object. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision for timestamp
[ https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852752#comment-17852752 ] Leonard Xu commented on FLINK-34908: master: e2ccc836a056c16974e4956190bdce249705b7ee 3.1: todo > [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision > for timestamp > --- > > Key: FLINK-34908 > URL: https://issues.apache.org/jira/browse/FLINK-34908 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Reporter: Xin Gong > Assignee: Xin Gong > Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > The Flink CDC pipeline decides the timestamp zone from the pipeline config. I > found that mysql2doris and mysql2starrocks use the fixed datetime format > yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we > shouldn't set a fixed datetime format and should just return the LocalDateTime > object. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision for timestamp
[ https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34908: --- Fix Version/s: cdc-3.1.1 > [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision > for timestamp > --- > > Key: FLINK-34908 > URL: https://issues.apache.org/jira/browse/FLINK-34908 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.1.0 > Reporter: Xin Gong > Assignee: Xin Gong > Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0, cdc-3.1.1 > > > The Flink CDC pipeline decides the timestamp zone from the pipeline config. I > found that mysql2doris and mysql2starrocks use the fixed datetime format > yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we > shouldn't set a fixed datetime format and should just return the LocalDateTime > object. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision for timestamp
[ https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34908: --- Affects Version/s: cdc-3.1.0 > [Feature][Pipeline] Mysql pipeline to doris and starrocks will lose precision > for timestamp > --- > > Key: FLINK-34908 > URL: https://issues.apache.org/jira/browse/FLINK-34908 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.1.0 > Reporter: Xin Gong > Assignee: Xin Gong > Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > The Flink CDC pipeline decides the timestamp zone from the pipeline config. I > found that mysql2doris and mysql2starrocks use the fixed datetime format > yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we > shouldn't set a fixed datetime format and should just return the LocalDateTime > object. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
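The precision loss described in FLINK-34908 is easy to reproduce with plain Java, independent of the CDC code: formatting a timestamp with a seconds-granularity pattern discards the fractional part, while passing the LocalDateTime through keeps it.

{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampPrecisionDemo {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2024-06-06T11:47:03.123456");

        // Formatting with a seconds-granularity pattern drops the fraction:
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        System.out.println(f.format(ts)); // 2024-06-06 11:47:03  (precision lost)

        // Keeping the LocalDateTime object preserves it:
        System.out.println(ts);           // 2024-06-06T11:47:03.123456
    }
}
{code}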
Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]
yuxiqian commented on PR #3231: URL: https://github.com/apache/flink-cdc/pull/3231#issuecomment-2152228771 Fixed in #3348. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35120][doris] Add Doris integration test cases [flink-cdc]
yuxiqian commented on PR #3227: URL: https://github.com/apache/flink-cdc/pull/3227#issuecomment-2152229529 Fixed in #3348. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]
yuxiqian closed pull request #3231: [FLINK-35092][cdc][starrocks] Add starrocks integration test cases URL: https://github.com/apache/flink-cdc/pull/3231 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][tests] Fix occasional pipeline E2e testcases failure [flink-cdc]
yuxiqian commented on PR #3309: URL: https://github.com/apache/flink-cdc/pull/3309#issuecomment-2152233915 Fixed in #3348. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.1][minor][cdc][docs] Optimize markdown formats in doris quickstart doc [flink-cdc]
leonardBang merged PR #3325: URL: https://github.com/apache/flink-cdc/pull/3325 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35120][doris] Add Doris integration test cases [flink-cdc]
yuxiqian closed pull request #3227: [FLINK-35120][doris] Add Doris integration test cases URL: https://github.com/apache/flink-cdc/pull/3227 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.0][minor][cdc][docs] Optimize markdown formats in doris quickstart doc [flink-cdc]
leonardBang merged PR #3326: URL: https://github.com/apache/flink-cdc/pull/3326 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][tests] Fix occasional pipeline E2e testcases failure [flink-cdc]
yuxiqian closed pull request #3309: [hotfix][tests] Fix occasional pipeline E2e testcases failure URL: https://github.com/apache/flink-cdc/pull/3309 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35463]Fixed issue for route rule changed when restored from checkpoint. [flink-cdc]
yuxiqian commented on code in PR #3364: URL: https://github.com/apache/flink-cdc/pull/3364#discussion_r1629411656 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java: ## @@ -130,12 +130,13 @@ public CompletableFuture handleSchemaChangeRequest( LOG.info( "Received schema change event request from table {}. Start to buffer requests for others.", request.getTableId().toString()); -if (request.getSchemaChangeEvent() instanceof CreateTableEvent -&& schemaManager.schemaExists(request.getTableId())) { -return CompletableFuture.completedFuture( -wrap(new SchemaChangeResponse(Collections.emptyList(; Review Comment: I think this check is meant to avoid sending a duplicate CreateTableEvent downstream after restoring from a checkpoint. Will this patch change that behavior? ## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java: ## Review Comment: Could @hk-lrzy please add an integration test case to validate that route rules are correctly restored? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35436]Fixed Schema Behavior Action for the Pipeline. [flink-cdc]
yuxiqian commented on PR #3355: URL: https://github.com/apache/flink-cdc/pull/3355#issuecomment-2152262621 It seems FLINK-35436 overlaps heavily with FLINK-35242, which is expected to ship with CDC 3.2, so I would prefer to close this PR first. Thoughts? @leonardBang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35501] Use common IO thread pool for RocksDB data transfer [flink]
rkhachatryan merged PR #24882: URL: https://github.com/apache/flink/pull/24882 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35501] Use common IO thread pool for RocksDB data transfer [flink]
rkhachatryan commented on PR #24882: URL: https://github.com/apache/flink/pull/24882#issuecomment-2152267918 Thanks for the review! Merged into master -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35501) Use common thread pools when transferring RocksDB state files
[ https://issues.apache.org/jira/browse/FLINK-35501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852758#comment-17852758 ] Roman Khachatryan commented on FLINK-35501: --- Merged into master as 9708f9fd65751296b3b0377c964207b630958259..5855f5354985cabea6f01b6b2effaaa8cfcbee55 > Use common thread pools when transferring RocksDB state files > - > > Key: FLINK-35501 > URL: https://issues.apache.org/jira/browse/FLINK-35501 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends > Affects Versions: 1.20.0 > Reporter: Roman Khachatryan > Assignee: Roman Khachatryan > Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Currently, each RocksDB state backend creates an executor backed by its own > thread pool. > This makes it difficult to control the total number of threads per TM, because > a TM might have at least one task per slot and, theoretically, many state > backends per task (because of chaining). > Additionally, using a common thread pool makes it possible to indirectly > control the load on the underlying DFS (e.g. the total number of requests to > S3 from a TM). -- This message was sent by Atlassian Jira (v8.20.10#820010)
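The motivation above is about bounding threads per TaskManager, and a shared, lazily created executor is the usual shape of such a change. A hedged sketch of the idea, not the actual Flink implementation:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: a process-wide pool shared by all state backends, so the thread
// count is bounded per TM instead of growing with slots * chained backends.
public final class SharedTransferExecutor {
    private static volatile ExecutorService pool;

    public static ExecutorService get(int maxIoThreads) {
        if (pool == null) {
            synchronized (SharedTransferExecutor.class) {
                if (pool == null) {
                    pool = Executors.newFixedThreadPool(maxIoThreads);
                }
            }
        }
        return pool;
    }

    private SharedTransferExecutor() {}
}
{code}

The double-checked locking with a volatile field keeps pool creation lazy and thread-safe; sharing one bounded pool also indirectly caps concurrent DFS requests (e.g. to S3), as the issue description notes.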