[jira] [Created] (FLINK-35537) Error parsing list of enum in legacy yaml configuration

2024-06-06 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35537:
---

 Summary: Error parsing list of enum in legacy yaml configuration 
 Key: FLINK-35537
 URL: https://issues.apache.org/jira/browse/FLINK-35537
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.19.0
Reporter: Zakelly Lan


In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
{code:java}
Caused by: java.lang.IllegalArgumentException: Could not parse value 
'[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
at 
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
... 19 more
Caused by: java.lang.IllegalArgumentException: Could not parse value for enum 
class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, 
SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, 
LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, 
DISABLE_COMPRESSION_OPTION]]
at 
org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
at 
org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
at 
org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
at 
org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
at java.util.Optional.map(Optional.java:215)
at 
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
... 28 more
{code}
I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
flink-conf.yaml. I also tried Flink 1.18.1, and it runs well there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35537) Error parsing list of enum in legacy yaml configuration

2024-06-06 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852663#comment-17852663
 ] 

Zakelly Lan commented on FLINK-35537:
-

My guess is that the option is parsed twice, since the error message reports 
the raw value as '[NO_COMPRESSION]' instead of 'NO_COMPRESSION'.

> Error parsing list of enum in legacy yaml configuration 
> 
>
> Key: FLINK-35537
> URL: https://issues.apache.org/jira/browse/FLINK-35537
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.19.0
>Reporter: Zakelly Lan
>Priority: Major
>
> In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Could not parse value 
> '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
>   at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
>   ... 19 more
> Caused by: java.lang.IllegalArgumentException: Could not parse value for enum 
> class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, 
> SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, 
> LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, 
> DISABLE_COMPRESSION_OPTION]]
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
>   at 
> org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
>   at java.util.Optional.map(Optional.java:215)
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
>   ... 28 more
> {code}
> I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
> flink-conf.yaml. I also tried Flink 1.18.1, and it runs well there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628898055


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The Bunch Put access request for ForStDB.
+ *
+ * @param  The type of key in original state access request.
+ */
+public class ForStDBBunchPutRequest extends 
ForStDBPutRequest, Map> {

Review Comment:
   I moved the logic of interacting with the `db` into `ForStDBPutRequest` and 
`ForStDBBunchPutRequest`, to avoid the per-request-type classification 
(instanceof checks) in `ForStWriteBatchOperation`.
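   For illustration, a minimal hypothetical sketch of that design (stand-in names, not the real ForSt classes): each request applies itself to the write batch, so the batch loop can simply call `request.applyTo(writeBatch)` without branching on the concrete request type.

```java
// Hypothetical sketch of the described design; not the actual ForSt classes.
interface PutRequestSketch {
    void applyTo(WriteBatchSketch batch) throws Exception;
}

class SinglePutSketch implements PutRequestSketch {
    final byte[] key;
    final byte[] value; // null means delete, mirroring put(key, null) == delete(key)

    SinglePutSketch(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public void applyTo(WriteBatchSketch batch) throws Exception {
        if (value == null) {
            batch.delete(key);
        } else {
            batch.put(key, value);
        }
    }
}

/** Minimal stand-in for the write-batch wrapper, exposing only what the sketch needs. */
interface WriteBatchSketch {
    void put(byte[] key, byte[] value) throws Exception;
    void delete(byte[] key) throws Exception;
}
```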
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35525] Add all hdfs delegation tokens to Yarn AM [flink]

2024-06-06 Thread via GitHub


wForget commented on code in PR #24891:
URL: https://github.com/apache/flink/pull/24891#discussion_r1628934380


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -1374,6 +1374,13 @@ private void setTokensFor(ContainerLaunchContext 
containerLaunchContext, boolean
 for (Map.Entry e : 
container.getTokens().entrySet()) {
 if (e.getKey().equals("hadoopfs")) {
 
credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
+} else {
+// Add hdfs tokens, maybe fetched by custom 
DelegationTokenProvider
+Credentials fetchedCredentials =
+
HadoopDelegationTokenConverter.deserialize(e.getValue());

Review Comment:
   Thanks @gaborgsomogyi, I added a 
`yarn.security.appmaster.delegation.token.services` configuration to set the 
token services we need for the YARN AM, and added a unit test.
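   As a rough illustration only (hypothetical helper, not the actual patch), filtering fetched delegation tokens down to a configured set of service names could look like the following, using the standard Hadoop `Credentials`/`Token` API:

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

import java.util.Set;

public class TokenServiceFilter {
    /** Keep only the tokens whose service name appears in the allowed set. */
    static Credentials filterByServices(Credentials fetched, Set<String> allowedServices) {
        Credentials filtered = new Credentials();
        for (Token<? extends TokenIdentifier> token : fetched.getAllTokens()) {
            Text service = token.getService();
            if (allowedServices.contains(service.toString())) {
                filtered.addToken(service, token);
            }
        }
        return filtered;
    }
}
```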



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628938515


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The Bunch Put access request for ForStDB.
+ *
+ * @param  The type of key in original state access request.
+ */
+public class ForStDBBunchPutRequest extends 
ForStDBPutRequest, Map> {
+
+/** Serializer for the user values. */
+final TypeSerializer userValueSerializer;
+
+/** The data outputStream used for value serializer, which should be 
thread-safe. */
+final ThreadLocal valueSerializerView;
+
+/** The data inputStream used for value deserializer, which should be 
thread-safe. */
+final ThreadLocal valueDeserializerView;
+
+public ForStDBBunchPutRequest(
+ContextKey key, Map value, ForStMapState table, 
InternalStateFuture future) {
+super(key, value, table, future);
+Preconditions.checkArgument(table instanceof ForStMapState);
+this.userValueSerializer = table.userValueSerializer;
+this.valueSerializerView = table.valueSerializerView;
+this.valueDeserializerView = table.valueDeserializerView;
+}
+
+public Map getBunchValue() {
+return value;
+}
+
+@Override
+public byte[] buildSerializedKey() throws IOException {
+key.resetExtra();
+return table.serializeKey(key);
+}
+
+public byte[] buildSerializedKey(Object userKey) throws IOException {
+key.setUserKey(userKey);
+return table.serializeKey(key);
+}
+
+public byte[] buildSerializedValue(Object singleValue) throws IOException {
+DataOutputSerializer outputView = valueSerializerView.get();
+outputView.clear();
+userValueSerializer.serialize(singleValue, outputView);
+return outputView.getCopyOfBuffer();
+}
+
+/**
+ * Find the next byte array that is lexicographically larger than input 
byte array.
+ *
+ * @param bytes the input byte array.
+ * @return the next byte array.
+ */
+public static byte[] nextBytes(byte[] bytes) {
+Preconditions.checkState(bytes != null && bytes.length > 0);
+int len = bytes.length;
+byte[] nextBytes = new byte[len];
+System.arraycopy(bytes, 0, nextBytes, 0, len);
+boolean find = false;
+for (int i = len - 1; i >= 0; i--) {
+byte currentByte = ++nextBytes[i];
+if (currentByte != Byte.MIN_VALUE) {
+find = true;
+break;
+}
+}
+if (!find) {
+byte[] newBytes = new byte[len + 1];
+System.arraycopy(bytes, 0, newBytes, 0, len);
+newBytes[len] = 1;
+return newBytes;
+}

Review Comment:
   Thanks for the suggestion. After reading the code in RocksDB, I think 
`newBytes[len] = Byte.MIN_VALUE` is enough.
   ```C++
   inline int Slice::compare(const Slice& b) const {
 assert(data_ != nullptr && b.data_ != nullptr);
 const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
 int r = memcmp(data_, b.data_, min_len);
 if (r == 0) {
   if (size_ < b.size_)
 r = -1;
   else if (size_ > b.size_)
 r = +1;
 }
 return r;
   }
   ```
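   As an illustrative aside (not part of the patch): the ordering property the quoted `Slice::compare` provides, namely that with memcmp-style unsigned comparison a shorter array sorts before any array that extends it, can be checked with plain JDK code:

```java
import java.util.Arrays;

public class PrefixOrderCheck {
    public static void main(String[] args) {
        byte[] prefix = {(byte) 0xFF, (byte) 0xFF};                   // every byte at the unsigned maximum
        byte[] extended = {(byte) 0xFF, (byte) 0xFF, Byte.MIN_VALUE}; // same prefix plus one more byte

        // Arrays.compareUnsigned mirrors the Slice::compare logic quoted above:
        // equal common prefix, then the shorter array compares as smaller.
        System.out.println(Arrays.compareUnsigned(prefix, extended) < 0); // true
    }
}
```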



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628941486


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java:
##
@@ -34,12 +34,35 @@ public class ForStDBGetRequest {
 
 private final K key;
 private final ForStInnerTable table;
-private final InternalStateFuture future;
+private final InternalStateFuture future;
 
-private ForStDBGetRequest(K key, ForStInnerTable table, 
InternalStateFuture future) {
+private final boolean toBoolean;
+private final boolean checkMapEmpty;
+
+private int keyGroupPrefixBytes = 1;
+
+private ForStDBGetRequest(
+K key,
+ForStInnerTable table,
+InternalStateFuture future,
+boolean toBoolean,
+boolean checkMapEmpty) {
 this.key = key;
 this.table = table;
 this.future = future;
+this.toBoolean = toBoolean;

Review Comment:
   👍, I added `ForStDBMapCheckRequest` for this kind of operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-06-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852668#comment-17852668
 ] 

David Morávek commented on FLINK-35035:
---

Yes, we have this on our radar. [~mapohl] is currently looking into this 
direction. The main idea is to move the resource stabilization timeout from the 
WaitingForResources state into the Executing state.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion will cause long periods of task stagnation. We should reduce 
> this impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a TaskManager has one slot), because no matter what 
> environment we use, we cannot guarantee that the new slots are added all at 
> once, so onNewResourcesAvailable is triggered repeatedly. If each new slot 
> arrives after a certain interval, the job graph keeps changing during this 
> period. What I hope for is a configurable stabilization time, so that the job 
> graph change is only triggered after the number of cluster slots has been 
> stable for a certain period, avoiding this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]

2024-06-06 Thread via GitHub


leonardBang merged PR #3348:
URL: https://github.com/apache/flink-cdc/pull/3348


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35092) Add integrated test for Doris / Starrocks sink pipeline connector

2024-06-06 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852669#comment-17852669
 ] 

Leonard Xu commented on FLINK-35092:


master: 1a360635c7ac936106e78d87867f96ed3f64e04b

> Add integrated test for Doris / Starrocks sink pipeline connector
> -
>
> Key: FLINK-35092
> URL: https://issues.apache.org/jira/browse/FLINK-35092
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, no integration tests are applied to the Doris pipeline connector 
> (there is only one DorisRowConverterTest case for now). Adding IT cases would 
> improve the Doris connector's code quality and reliability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35415) CDC Fails to create sink with Flink 1.19

2024-06-06 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852672#comment-17852672
 ] 

Leonard Xu commented on FLINK-35415:


master: 9a9603413c7c6d7094a30f8c8e676b65b0d3b9ba

> CDC Fails to create sink with Flink 1.19
> 
>
> Key: FLINK-35415
> URL: https://issues.apache.org/jira/browse/FLINK-35415
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: yux
>Assignee: yux
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, Flink CDC doesn't work with Flink 1.19 with the following 
> exception:
> Exception in thread "main" java.lang.NoSuchMethodError: 'void 
> org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.(org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink,
>  boolean, boolean)'
> The reason is that Flink CDC uses a Flink @Internal API that was changed in 
> the 1.19 update.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35316) Add CDC e2e test case for on Flink 1.19

2024-06-06 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852671#comment-17852671
 ] 

Leonard Xu commented on FLINK-35316:


master: d8a9c8c63ef5fb958a72c3a15e47ee29d72a481e

> Add CDC e2e test case for on Flink 1.19
> ---
>
> Key: FLINK-35316
> URL: https://issues.apache.org/jira/browse/FLINK-35316
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Minor
>  Labels: pull-request-available
>
> Since Flink 1.19 has been generally available, Flink CDC is expected to be 
> used with it. E2e test cases should cover this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35538) pipeline sink support caseSensitive

2024-06-06 Thread melin (Jira)
melin created FLINK-35538:
-

 Summary: pipeline sink support caseSensitive
 Key: FLINK-35538
 URL: https://issues.apache.org/jira/browse/FLINK-35538
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0, cdc-3.1.1
Reporter: melin
 Attachments: image-2024-06-06-15-51-02-428.png

The source is case-sensitive, while the sink is case-insensitive. Paimon, for 
example, does not even allow capital letters.

!image-2024-06-06-15-51-02-428.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35120) Add Doris Pipeline connector integration test cases

2024-06-06 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852670#comment-17852670
 ] 

Leonard Xu commented on FLINK-35120:


master: 8eb7e53965e59c2f09c7d86461c095133e1d8998

> Add Doris Pipeline connector integration test cases
> ---
>
> Key: FLINK-35120
> URL: https://issues.apache.org/jira/browse/FLINK-35120
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Currently, the Flink CDC Doris pipeline connector has very limited test cases 
> (only row conversion is covered). Adding an ITCase that tests its data 
> pipeline and metadata applier should help improve the connector's reliability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35464) Flink CDC 3.1 breaks operator state compatiblity

2024-06-06 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852673#comment-17852673
 ] 

Leonard Xu commented on FLINK-35464:


master: 414720d1c5e9126ca557bf59389e9fb1b460c998

> Flink CDC 3.1 breaks operator state compatiblity
> 
>
> Key: FLINK-35464
> URL: https://issues.apache.org/jira/browse/FLINK-35464
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: yux
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.1.1
>
>
> Flink CDC 3.1 changed how SchemaRegistry [de]serializes state data, which 
> means that any checkpoint state saved with an earlier version cannot be 
> restored in version 3.1.0.
> This could be resolved by adding serialization versioning control logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-3.1.1][FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink (#3233) [flink-cdc]

2024-06-06 Thread via GitHub


loserwang1024 commented on code in PR #3387:
URL: https://github.com/apache/flink-cdc/pull/3387#discussion_r1628958303


##
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;

Review Comment:
   Thanks, I will also modify it on master.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628938515


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The Bunch Put access request for ForStDB.
+ *
+ * @param  The type of key in original state access request.
+ */
+public class ForStDBBunchPutRequest extends 
ForStDBPutRequest, Map> {
+
+/** Serializer for the user values. */
+final TypeSerializer userValueSerializer;
+
+/** The data outputStream used for value serializer, which should be 
thread-safe. */
+final ThreadLocal valueSerializerView;
+
+/** The data inputStream used for value deserializer, which should be 
thread-safe. */
+final ThreadLocal valueDeserializerView;
+
+public ForStDBBunchPutRequest(
+ContextKey key, Map value, ForStMapState table, 
InternalStateFuture future) {
+super(key, value, table, future);
+Preconditions.checkArgument(table instanceof ForStMapState);
+this.userValueSerializer = table.userValueSerializer;
+this.valueSerializerView = table.valueSerializerView;
+this.valueDeserializerView = table.valueDeserializerView;
+}
+
+public Map getBunchValue() {
+return value;
+}
+
+@Override
+public byte[] buildSerializedKey() throws IOException {
+key.resetExtra();
+return table.serializeKey(key);
+}
+
+public byte[] buildSerializedKey(Object userKey) throws IOException {
+key.setUserKey(userKey);
+return table.serializeKey(key);
+}
+
+public byte[] buildSerializedValue(Object singleValue) throws IOException {
+DataOutputSerializer outputView = valueSerializerView.get();
+outputView.clear();
+userValueSerializer.serialize(singleValue, outputView);
+return outputView.getCopyOfBuffer();
+}
+
+/**
+ * Find the next byte array that is lexicographically larger than input 
byte array.
+ *
+ * @param bytes the input byte array.
+ * @return the next byte array.
+ */
+public static byte[] nextBytes(byte[] bytes) {
+Preconditions.checkState(bytes != null && bytes.length > 0);
+int len = bytes.length;
+byte[] nextBytes = new byte[len];
+System.arraycopy(bytes, 0, nextBytes, 0, len);
+boolean find = false;
+for (int i = len - 1; i >= 0; i--) {
+byte currentByte = ++nextBytes[i];
+if (currentByte != Byte.MIN_VALUE) {
+find = true;
+break;
+}
+}
+if (!find) {
+byte[] newBytes = new byte[len + 1];
+System.arraycopy(bytes, 0, newBytes, 0, len);
+newBytes[len] = 1;
+return newBytes;
+}

Review Comment:
   Thanks for the suggestion. After reading the code in RocksDB, I think 
`newBytes[len] = Byte.MAX_VALUE` is enough.
   ```C++
   inline int Slice::compare(const Slice& b) const {
 assert(data_ != nullptr && b.data_ != nullptr);
 const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
 int r = memcmp(data_, b.data_, min_len);
 if (r == 0) {
   if (size_ < b.size_)
 r = -1;
   else if (size_ > b.size_)
 r = +1;
 }
 return r;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35539) The artifactId of flink-migration-test-utils module has a typo

2024-06-06 Thread Zhen Wang (Jira)
Zhen Wang created FLINK-35539:
-

 Summary: The artifactId of  flink-migration-test-utils module has 
a typo
 Key: FLINK-35539
 URL: https://issues.apache.org/jira/browse/FLINK-35539
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.20.0
Reporter: Zhen Wang


The artifactId of the flink-migration-test-utils module has a typo.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628979026


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
 public CompletableFuture process() {
 return CompletableFuture.runAsync(
 () -> {
-try (WriteBatch writeBatch =
-new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+try (ForStDBWriteBatchWrapper writeBatch =
+new ForStDBWriteBatchWrapper(db, writeOptions, 
batchRequest.size())) {
 for (ForStDBPutRequest request : batchRequest) {
+ColumnFamilyHandle cf = 
request.getColumnFamilyHandle();
 if (request.valueIsNull()) {
-// put(key, null) == delete(key)
-writeBatch.delete(
-request.getColumnFamilyHandle(),
-request.buildSerializedKey());
+if (request instanceof ForStDBBunchPutRequest) 
{
+ForStDBBunchPutRequest bunchPutRequest =
+(ForStDBBunchPutRequest) 
request;
+byte[] primaryKey = 
bunchPutRequest.buildSerializedKey(null);

Review Comment:
   For map state, there are two types of key: the primary key and the user key.
   Logically it is organized like this:
   `<primary key, <user key, user value>>`
   
   We concatenate the primary key and the user key together as the key of 
RocksDB, so it is physically organized like this:
   `<<primary key, user key>, user value>`
   
   For `Bunch remove`, we remove all user keys under one primary key, so we use 
the prefix to match all keys.
   For `Single remove`, we use the bytes of `<primary key, user key>` to match 
one specific key.
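   To make the layout concrete, a small hedged sketch (illustrative helper names, not the real ForSt serialization code, which also encodes key groups and lengths) of concatenating a primary key and a user key and of checking whether a stored key falls under a given primary-key prefix:

```java
import java.util.Arrays;

public class CompositeKeySketch {
    /** Concatenate primary key bytes and user key bytes into one RocksDB key. */
    static byte[] compositeKey(byte[] primaryKey, byte[] userKey) {
        byte[] key = Arrays.copyOf(primaryKey, primaryKey.length + userKey.length);
        System.arraycopy(userKey, 0, key, primaryKey.length, userKey.length);
        return key;
    }

    /** True if the stored key starts with the given primary-key prefix. */
    static boolean underPrimaryKey(byte[] storedKey, byte[] primaryKey) {
        return storedKey.length >= primaryKey.length
                && Arrays.equals(Arrays.copyOf(storedKey, primaryKey.length), primaryKey);
    }

    public static void main(String[] args) {
        byte[] pk = {1, 2};
        byte[] uk = {9};
        byte[] stored = compositeKey(pk, uk);
        System.out.println(underPrimaryKey(stored, pk)); // true
    }
}
```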



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35539] Fix artifactId of flink-migration-test-utils module [flink]

2024-06-06 Thread via GitHub


wForget opened a new pull request, #24901:
URL: https://github.com/apache/flink/pull/24901

   
   
   ## What is the purpose of the change
   
   fix typo
   
   
   ## Brief change log
   
   ## Verifying this change
   
   Flink CI
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive):no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35539) The artifactId of flink-migration-test-utils module has a typo

2024-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35539:
---
Labels: pull-request-available  (was: )

> The artifactId of  flink-migration-test-utils module has a typo
> ---
>
> Key: FLINK-35539
> URL: https://issues.apache.org/jira/browse/FLINK-35539
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.20.0
>Reporter: Zhen Wang
>Priority: Major
>  Labels: pull-request-available
>
> The artifactId of  flink-migration-test-utils module has a typo.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35522][runtime] Fix the issue that the source task may get stuck in speculative execution mode. [flink]

2024-06-06 Thread via GitHub


zhuzhurk commented on PR #24899:
URL: https://github.com/apache/flink/pull/24899#issuecomment-2151672311

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628988650


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
 public CompletableFuture process() {
 return CompletableFuture.runAsync(
 () -> {
-try (WriteBatch writeBatch =
-new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+try (ForStDBWriteBatchWrapper writeBatch =
+new ForStDBWriteBatchWrapper(db, writeOptions, 
batchRequest.size())) {
 for (ForStDBPutRequest request : batchRequest) {
+ColumnFamilyHandle cf = 
request.getColumnFamilyHandle();
 if (request.valueIsNull()) {
-// put(key, null) == delete(key)
-writeBatch.delete(
-request.getColumnFamilyHandle(),
-request.buildSerializedKey());
+if (request instanceof ForStDBBunchPutRequest) 
{
+ForStDBBunchPutRequest bunchPutRequest =
+(ForStDBBunchPutRequest) 
request;
+byte[] primaryKey = 
bunchPutRequest.buildSerializedKey(null);
+byte[] endKey = 
ForStDBBunchPutRequest.nextBytes(primaryKey);

Review Comment:
   We concatenate the primary key and the user key together as the key of 
RocksDB, so it is physically organized like this:
   ```
   <<primary key, user key1>, user value1>
   <<primary key, user key2>, user value2>
   <<primary key, user key3>, user value3>
   <<primary key, user key4>, user value4>
   ```
   Here, we use the primary key as a prefix to compute an interval and leverage 
`deleteRange`.
   
   However, too many `deleteRange` calls may affect read performance, and we 
will consider changing it to iterate-and-delete later.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628990950


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java:
##
@@ -34,12 +34,35 @@ public class ForStDBGetRequest {
 
 private final K key;
 private final ForStInnerTable table;
-private final InternalStateFuture future;
+private final InternalStateFuture future;
 
-private ForStDBGetRequest(K key, ForStInnerTable table, 
InternalStateFuture future) {
+private final boolean toBoolean;

Review Comment:
   👍 I added `ForStDBMapCheckRequest` for this kind of operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35539] Fix artifactId of flink-migration-test-utils module [flink]

2024-06-06 Thread via GitHub


flinkbot commented on PR #24901:
URL: https://github.com/apache/flink/pull/24901#issuecomment-2151681134

   
   ## CI report:
   
   * 50ae68d1e07a382fbcd3f635270f1855b7755237 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35149][hotfix][cdc-composor] adjust test of cdc composer from junit4 to junit5. [flink-cdc]

2024-06-06 Thread via GitHub


loserwang1024 commented on PR #3394:
URL: https://github.com/apache/flink-cdc/pull/3394#issuecomment-2151681364

   @Jiabao-Sun , CC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1628994002


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java:
##
@@ -34,23 +35,33 @@
  */
 public class ForStDBPutRequest {
 
-private final K key;
-@Nullable private final V value;
-private final ForStInnerTable table;
-private final InternalStateFuture future;
+protected final K key;
 
-private ForStDBPutRequest(
+@Nullable protected final V value;
+
+protected final ForStInnerTable table;
+
+protected final InternalStateFuture future;
+
+protected final boolean tableIsMap;
+
+protected ForStDBPutRequest(
 K key, V value, ForStInnerTable table, 
InternalStateFuture future) {
 this.key = key;
 this.value = value;
 this.table = table;
 this.future = future;
+this.tableIsMap = table instanceof ForStMapState;
 }
 
 public boolean valueIsNull() {
 return value == null;
 }
 
+public boolean valueIsMap() {

Review Comment:
   I moved the logic of interacting with the db into `ForStDBPutRequest` and 
`ForStDBBunchPutRequest`, so `valueIsMap` has been deleted now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35517) CI pipeline triggered by pull request seems unstable

2024-06-06 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851994#comment-17851994
 ] 

Jing Ge edited comment on FLINK-35517 at 6/6/24 8:25 AM:
-

It should work again. Thanks [~chesnay] for your support! [~Weijie Guo] please 
let me know if you still have any issues. Thanks!


was (Author: jingge):
It should work again. [~Weijie Guo] please let me know if you still have any 
issues. Thanks!

> CI pipeline triggered by pull request seems unstable
> 
>
> Key: FLINK-35517
> URL: https://issues.apache.org/jira/browse/FLINK-35517
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Jing Ge
>Priority: Blocker
> Fix For: 1.20.0
>
>
> Flink CI pipeline triggered by pull request seems sort of unstable. 
> For example, https://github.com/apache/flink/pull/24883 was filed 15 hours 
> ago, but CI report is UNKNOWN.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35049][state] Implement Map Async State API for ForStStateBackend [flink]

2024-06-06 Thread via GitHub


fredia commented on code in PR #24812:
URL: https://github.com/apache/flink/pull/24812#discussion_r1629027426


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -55,22 +54,44 @@ public class ForStWriteBatchOperation implements 
ForStDBOperation {
 public CompletableFuture process() {
 return CompletableFuture.runAsync(
 () -> {
-try (WriteBatch writeBatch =
-new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+try (ForStDBWriteBatchWrapper writeBatch =
+new ForStDBWriteBatchWrapper(db, writeOptions, 
batchRequest.size())) {
 for (ForStDBPutRequest request : batchRequest) {
+ColumnFamilyHandle cf = 
request.getColumnFamilyHandle();
 if (request.valueIsNull()) {
-// put(key, null) == delete(key)
-writeBatch.delete(
-request.getColumnFamilyHandle(),
-request.buildSerializedKey());
+if (request instanceof ForStDBBunchPutRequest) 
{
+ForStDBBunchPutRequest bunchPutRequest =
+(ForStDBBunchPutRequest) 
request;
+byte[] primaryKey = 
bunchPutRequest.buildSerializedKey(null);
+byte[] endKey = 
ForStDBBunchPutRequest.nextBytes(primaryKey);

Review Comment:
   I found that `deleteRange` may be ambiguous in some cases, such as when the 
primary key is an array of `Byte.MAX_VALUE`, so I changed it to iterate and 
delete one by one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629030390


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = 
ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new 
EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+Set set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = 
createObjectContainer(element);
+set.add(objectContainer);
+}
+
+Set res = new LinkedHashSet<>();

Review Comment:
   nit:
   do we really need `LinkedHashSet` here?
   I guess we could use an `ArrayList` to achieve the same result in a more 
space-efficient way, e.g. if a record is present in `set`, add it to the array 
list; otherwise add it to `set`.
   WDYT? A rough illustration of the suggested shape is sketched below.
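   A rough illustration only, simplified to plain strings rather than the `ObjectContainer`/`ElementGetter` machinery of the actual function, and assuming duplicates in the result should still be suppressed:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ArrayIntersectSketch {
    static List<String> intersect(List<String> arrayOne, List<String> arrayTwo) {
        // Elements of the second array that we are still allowed to emit.
        Set<String> remaining = new HashSet<>(arrayTwo);
        List<String> result = new ArrayList<>();
        for (String element : arrayOne) {
            // remove() returns true only on the first hit, which also de-duplicates the result.
            if (remaining.remove(element)) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(intersect(Arrays.asList("a", "b", "a", "c"), Arrays.asList("a", "c", "d")));
        // [a, c]
    }
}
```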
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35353][docs-zh]Translate "Profiler" page into Chinese [flink]

2024-06-06 Thread via GitHub


drymatini commented on PR #24822:
URL: https://github.com/apache/flink/pull/24822#issuecomment-2151726109

   Hi @Myasuka, thank you for the comments. I have made changes to address the 
problems; please check again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629032823


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = 
ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new 
EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+Set set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = 
createObjectContainer(element);
+set.add(objectContainer);
+}
+
+Set res = new LinkedHashSet<>();
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayOne, pos);
+final ObjectContainer objectContainer = 
createObjectContainer(element);
+if (set.contains(objectContainer)) {
+res.add(objectContainer);
+}
+}
+
+return new GenericArrayData(
+res.stream()
+.map(element -> element != null ? 
element.getObject() : null)
+.toArray());

Review Comment:
   Is there anything blocking the use of just `toArray`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629035418


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java:
##
@@ -31,21 +31,25 @@
 @Internal
 public class ObjectContainer {
 
-private final Object o;
+private final Object object;

Review Comment:
   I guess we do not need it if we just use an `ArrayList`, as mentioned above.
   
   Or did I miss anything?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35537) Error parsing list of enum in legacy yaml configuration

2024-06-06 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852688#comment-17852688
 ] 

Zakelly Lan commented on FLINK-35537:
-

I have confirmed that the problem comes from 
{{EmbeddedRocksDBStateBackend#mergeConfigurableOptions}}, which merges two 
configuration maps into one. It calls {{toString}} on each already-parsed value 
and inserts the resulting string into a new raw map. For an enum list, 
{{toString}} yields the bracketed list form rather than the legacy string form 
of the value. When the new raw map is parsed later, that is when the error 
happens.
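
To illustrate, here is a minimal standalone sketch of the round trip (not the 
actual merge code; the local enum stands in for org.rocksdb.CompressionType and 
the ';' separator is assumed to be the legacy list delimiter):
{code:java}
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class EnumListToStringSketch {
    enum CompressionType { NO_COMPRESSION, SNAPPY_COMPRESSION }

    public static void main(String[] args) {
        // The value as it looks after the typed Configuration has parsed it.
        List<CompressionType> parsed = Collections.singletonList(CompressionType.NO_COMPRESSION);

        // What mergeConfigurableOptions effectively writes back into the raw map.
        String viaToString = parsed.toString();
        // What the legacy list parser would accept (assumed ';'-separated enum names).
        String legacyForm = parsed.stream().map(Enum::name).collect(Collectors.joining(";"));

        System.out.println(viaToString); // [NO_COMPRESSION]  -> later fails enum parsing
        System.out.println(legacyForm);  // NO_COMPRESSION    -> parses fine
    }
}
{code}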

> Error parsing list of enum in legacy yaml configuration 
> 
>
> Key: FLINK-35537
> URL: https://issues.apache.org/jira/browse/FLINK-35537
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.19.0
>Reporter: Zakelly Lan
>Priority: Major
>
> In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Could not parse value 
> '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
>   at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
>   ... 19 more
> Caused by: java.lang.IllegalArgumentException: Could not parse value for enum 
> class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, 
> SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, 
> LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, 
> DISABLE_COMPRESSION_OPTION]]
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
>   at 
> org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
>   at java.util.Optional.map(Optional.java:215)
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
>   ... 28 more
> {code}
> I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
> flink-conf.yaml. I also tried Flink 1.18.1, and it runs well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-06 Thread Qigeng Lin (Jira)
Qigeng Lin created FLINK-35540:
--

 Summary: flink-cdc-pipeline-connector-mysql lost table which 
database and table with the same name
 Key: FLINK-35540
 URL: https://issues.apache.org/jira/browse/FLINK-35540
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Qigeng Lin


h1. Description
When the 'tables' parameter of a MySQL pipeline job contains a table whose 
database and table names are the same, like 'app.app', the job will fail and 
the error message is like:
{code:java}
java.lang.IllegalArgumentException: Cannot find any table by the option 
'tables' = app.app {code}
h1. How to reproduce

Create a database and a table both named {{app}}, then submit a pipeline job 
defined by a YAML like this:
{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app.app
  server-id: 5400-5404
  server-time-zone: UTCsink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-06 Thread Qigeng Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qigeng Lin updated FLINK-35540:
---
Description: 
h1. Description

When the 'tables' parameter of a MySQL pipeline job contains a table whose 
database and table names are the same, like 'app.app', the job will fail and 
the error message is like:
{code:java}
java.lang.IllegalArgumentException: Cannot find any table by the option 
'tables' = app.app {code}
h1. How to reproduce

Create a database and a table both named {{app}}, then submit a pipeline job 
defined by a YAML like this:
{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app.app
  server-id: 5400-5404
  server-time-zone: UTC
sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2 {code}

  was:
h1. Description
When the 'tables' parameter of a MySQL pipeline job contains a table whose 
database and table names are the same, like 'app.app', the job will fail and 
the error message is like:
{code:java}
java.lang.IllegalArgumentException: Cannot find any table by the option 
'tables' = app.app {code}
h1. How to reproduce

Create a database and a table both named {{app}}, then submit a pipeline job 
defined by a YAML like this:
{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app.app
  server-id: 5400-5404
  server-time-zone: UTCsink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2 {code}


> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Qigeng Lin
>Priority: Major
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database and table names are the same, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named {{app}}, then submit a pipeline job 
> defined by a YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35537) Error when setting state.backend.rocksdb.compression.per.level

2024-06-06 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-35537:

Summary: Error when setting state.backend.rocksdb.compression.per.level  
(was: Error parsing list of enum in legacy yaml configuration )

> Error when setting state.backend.rocksdb.compression.per.level
> --
>
> Key: FLINK-35537
> URL: https://issues.apache.org/jira/browse/FLINK-35537
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.19.0
>Reporter: Zakelly Lan
>Priority: Major
>
> In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Could not parse value 
> '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
>   at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
>   ... 19 more
> Caused by: java.lang.IllegalArgumentException: Could not parse value for enum 
> class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, 
> SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, 
> LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, 
> DISABLE_COMPRESSION_OPTION]]
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
>   at 
> org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
>   at java.util.Optional.map(Optional.java:215)
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
>   ... 28 more
> {code}
> I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
> flink-conf.yaml. I also tried Flink 1.18.1, and it runs well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35522][runtime] Fix the issue that the source task may get stuck in speculative execution mode. [flink]

2024-06-06 Thread via GitHub


zhuzhurk commented on PR #24899:
URL: https://github.com/apache/flink/pull/24899#issuecomment-2151759062

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35525] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]

2024-06-06 Thread via GitHub


gaborgsomogyi commented on PR #24891:
URL: https://github.com/apache/flink/pull/24891#issuecomment-2151764327

   Unless there are further comments, I intend to merge this next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM

2024-06-06 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi reassigned FLINK-35525:
-

Assignee: Zhen Wang

> HDFS  delegation token fetched by custom DelegationTokenProvider is not 
> passed to Yarn AM
> -
>
> Key: FLINK-35525
> URL: https://issues.apache.org/jira/browse/FLINK-35525
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Zhen Wang
>Assignee: Zhen Wang
>Priority: Major
>  Labels: pull-request-available
>
> I tried running Flink with a Hadoop proxy user by disabling HadoopModuleFactory 
> and the Flink built-in token providers, and implementing a custom token provider.
> However, only the HDFS token obtained by the hadoopfs provider was added in 
> YarnClusterDescriptor, which resulted in Yarn AM submission failure.
> Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compression.per.level' in yaml [flink]

2024-06-06 Thread via GitHub


Zakelly opened a new pull request, #24902:
URL: https://github.com/apache/flink/pull/24902

   ## What is the purpose of the change
   
   See FLINK-35537: an exception is thrown when setting 
'state.backend.rocksdb.compression.per.level' in Flink 1.19.0. I have confirmed 
that the problem comes from 
`EmbeddedRocksDBStateBackend#mergeConfigurableOptions`, which merges two 
configuration maps into one. It calls `toString` on each already-parsed value 
and inserts the resulting string into a new raw map. For an enum list, `toString` 
yields the bracketed list form rather than the legacy string form of the value. 
When the new raw map is parsed later, that is when the error happens.
   
   This PR fixes `EmbeddedRocksDBStateBackend#mergeConfigurableOptions`.
   
   ## Brief change log
   
- Fixed `EmbeddedRocksDBStateBackend#mergeConfigurableOptions`.
- Added test `testConfigureRocksDBCompressionPerLevel`
   
   
   ## Verifying this change
   
   Newly added test `testConfigureRocksDBCompressionPerLevel`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35537) Error when setting state.backend.rocksdb.compression.per.level

2024-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35537:
---
Labels: pull-request-available  (was: )

> Error when setting state.backend.rocksdb.compression.per.level
> --
>
> Key: FLINK-35537
> URL: https://issues.apache.org/jira/browse/FLINK-35537
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.19.0
>Reporter: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Could not parse value 
> '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
>   at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
>   ... 19 more
> Caused by: java.lang.IllegalArgumentException: Could not parse value for enum 
> class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, 
> SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, 
> LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, 
> DISABLE_COMPRESSION_OPTION]]
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
>   at 
> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
>   at 
> org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
>   at java.util.Optional.map(Optional.java:215)
>   at 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
>   ... 28 more
> {code}
> I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
> flink-conf.yaml. I also tried Flink 1.18.1, and it runs well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compression.per.level' in yaml [flink]

2024-06-06 Thread via GitHub


Zakelly commented on PR #24902:
URL: https://github.com/apache/flink/pull/24902#issuecomment-2151772776

   @fredia  @masteryhx  Would you please take a look? thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35371) Allow the keystore and truststore type to be configured for SSL

2024-06-06 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852689#comment-17852689
 ] 

Gabor Somogyi commented on FLINK-35371:
---

[~ammarm] any news on this?

> Allow the keystore and truststore type to be configured for SSL
> 
>
> Key: FLINK-35371
> URL: https://issues.apache.org/jira/browse/FLINK-35371
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Ammar Master
>Assignee: Ammar Master
>Priority: Minor
>  Labels: SSL
>
> Flink always creates a keystore and truststore using the [default 
> type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
>  defined in the JDK, which in most cases is JKS.
> {code}
> KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
> {code}
> We should add other configuration options to set the type explicitly to 
> support other custom formats, and match the options provided by other 
> applications by 
> [Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
>  and 
> [Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
>  already. The default would continue to be specified by the JDK.
>  
> The SSLContext for the REST API can read the configuration option directly, 
> and we need to add extra logic to the 
> [CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
>  for Pekko.
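
A minimal sketch of the proposed fallback behaviour (the option name is 
hypothetical and not an existing Flink option; only the default-type fallback 
mirrors the code quoted above):
{code:java}
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.util.Optional;

/** Sketch only: 'security.ssl.keystore-type' is a hypothetical option name. */
public final class KeyStoreTypeSketch {
    static KeyStore createKeyStore(Optional<String> configuredType) throws KeyStoreException {
        // Fall back to the JDK default type (usually JKS or PKCS12) when no
        // explicit type is configured, matching today's behaviour.
        return KeyStore.getInstance(configuredType.orElse(KeyStore.getDefaultType()));
    }

    public static void main(String[] args) throws KeyStoreException {
        System.out.println(createKeyStore(Optional.of("PKCS12")).getType()); // PKCS12
        System.out.println(createKeyStore(Optional.empty()).getType());      // JDK default
    }
}
{code}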



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-3.1.1][FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink (#3233) [flink-cdc]

2024-06-06 Thread via GitHub


Jiabao-Sun merged PR #3387:
URL: https://github.com/apache/flink-cdc/pull/3387


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compression.per.level' in yaml [flink]

2024-06-06 Thread via GitHub


flinkbot commented on PR #24902:
URL: https://github.com/apache/flink/pull/24902#issuecomment-2151784661

   
   ## CI report:
   
   * 3e4451a8c373e99bf99b1b9c4ecd2f8448855773 UNKNOWN
   
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35448] Translate pod templates documentation into Chinese [flink-kubernetes-operator]

2024-06-06 Thread via GitHub


1996fanrui commented on code in PR #830:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/830#discussion_r1629028807


##
docs/content/docs/custom-resource/pod-template.md:
##
@@ -26,21 +26,18 @@ under the License.
 
 # Pod template
 
-The operator CRD is designed to have a minimal set of direct, short-hand CRD 
settings to express the most
-basic attributes of a deployment. For all other settings the CRD provides the 
`flinkConfiguration` and
-`podTemplate` fields.
+
 
-Pod templates permit customization of the Flink job and task manager pods, for 
example to specify
-volume mounts, ephemeral storage, sidecar containers etc.
+Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 
`flinkConfiguration` 和 `podTemplate` 字段。

Review Comment:
   ```suggestion
   Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达 deployment 的最基本属性。对于所有其他设置,CRD 提供了 
`flinkConfiguration` 和 `podTemplate` 字段。
   ```
   
   We don't need to translate `deployment`.



##
docs/content/docs/custom-resource/pod-template.md:
##
@@ -26,21 +26,18 @@ under the License.
 
 # Pod template
 
-The operator CRD is designed to have a minimal set of direct, short-hand CRD 
settings to express the most
-basic attributes of a deployment. For all other settings the CRD provides the 
`flinkConfiguration` and
-`podTemplate` fields.
+
 
-Pod templates permit customization of the Flink job and task manager pods, for 
example to specify
-volume mounts, ephemeral storage, sidecar containers etc.
+Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 
`flinkConfiguration` 和 `podTemplate` 字段。
 
-Pod templates can be layered, as shown in the example below.
-A common pod template may hold the settings that apply to both job and task 
manager,
-like `volumeMounts`. Another template under job or task manager can define 
additional settings that supplement or override those
-in the common template, such as a task manager sidecar.
+Pod templates 保证了 Flink 作业和任务管理器 pod 的自定义,例如指定卷挂载、临时存储、sidecar 容器等。
 
-The operator is going to merge the common and specific templates for job and 
task manager respectively.
+Pod template 可以被分层,如下面的示例所示。
+一个通用的 pod template 可以保存适用于作业和任务管理器的设置,比如 
`volumeMounts`。作业或任务管理器下的另一个模板可以定义补充或覆盖通用模板中的其他设置,比如一个任务管理器 sidecar。
 
-Here the full example:
+Operator 将分别合并作业和任务管理器的通用和特定模板。

Review Comment:
   Same comment as above.



##
docs/content/docs/custom-resource/pod-template.md:
##
@@ -26,21 +26,18 @@ under the License.
 
 # Pod template
 
-The operator CRD is designed to have a minimal set of direct, short-hand CRD 
settings to express the most
-basic attributes of a deployment. For all other settings the CRD provides the 
`flinkConfiguration` and
-`podTemplate` fields.
+
 
-Pod templates permit customization of the Flink job and task manager pods, for 
example to specify
-volume mounts, ephemeral storage, sidecar containers etc.
+Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 
`flinkConfiguration` 和 `podTemplate` 字段。
 
-Pod templates can be layered, as shown in the example below.
-A common pod template may hold the settings that apply to both job and task 
manager,
-like `volumeMounts`. Another template under job or task manager can define 
additional settings that supplement or override those
-in the common template, such as a task manager sidecar.
+Pod templates 保证了 Flink 作业和任务管理器 pod 的自定义,例如指定卷挂载、临时存储、sidecar 容器等。

Review Comment:
   ```suggestion
   Pod templates 允许自定义 Flink Job 和 Task Manager 的 pod,例如指定卷挂载、临时存储、sidecar 容器等。
   ```
   
   Task Manager should be translated as well.



##
docs/content/docs/custom-resource/pod-template.md:
##
@@ -125,4 +124,4 @@ arr1: [{name: a, p2: v2}, {name: c, p2: v2}]
 merged: [{name: a, p1: v1, p2: v2}, {name: b, p1: v1}, {name: c, p2: v2}]
 ```
 
-Merging by name can we be very convenient when merging container specs or when 
the base and override templates are not defined together.
+当合并容器规范或者当基础模板和覆盖模板没有一起定义时,按名称合并可以非常方便。

Review Comment:
   ```suggestion
   当合并容器规格或者当基础模板和覆盖模板没有一起定义时,按名称合并可以非常方便。
   ```
   
   The original English doc doesn't need the `we`, right?
   
   If yes, we can remove it in this PR directly.



##
docs/content/docs/custom-resource/pod-template.md:
##
@@ -26,21 +26,18 @@ under the License.
 
 # Pod template
 
-The operator CRD is designed to have a minimal set of direct, short-hand CRD 
settings to express the most
-basic attributes of a deployment. For all other settings the CRD provides the 
`flinkConfiguration` and
-`podTemplate` fields.
+
 
-Pod templates permit customization of the Flink job and task manager pods, for 
example to specify
-volume mounts, ephemeral storage, sidecar containers etc.
+Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达部署的最基本属性。对于所有其他设置,CRD 提供了 
`flinkConfiguration` 和 `podTemplate` 字段。
 
-Pod templates can be layered, as shown in the example below.
-A common pod template may hold the settings that apply to both job and task 
manager,
-like `volum

[jira] [Resolved] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

2024-06-06 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun resolved FLINK-35149.

Release Note: cdc release-3.1: fa9fb0b1c49848e77c211a5913d7f28c33e04ff0
  Resolution: Fixed

> Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not 
> TwoPhaseCommittingSink
> ---
>
> Key: FLINK-35149
> URL: https://issues.apache.org/jira/browse/FLINK-35149
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> Currently, when the sink is not an instance of TwoPhaseCommittingSink, 
> input.transform is used rather than stream. This means that the pre-write 
> topology will be ignored.
> {code:java}
> private void sinkTo(
> DataStream input,
> Sink sink,
> String sinkName,
> OperatorID schemaOperatorID) {
> DataStream stream = input;
> // Pre write topology
> if (sink instanceof WithPreWriteTopology) {
> stream = ((WithPreWriteTopology) 
> sink).addPreWriteTopology(stream);
> }
> if (sink instanceof TwoPhaseCommittingSink) {
> addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
> } else {
> input.transform(
> SINK_WRITER_PREFIX + sinkName,
> CommittableMessageTypeInfo.noOutput(),
> new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
> }
> } {code}
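
Presumably the fix is to route the writer through the (possibly augmented) 
{{stream}} instead of the original {{input}}; a sketch of that idea only, not 
necessarily the exact committed change:
{code:java}
} else {
    // Use 'stream' (which already includes any pre-write topology) instead of 'input'.
    stream.transform(
            SINK_WRITER_PREFIX + sinkName,
            CommittableMessageTypeInfo.noOutput(),
            new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
}
{code}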



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

2024-06-06 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun updated FLINK-35149:
---
Release Note:   (was: cdc release-3.1: 
fa9fb0b1c49848e77c211a5913d7f28c33e04ff0)

> Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not 
> TwoPhaseCommittingSink
> ---
>
> Key: FLINK-35149
> URL: https://issues.apache.org/jira/browse/FLINK-35149
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> Currently, when the sink is not an instance of TwoPhaseCommittingSink, 
> input.transform is used rather than stream. This means that the pre-write 
> topology will be ignored.
> {code:java}
> private void sinkTo(
> DataStream input,
> Sink sink,
> String sinkName,
> OperatorID schemaOperatorID) {
> DataStream stream = input;
> // Pre write topology
> if (sink instanceof WithPreWriteTopology) {
> stream = ((WithPreWriteTopology) 
> sink).addPreWriteTopology(stream);
> }
> if (sink instanceof TwoPhaseCommittingSink) {
> addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
> } else {
> input.transform(
> SINK_WRITER_PREFIX + sinkName,
> CommittableMessageTypeInfo.noOutput(),
> new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

2024-06-06 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852690#comment-17852690
 ] 

Jiabao Sun commented on FLINK-35149:


flink-cdc release-3.1: fa9fb0b1c49848e77c211a5913d7f28c33e04ff0

> Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not 
> TwoPhaseCommittingSink
> ---
>
> Key: FLINK-35149
> URL: https://issues.apache.org/jira/browse/FLINK-35149
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> Currently, when the sink is not an instance of TwoPhaseCommittingSink, 
> input.transform is used rather than stream. This means that the pre-write 
> topology will be ignored.
> {code:java}
> private void sinkTo(
> DataStream input,
> Sink sink,
> String sinkName,
> OperatorID schemaOperatorID) {
> DataStream stream = input;
> // Pre write topology
> if (sink instanceof WithPreWriteTopology) {
> stream = ((WithPreWriteTopology) 
> sink).addPreWriteTopology(stream);
> }
> if (sink instanceof TwoPhaseCommittingSink) {
> addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
> } else {
> input.transform(
> SINK_WRITER_PREFIX + sinkName,
> CommittableMessageTypeInfo.noOutput(),
> new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33278) RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable fails on AZP

2024-06-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-33278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852691#comment-17852691
 ] 

Pedro Mázala commented on FLINK-33278:
--

Are there any updates on this?

I'm also seeing it on Flink 1.18.x.

> RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable
>  fails on AZP
> --
>
> Key: FLINK-33278
> URL: https://issues.apache.org/jira/browse/FLINK-33278
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / RPC
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
> Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png, 
> screenshot-4.png
>
>
> This build 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53740&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=7c1d86e3-35bd-5fd5-3b7c-30c126a78702&l=6563]
> fails as
> {noformat}
> Oct 15 01:02:20 Multiple Failures (1 failure)
> Oct 15 01:02:20 -- failure 1 --
> Oct 15 01:02:20 [Any cause is instance of class 'class 
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException'] 
> Oct 15 01:02:20 Expecting any element of:
> Oct 15 01:02:20   [java.util.concurrent.CompletionException: 
> java.util.concurrent.TimeoutException: Invocation of 
> [RemoteRpcInvocation(SerializedValueRespondingGateway.getSerializedValue())] 
> at recipient 
> [pekko.tcp://flink@localhost:38231/user/rpc/8c211f34-41e5-4efe-93bd-8eca6c590a7f]
>  timed out. This is usually caused by: 1) Pekko failed sending the message 
> silently, due to problems like oversized payload or serialization failures. 
> In that case, you should find detailed error information in the logs. 2) The 
> recipient needs more time for responding, due to problems like slow machines 
> or network jitters. In that case, you can try to increase pekko.ask.timeout.
> Oct 15 01:02:20   at 
> java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
> Oct 15 01:02:20   at 
> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> Oct 15 01:02:20   at 
> org.apache.flink.runtime.rpc.pekko.RemotePekkoRpcActorTest.lambda$failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable$1(RemotePekkoRpcActorTest.java:168)
> Oct 15 01:02:20   ...(63 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> ...
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35541) Introduce retry limiting for AWS connector sinks

2024-06-06 Thread Aleksandr Pilipenko (Jira)
Aleksandr Pilipenko created FLINK-35541:
---

 Summary: Introduce retry limiting for AWS connector sinks
 Key: FLINK-35541
 URL: https://issues.apache.org/jira/browse/FLINK-35541
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / AWS, Connectors / DynamoDB, Connectors / 
Firehose, Connectors / Kinesis
Affects Versions: aws-connector-4.2.0
Reporter: Aleksandr Pilipenko


Currently, if the record write operation in the sink consistently fails with a 
retriable error, the sink will retry indefinitely. If the cause of the error is 
never resolved, this may lead to a stuck operator.
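
One possible shape for such a limit is sketched below (standalone illustration; 
the class name and the wiring into the AWS sink writers are assumptions, not an 
existing API):
{code:java}
/** Sketch of a bounded retry budget that fails fast instead of retrying forever. */
public final class RetryLimiterSketch {
    private final int maxRetries;
    private int attempts;

    public RetryLimiterSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Returns true while another retry is allowed; false once the budget is exhausted. */
    public boolean tryRetry() {
        return attempts++ < maxRetries;
    }

    public static void main(String[] args) {
        RetryLimiterSketch limiter = new RetryLimiterSketch(3);
        for (int attempt = 1; attempt <= 5; attempt++) {
            if (!limiter.tryRetry()) {
                // Instead of retrying indefinitely, surface a hard failure so the job can react.
                System.out.println("Retry budget exhausted, failing the writer");
                break;
            }
            System.out.println("Retrying, attempt " + attempt);
        }
    }
}
{code}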



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629112803


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = 
ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new 
EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+Set set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = 
createObjectContainer(element);
+set.add(objectContainer);
+}
+
+Set res = new LinkedHashSet<>();

Review Comment:
If a record is present in the set, add it to an array list, otherwise add it 
to the set? How would that deduplicate?
   If array1 is [a, a] and array2 is [b, a],
   how do we deduplicate the two a's? @snuyanzin 
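   
   For what it's worth, a tiny standalone sketch (plain strings instead of 
   `ObjectContainer`) showing how a `LinkedHashSet` collapses the duplicate while 
   keeping the first-seen order:
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   class IntersectDedupSketch {
       public static void main(String[] args) {
           List<String> arrayOne = Arrays.asList("a", "a");
           List<String> arrayTwo = Arrays.asList("b", "a");

           Set<String> inArrayTwo = new HashSet<>(arrayTwo);
           Set<String> result = new LinkedHashSet<>(); // keeps order, drops duplicates
           for (String element : arrayOne) {
               if (inArrayTwo.contains(element)) {
                   result.add(element);
               }
           }
           System.out.println(result); // [a] -- the second "a" is de-duplicated by the set
       }
   }
   ```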



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629113958


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = 
ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new 
EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+Set set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = 
createObjectContainer(element);
+set.add(objectContainer);
+}
+
+Set res = new LinkedHashSet<>();
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayOne, pos);
+final ObjectContainer objectContainer = 
createObjectContainer(element);
+if (set.contains(objectContainer)) {
+res.add(objectContainer);
+}
+}
+
+return new GenericArrayData(
+res.stream()
+.map(element -> element != null ? 
element.getObject() : null)
+.toArray());

Review Comment:
   When the array contains a null element, this will throw an NPE. @snuyanzin 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33278) RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable fails on AZP

2024-06-06 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852700#comment-17852700
 ] 

Matthias Pohl commented on FLINK-33278:
---

What are you seeing on Flink 1.18.x? Are you referring to the test instability 
or some error?

I guess we haven't continued working on the issue for now because that test 
instability was only observed once so far.

> RemotePekkoRpcActorTest.failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable
>  fails on AZP
> --
>
> Key: FLINK-33278
> URL: https://issues.apache.org/jira/browse/FLINK-33278
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / RPC
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
> Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png, 
> screenshot-4.png
>
>
> This build 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53740&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=7c1d86e3-35bd-5fd5-3b7c-30c126a78702&l=6563]
> fails as
> {noformat}
> Oct 15 01:02:20 Multiple Failures (1 failure)
> Oct 15 01:02:20 -- failure 1 --
> Oct 15 01:02:20 [Any cause is instance of class 'class 
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException'] 
> Oct 15 01:02:20 Expecting any element of:
> Oct 15 01:02:20   [java.util.concurrent.CompletionException: 
> java.util.concurrent.TimeoutException: Invocation of 
> [RemoteRpcInvocation(SerializedValueRespondingGateway.getSerializedValue())] 
> at recipient 
> [pekko.tcp://flink@localhost:38231/user/rpc/8c211f34-41e5-4efe-93bd-8eca6c590a7f]
>  timed out. This is usually caused by: 1) Pekko failed sending the message 
> silently, due to problems like oversized payload or serialization failures. 
> In that case, you should find detailed error information in the logs. 2) The 
> recipient needs more time for responding, due to problems like slow machines 
> or network jitters. In that case, you can try to increase pekko.ask.timeout.
> Oct 15 01:02:20   at 
> java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
> Oct 15 01:02:20   at 
> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> Oct 15 01:02:20   at 
> org.apache.flink.runtime.rpc.pekko.RemotePekkoRpcActorTest.lambda$failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable$1(RemotePekkoRpcActorTest.java:168)
> Oct 15 01:02:20   ...(63 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> ...
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35490][cdc][JUnit5 Migration] For `flink-cdc-connect/flink-cdc-source-connectors` module [flink-cdc]

2024-06-06 Thread via GitHub


morazow opened a new pull request, #3395:
URL: https://github.com/apache/flink-cdc/pull/3395

   https://issues.apache.org/jira/browse/FLINK-35490
   
   Junit4 to Junit5 migration of 
`flink-cdc-connect/flink-cdc-source-connectors` module. Reference document: 
[Junit5 Migration 
Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU).
   
   - All the unit and integration tests should work as expected


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35490) [JUnit5 Migration] Module: Flink CDC flink-cdc-connect/flink-cdc-source-connectors

2024-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35490:
---
Labels: pull-request-available  (was: )

> [JUnit5 Migration] Module: Flink CDC 
> flink-cdc-connect/flink-cdc-source-connectors
> --
>
> Key: FLINK-35490
> URL: https://issues.apache.org/jira/browse/FLINK-35490
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Muhammet Orazov
>Priority: Major
>  Labels: pull-request-available
>
> Migrate Junit4 tests to Junit5 in the Flink CDC following modules:
>  
> - flink-cdc-connect/flink-cdc-source-connectors



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35526) Remove deprecated stedolan/jq Docker image from Flink e2e tests

2024-06-06 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852710#comment-17852710
 ] 

Robert Metzger commented on FLINK-35526:


1.18: 
https://github.com/apache/flink/commit/b51dc716ce48862269fdd409ea85036d61ee7b57
1.19: 
https://github.com/apache/flink/commit/b9c666e6a51702c35101af2d13277115f48be993

> Remove deprecated stedolan/jq Docker image from Flink e2e tests
> ---
>
> Key: FLINK-35526
> URL: https://issues.apache.org/jira/browse/FLINK-35526
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Minor
>  Labels: pull-request-available
>
> Our CI logs contain this warning: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60060&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3828
> {code}
> latest: Pulling from stedolan/jq
> [DEPRECATION NOTICE] Docker Image Format v1, and Docker Image manifest 
> version 2, schema 1 support will be removed in an upcoming release. Suggest 
> the author of docker.io/stedolan/jq:latest to upgrade the image to the OCI 
> Format, or Docker Image manifest v2, schema 2. More information at 
> https://docs.docker.com/go/deprecated-image-specs/
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35526) Remove deprecated stedolan/jq Docker image from Flink e2e tests

2024-06-06 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-35526.

Fix Version/s: 1.18.2
   1.19.1
   Resolution: Fixed

> Remove deprecated stedolan/jq Docker image from Flink e2e tests
> ---
>
> Key: FLINK-35526
> URL: https://issues.apache.org/jira/browse/FLINK-35526
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.19.1
>
>
> Our CI logs contain this warning: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60060&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3828
> {code}
> latest: Pulling from stedolan/jq
> [DEPRECATION NOTICE] Docker Image Format v1, and Docker Image manifest 
> version 2, schema 1 support will be removed in an upcoming release. Suggest 
> the author of docker.io/stedolan/jq:latest to upgrade the image to the OCI 
> Format, or Docker Image manifest v2, schema 2. More information at 
> https://docs.docker.com/go/deprecated-image-specs/
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Adding MongoDB Connector v1.2.0 [flink-web]

2024-06-06 Thread via GitHub


dannycranmer merged PR #735:
URL: https://github.com/apache/flink-web/pull/735


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35139) Release flink-connector-mongodb v1.2.0 for Flink 1.19

2024-06-06 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-35139.
---
Resolution: Fixed

> Release flink-connector-mongodb v1.2.0 for Flink 1.19
> -
>
> Key: FLINK-35139
> URL: https://issues.apache.org/jira/browse/FLINK-35139
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / MongoDB
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: mongodb-1.2.0
>
>
> https://github.com/apache/flink-connector-mongodb



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35532][Runtime/Web Frontend] Prevent Cross-Site Authentication… [flink]

2024-06-06 Thread via GitHub


hlteoh37 merged PR #24897:
URL: https://github.com/apache/flink/pull/24897


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard

2024-06-06 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852716#comment-17852716
 ] 

Hong Liang Teoh commented on FLINK-35532:
-

merged commit 
[{{ceb4db4}}|https://github.com/apache/flink/commit/ceb4db4d88a91546a179cc9fe1bc86ea1d7bb42a]
 into   apache:master

> Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
> --
>
> Key: FLINK-35532
> URL: https://issues.apache.org/jira/browse/FLINK-35532
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0, 1.19.1
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.2
>
>
> As part of FLINK-33325, we introduced a new tab on the Flink dashboard to 
> trigger the async profiler on the JobManager and TaskManager.
>  
> The HTML component introduced links out to the async profiler page on GitHub -> 
> [https://github.com/async-profiler/async-profiler/wiki].
> However, the anchor element introduced does not follow the best practice for 
> preventing XSA attacks, which is to set the attributes below:
> {code:java}
> target="_blank" rel="noopener noreferrer"{code}
> We should add these attributes as a best practice!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35532][Runtime/Web Frontend] Prevent Cross-Site Authentication… [flink]

2024-06-06 Thread via GitHub


hlteoh37 opened a new pull request, #24903:
URL: https://github.com/apache/flink/pull/24903

   … (XSA) attacks on Flink dashboard
   
   
   ## What is the purpose of the change
   - Backport of https://github.com/apache/flink/pull/24897
   - Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
   
   ## Brief change log
   - Set `target="_blank" rel="noopener noreferrer"` properties on the 
hyperlink reference element on Flink dashboard.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35532][Runtime/Web Frontend] Prevent Cross-Site Authentication… [flink]

2024-06-06 Thread via GitHub


hlteoh37 merged PR #24903:
URL: https://github.com/apache/flink/pull/24903


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard

2024-06-06 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852719#comment-17852719
 ] 

Hong Liang Teoh commented on FLINK-35532:
-

 merged commit 
[{{175ed72}}|https://github.com/apache/flink/commit/175ed72e15c6228dbc36cf42105ef9f5a03b]
 into   apache:release-1.19

> Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
> --
>
> Key: FLINK-35532
> URL: https://issues.apache.org/jira/browse/FLINK-35532
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0, 1.19.1
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.2
>
>
> As part of FLINK-33325, we introduced a new tab on the Flink dashboard to 
> trigger the async profiler on the JobManager and TaskManager.
>  
> The HTML component introduced links out to the async profiler page on GitHub -> 
> [https://github.com/async-profiler/async-profiler/wiki].
> However, the anchor element introduced does not follow the best practice for 
> preventing XSA attacks, which is to set the attributes below:
> {code:java}
> target="_blank" rel="noopener noreferrer"{code}
> We should add these attributes as a best practice!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard

2024-06-06 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-35532:

Fix Version/s: 1.20.0
   1.19.1
   (was: 1.19.2)

> Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
> --
>
> Key: FLINK-35532
> URL: https://issues.apache.org/jira/browse/FLINK-35532
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0, 1.19.1
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> As part of FLINK-33325, we introduced a new tab on the Flink dashboard to 
> trigger the async profiler on the JobManager and TaskManager.
>  
> The HTML component introduced links out to the async profiler page on GitHub -> 
> [https://github.com/async-profiler/async-profiler/wiki].
> However, the anchor element introduced does not follow the best practice for 
> preventing XSA attacks, which is to set the attributes below:
> {code:java}
> target="_blank" rel="noopener noreferrer"{code}
> We should add these attributes as a best practice!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard

2024-06-06 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh resolved FLINK-35532.
-
Resolution: Fixed

> Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
> --
>
> Key: FLINK-35532
> URL: https://issues.apache.org/jira/browse/FLINK-35532
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> As part of FLINK-33325, we introduced a new tab on the Flink dashboard to 
> trigger the async profiler on the JobManager and TaskManager.
>  
> The HTML component introduced links out to the async profiler page on GitHub -> 
> [https://github.com/async-profiler/async-profiler/wiki].
> However, the anchor element introduced does not follow the best practice for 
> preventing XSA attacks, which is to set the attributes below:
> {code:java}
> target="_blank" rel="noopener noreferrer"{code}
> We should add these attributes as a best practice!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard

2024-06-06 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-35532:

Affects Version/s: (was: 1.19.1)

> Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
> --
>
> Key: FLINK-35532
> URL: https://issues.apache.org/jira/browse/FLINK-35532
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> As part of FLINK-33325, we introduced a new tab on the Flink dashboard to 
> trigger the async profiler on the JobManager and TaskManager.
>  
> The HTML component introduced links out to the async profiler page on GitHub -> 
> [https://github.com/async-profiler/async-profiler/wiki].
> However, the anchor element introduced does not follow the best practice for 
> preventing XSA attacks, which is to set the attributes below:
> {code:java}
> target="_blank" rel="noopener noreferrer"{code}
> We should add these attributes as a best practice!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset

2024-06-06 Thread Jan Gurda (Jira)
Jan Gurda created FLINK-35542:
-

 Summary: ClassNotFoundException when deserializing 
CheckpointedOffset
 Key: FLINK-35542
 URL: https://issues.apache.org/jira/browse/FLINK-35542
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: jdbc-3.1.2
 Environment: Flink 1.19.0

Flink JDBC Connector 3.2-SNAPSHOT (commit 
2defbbcf4fc550a76dd9c664e1eed7d261e028ca)

JDK 11 (Temurin)
Reporter: Jan Gurda
 Fix For: jdbc-3.2.0


I use the latest flink-connector-jdbc code from the main branch; it's actually 
3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).

 

When jobs get interrupted while reading data from the JDBC source (for example, 
by a TaskManager outage), they cannot recover due to the following exception:
{code:java}
java.lang.RuntimeException: java.lang.ClassNotFoundException: 
org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown 
Source)
    at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown 
Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
    at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
 

[jira] [Updated] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset

2024-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35542:
---
Labels: pull-request-available  (was: )

> ClassNotFoundException when deserializing CheckpointedOffset
> 
>
> Key: FLINK-35542
> URL: https://issues.apache.org/jira/browse/FLINK-35542
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.1.2
> Environment: Flink 1.19.0
> Flink JDBC Connector 3.2-SNAPSHOT (commit 
> 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)
> JDK 11 (Temurin)
>Reporter: Jan Gurda
>Priority: Major
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0
>
>
> I use the latest flink-connector-jdbc code from the main branch; it's 
> actually 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).
>  
> When jobs get interrupted while reading data from the JDBC source (for 
> example, by a TaskManager outage), they cannot recover due to the following 
> exception:
> {code:java}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at 
> org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
>     at 
> org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown 
> Source)
>     at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown 
> Source)
>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Unknown Source)
>     at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
>     at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
> 

[jira] [Updated] (FLINK-35541) Introduce retry limiting for AWS connector sinks

2024-06-06 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-35541:

Description: 
Currently, if the record write operation in the sink consistently fails with a 
retriable error, sinks will retry indefinitely. If the cause of the error is not 
resolved, this may lead to a poison pill.

 

The proposal here is to add a configurable retry limit for each record. Users can 
specify a maximum number of retries per record, and the sink will fail once the 
retry limit is reached.

 

We will implement this for all AWS connectors:
 * DDBSink
 * Firehose Sink
 * Kinesis Sink

 

  was:Currently if the record write operation in the sink consistently fails 
with retriable error, sinks will retry indefinitely. In case when cause of the 
error is not resolved this may lead to stuck operator.


> Introduce retry limiting for AWS connector sinks
> 
>
> Key: FLINK-35541
> URL: https://issues.apache.org/jira/browse/FLINK-35541
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / AWS, Connectors / DynamoDB, Connectors / 
> Firehose, Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Aleksandr Pilipenko
>Priority: Major
>
> Currently, if the record write operation in the sink consistently fails with 
> a retriable error, sinks will retry indefinitely. If the cause of the 
> error is not resolved, this may lead to a poison pill.
>  
> The proposal here is to add a configurable retry limit for each record. Users can 
> specify a maximum number of retries per record, and the sink will fail once the 
> retry limit is reached.
>  
> We will implement this for all AWS connectors:
>  * DDBSink
>  * Firehose Sink
>  * Kinesis Sink
>  
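
A minimal, hedged sketch of what the proposed per-record retry limit could look
like (the class and method names below are illustrative and not the actual
connector API):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: count retries per record and fail once a
// configurable limit is exceeded, instead of retrying indefinitely.
final class RetryLimiter<RecordT> {

    private final int maxRetriesPerRecord;
    private final Map<RecordT, Integer> attempts = new HashMap<>();

    RetryLimiter(int maxRetriesPerRecord) {
        this.maxRetriesPerRecord = maxRetriesPerRecord;
    }

    /** Called when a record fails with a retriable error; throws once the limit is hit. */
    void recordFailure(RecordT record) {
        int count = attempts.merge(record, 1, Integer::sum);
        if (count > maxRetriesPerRecord) {
            throw new RuntimeException(
                    "Record exceeded the configured retry limit of " + maxRetriesPerRecord);
        }
    }

    /** Called when a record is written successfully. */
    void recordSuccess(RecordT record) {
        attempts.remove(record);
    }
}
{code}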



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35542] Set the correct class loader to lookup for the class CheckpointedOffset [flink-connector-jdbc]

2024-06-06 Thread via GitHub


jan-osc opened a new pull request, #130:
URL: https://github.com/apache/flink-connector-jdbc/pull/130

   - [FLINK-35542]  Use the class loader of class CheckpointedOffset while 
deserializing JdbcSourceSplit.
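
   For illustration, a minimal sketch of the change described above, assuming the 
fix passes the class loader that loaded CheckpointedOffset to the deserialization 
call instead of relying on the application/thread-context class loader (the helper 
class below is hypothetical, not the actual patch):
{code:java}
import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;

final class OffsetDeserializationSketch {

    // Hypothetical helper: deserialize the offset bytes of a JdbcSourceSplit.
    static CheckpointedOffset readOffset(byte[] offsetBytes)
            throws IOException, ClassNotFoundException {
        // Resolving classes against the loader that actually loaded
        // CheckpointedOffset avoids the ClassNotFoundException seen when the
        // connector classes are not visible to the default class loader on restore.
        return InstantiationUtil.deserializeObject(
                offsetBytes, CheckpointedOffset.class.getClassLoader());
    }
}
{code}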


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35542] Set the correct class loader to lookup for the class CheckpointedOffset [flink-connector-jdbc]

2024-06-06 Thread via GitHub


boring-cyborg[bot] commented on PR #130:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/130#issuecomment-2151980850

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35540][cdc-common][cdc-connector][mysql] fix lost table when database and table are with the same name [flink-cdc]

2024-06-06 Thread via GitHub


qg-lin opened a new pull request, #3396:
URL: https://github.com/apache/flink-cdc/pull/3396

   https://issues.apache.org/jira/browse/FLINK-35540


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35540:
---
Labels: pull-request-available  (was: )

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Priority: Major
>  Labels: pull-request-available
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database and table names are the same, like 'app.app', the job will fail 
> and the error message looks like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named {{app}}, then submit a pipeline job 
> defined like the YAML below:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1
> pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-3.1][FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian commented on PR #3375:
URL: https://github.com/apache/flink-cdc/pull/3375#issuecomment-2152104901

   Fixed the E2e package failure & rebased onto the `release-3.1` branch. Could 
@leonardBang please trigger the CI? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table. [flink-cdc]

2024-06-06 Thread via GitHub


leonardBang commented on PR #3376:
URL: https://github.com/apache/flink-cdc/pull/3376#issuecomment-2152114473

   Thanks @joyCurry30 for the contribution. I'm closing this one as it's a feature 
and should not be merged to release-3.1; only bugfixes should be merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table. [flink-cdc]

2024-06-06 Thread via GitHub


leonardBang closed pull request #3376: 
[BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order 
when adding new columns to a table.
URL: https://github.com/apache/flink-cdc/pull/3376


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20625][pubsub,e2e] Add PubSubSource connector using FLIP-27 [flink-connector-gcp-pubsub]

2024-06-06 Thread via GitHub


vahmed-hamdy commented on code in PR #2:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/2#discussion_r1627352870


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/source/PubSubSource.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.gcp.pubsub.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import 
org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorState;
+import 
org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubEnumeratorStateSerializer;
+import 
org.apache.flink.connector.gcp.pubsub.source.enumerator.PubSubSourceEnumerator;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubRecordEmitter;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSourceReader;
+import org.apache.flink.connector.gcp.pubsub.source.reader.PubSubSplitReader;
+import org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplit;
+import 
org.apache.flink.connector.gcp.pubsub.source.split.PubSubSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.auth.Credentials;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A source implementation to pull messages from GCP Pub/Sub into Flink.
+ *
+ * The {@link PubSubSourceEnumerator} assigns a static {@link PubSubSplit} 
to every {@link
+ * PubSubSourceReader} that joins. The split does not contain any 
split-specific information because
+ * Pub/Sub does not allow subscribers to specify a "range" of messages to pull 
by providing
+ * partitions or offsets. However, Pub/Sub will automatically load-balance 
messages between multiple
+ * readers using same subscription.
+ *
+ * A {@link PubSubSource} can be constructed through the {@link 
PubSubSourceBuilder} like so:
+ *
+ * {@code
+ * PubSubSource.builder()
+ * // The deserialization schema to deserialize Pub/Sub messages
+ * .setDeserializationSchema(new SimpleStringSchema())
+ * // The name string of your Pub/Sub project
+ * .setProjectName(PROJECT_NAME)
+ * // The name string of the subscription you would like to receive 
messages from
+ * .setSubscriptionName(SUBSCRIPTION_NAME)
+ * // An instance of com.google.auth.Credentials to authenticate 
against Google Cloud
+ * .setCredentials(CREDENTIALS)
+ * .setPubSubSubscriberFactory(
+ * // The maximum number of messages that should be pulled in 
one go
+ * 3,
+ * // The timeout after which a message pull request is deemed 
a failure
+ * Duration.ofSeconds(1),
+ * // The number of times the reception of a

[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-06-06 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852748#comment-17852748
 ] 

Matthias Pohl commented on FLINK-35035:
---

Thanks for the pointer, [~dmvk]. We looked into this issue while working on 
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]
 (which is kind of related) and plan to do a follow-up FLIP that will align the 
resource controlling mechanism of the {{AdaptiveScheduler}}'s 
{{WaitingForResources}} and {{Executing}} states. 

Currently, we have parameters intervening in the rescaling in different places 
([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min],
 
[j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max]
 being utilized in {{Executing}} and 
[j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout)
 being utilized in {{WaitingForResources}}). Having a 
{{resource-stabilization}} phase in {{Executing}} should resolve the problem 
described in this Jira issue here.
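
For reference, the configuration keys mentioned above can be set like this (a 
hedged sketch; the keys are taken from the linked documentation, and the values 
are only examples):
{code:java}
import org.apache.flink.configuration.Configuration;

final class AdaptiveSchedulerConfigSketch {

    static Configuration exampleConfig() {
        Configuration conf = new Configuration();
        // Enable the adaptive scheduler.
        conf.setString("jobmanager.scheduler", "adaptive");
        // Rescale knobs used while in the Executing state.
        conf.setString("jobmanager.adaptive-scheduler.scaling-interval.min", "30 s");
        conf.setString("jobmanager.adaptive-scheduler.scaling-interval.max", "1 min");
        // Stabilization timeout used while in the WaitingForResources state.
        conf.setString("jobmanager.adaptive-scheduler.resource-stabilization-timeout", "10 s");
        return conf;
    }
}
{code}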

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have a job graph of: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5].
> When I add slots, the task will trigger job graph changes via
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add them in, we cannot guarantee that the new slots will 
> all be added at once, so onNewResourcesAvailable triggers repeatedly.
> If each new slot arrives after a certain interval, then the job graph will 
> keep changing during this period. What I hope for is a configurable 
> stabilization time for cluster resources: trigger job graph changes only 
> after the number of cluster slots has been stable for a certain period of 
> time, to avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-06-06 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852748#comment-17852748
 ] 

Matthias Pohl edited comment on FLINK-35035 at 6/6/24 11:47 AM:


Thanks for the pointer, [~dmvk]. We looked into this issue while working on 
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]
 (which is kind of related) and plan to do a follow-up FLIP that will align the 
resource controlling mechanism of the {{{}AdaptiveScheduler{}}}'s 
{{WaitingForResources}} and {{Executing}} states.

Currently, we have parameters intervening in the rescaling in different places 
([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min],
 
[j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max]
 being utilized in {{Executing}} and 
[j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout]
 being utilized in {{{}WaitingForResources){}}}. Having a 
{{resource-stabilization}} phase in {{Executing}} should resolve the problem 
described in this Jira issue here.


was (Author: mapohl):
Thanks for the pointer, [~dmvk]. We looked into this issue while working on 
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]
 (which is kind of related) and plan to do a follow-up FLIP that will align the 
resource controlling mechanism of the {{AdaptiveScheduler}}'s 
{{WaitingForResources}} and {{Executing}} states. 

Currently, we have parameters intervening in the rescaling in different places 
([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min],
 
[j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max]
 being utilized in {{Executing}} and 
[j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout)
 being utilized in {{WaitingForResources}}). Having a 
{{resource-stabilization}} phase in {{Executing}} should resolve the problem 
described in this Jira issue here.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have a job graph of: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5].
> When I add slots, the task will trigger job graph changes via
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add them in, we cannot guarantee that the new slots will 
> all be added at once, so onNewResourcesAvailable triggers repeatedly.
> If each new slot arrives after a certain interval, then the job graph will 
> keep changing during this period. What I hope for is a configurable 
> stabilization time for cluster resources: trigger job graph changes only 
> after the number of cluster slots has been stable for a certain period of 
> time, to avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-06 Thread via GitHub


leonardBang merged PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-06 Thread via GitHub


leonardBang commented on PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2152195587

   @gong would you like to open a PR for the release-3.1 branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-06 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34908:
---
Issue Type: Bug  (was: Improvement)

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> The Flink CDC pipeline decides the timestamp zone by the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format
> yyyy-MM-dd HH:mm:ss, which causes lost datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  
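
To illustrate the precision loss described above (a hedged, standalone example, 
not the connector code): formatting with a fixed second-level pattern drops the 
fractional part, while passing the LocalDateTime through keeps it.
{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampPrecisionSketch {

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2024-06-06T12:34:56.123456");

        // Fixed second-level pattern: the microseconds are silently dropped.
        String truncated = ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(truncated); // 2024-06-06 12:34:56

        // Returning the LocalDateTime itself keeps the full precision.
        System.out.println(ts);        // 2024-06-06T12:34:56.123456
    }
}
{code}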



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-06 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852752#comment-17852752
 ] 

Leonard Xu commented on FLINK-34908:


master: e2ccc836a056c16974e4956190bdce249705b7ee

3.1: todo

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> The Flink CDC pipeline decides the timestamp zone by the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format
> yyyy-MM-dd HH:mm:ss, which causes lost datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-06 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34908:
---
Fix Version/s: cdc-3.1.1

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> The Flink CDC pipeline decides the timestamp zone by the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format
> yyyy-MM-dd HH:mm:ss, which causes lost datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-06 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34908:
---
Affects Version/s: cdc-3.1.0

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> The Flink CDC pipeline decides the timestamp zone by the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format
> yyyy-MM-dd HH:mm:ss, which causes lost datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian commented on PR #3231:
URL: https://github.com/apache/flink-cdc/pull/3231#issuecomment-2152228771

   Fixed in #3348.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35120][doris] Add Doris integration test cases [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian commented on PR #3227:
URL: https://github.com/apache/flink-cdc/pull/3227#issuecomment-2152229529

   Fixed in #3348.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian closed pull request #3231: [FLINK-35092][cdc][starrocks] Add starrocks 
integration test cases
URL: https://github.com/apache/flink-cdc/pull/3231


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][tests] Fix occasional pipeline E2e testcases failure [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian commented on PR #3309:
URL: https://github.com/apache/flink-cdc/pull/3309#issuecomment-2152233915

   Fixed in #3348.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][minor][cdc][docs] Optimize markdown formats in doris quickstart doc [flink-cdc]

2024-06-06 Thread via GitHub


leonardBang merged PR #3325:
URL: https://github.com/apache/flink-cdc/pull/3325


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35120][doris] Add Doris integration test cases [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian closed pull request #3227: [FLINK-35120][doris] Add Doris integration 
test cases
URL: https://github.com/apache/flink-cdc/pull/3227


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.0][minor][cdc][docs] Optimize markdown formats in doris quickstart doc [flink-cdc]

2024-06-06 Thread via GitHub


leonardBang merged PR #3326:
URL: https://github.com/apache/flink-cdc/pull/3326


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][tests] Fix occasional pipeline E2e testcases failure [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian closed pull request #3309: [hotfix][tests] Fix occasional pipeline E2e 
testcases failure
URL: https://github.com/apache/flink-cdc/pull/3309


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35463]Fixed issue for route rule changed when restored from checkpoint. [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian commented on code in PR #3364:
URL: https://github.com/apache/flink-cdc/pull/3364#discussion_r1629411656


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##
@@ -130,12 +130,13 @@ public CompletableFuture 
handleSchemaChangeRequest(
 LOG.info(
 "Received schema change event request from table {}. Start 
to buffer requests for others.",
 request.getTableId().toString());
-if (request.getSchemaChangeEvent() instanceof CreateTableEvent
-&& schemaManager.schemaExists(request.getTableId())) {
-return CompletableFuture.completedFuture(
-wrap(new 
SchemaChangeResponse(Collections.emptyList(;

Review Comment:
   I think this check is meant to avoid sending duplicate CreateTableEvent 
downstream after restoring from checkpoints. Will this patch change that 
behavior?



##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java:
##


Review Comment:
   Could @hk-lrzy please add an integration test case to validate that the route 
rule can be correctly restored?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35436]Fixed Schema Behavior Action for the Pipeline. [flink-cdc]

2024-06-06 Thread via GitHub


yuxiqian commented on PR #3355:
URL: https://github.com/apache/flink-cdc/pull/3355#issuecomment-2152262621

   Seems FLINK-35436 has a lot of overlap with FLINK-35242, which is 
expected to be shipped with CDC 3.2, so I would prefer closing this PR first. 
Thoughts? @leonardBang


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35501] Use common IO thread pool for RocksDB data transfer [flink]

2024-06-06 Thread via GitHub


rkhachatryan merged PR #24882:
URL: https://github.com/apache/flink/pull/24882


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35501] Use common IO thread pool for RocksDB data transfer [flink]

2024-06-06 Thread via GitHub


rkhachatryan commented on PR #24882:
URL: https://github.com/apache/flink/pull/24882#issuecomment-2152267918

   Thanks for the review!
   Merged into master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35501) Use common thread pools when transferring RocksDB state files

2024-06-06 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852758#comment-17852758
 ] 

Roman Khachatryan commented on FLINK-35501:
---

Merged into master as 
9708f9fd65751296b3b0377c964207b630958259..5855f5354985cabea6f01b6b2effaaa8cfcbee55

> Use common thread pools when transferring RocksDB state files
> -
>
> Key: FLINK-35501
> URL: https://issues.apache.org/jira/browse/FLINK-35501
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.20.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, each RocksDB state backend creates an executor backed by a thread 
> pool.
> This makes it difficult to control the total number of threads per TM, because 
> a TM might have at least one task per slot and, theoretically, many state 
> backends per task (because of chaining).
> Additionally, using a common thread pool makes it possible to indirectly 
> control the load on the underlying DFS (e.g. the total number of requests to S3 from a 
> TM).
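
A minimal sketch of the idea (illustrative names only, not the actual Flink 
change): one bounded IO pool per TaskManager that all backend instances borrow, 
instead of each backend creating its own pool.
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SharedIoPoolSketch {

    // One bounded pool per TaskManager process; the size is only an example.
    private static final ExecutorService SHARED_IO_POOL = Executors.newFixedThreadPool(4);

    /** Hypothetical state backend component that borrows the shared pool. */
    static final class StateFileTransfer {
        private final ExecutorService ioExecutor;

        StateFileTransfer(ExecutorService sharedIoExecutor) {
            // Before: each backend created its own executor here.
            // After: all backends reuse the TaskManager-wide pool, which bounds the
            // total number of transfer threads and the request rate against the DFS.
            this.ioExecutor = sharedIoExecutor;
        }

        void transferAsync(Runnable transfer) {
            ioExecutor.execute(transfer);
        }
    }

    static StateFileTransfer newTransfer() {
        return new StateFileTransfer(SHARED_IO_POOL);
    }
}
{code}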



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

