Re: [PR] [FLINK-35400][checkpoint] Release FileMergingSnapshotManager if all tasks finished [flink]
flinkbot commented on PR #24817: URL: https://github.com/apache/flink/pull/24817#issuecomment-2121766002 ## CI report: * 99713100d9b0ca11356ae2628edebc7429bedb46 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35400) Rebuild FileMergingSnapshotManager in failover
[ https://issues.apache.org/jira/browse/FLINK-35400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35400: --- Labels: pull-request-available (was: ) > Rebuild FileMergingSnapshotManager in failover > -- > > Key: FLINK-35400 > URL: https://issues.apache.org/jira/browse/FLINK-35400 > Project: Flink > Issue Type: Sub-task >Reporter: Zakelly Lan >Priority: Major > Labels: pull-request-available > > Currently, the {{FileMergingSnapshotManager}} is released within > {{releaseJobResources}}, which will not be invoked during failover and > restore. However, the manager should be created again to clear all internal > states in a new job attempt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35400][checkpoint] Release FileMergingSnapshotManager if all tasks finished [flink]
Zakelly opened a new pull request, #24817: URL: https://github.com/apache/flink/pull/24817 ## What is the purpose of the change Currently, the `FileMergingSnapshotManager` is created per job, and it is released and disposed only when the corresponding job is released. In a failover scenario, the tasks quit but the job is still there, leading to reuse of the `FileMergingSnapshotManager`, which violates its design. ## Brief change log - Make `TaskExecutorFileMergingManager` record references from `ExecutionAttemptID` to `FileMergingSnapshotManager`, and release each reference when the task's resources are released. ## Verifying this change This change is already covered by modified existing tests, such as `TaskExecutorFileMergingManagerTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
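The change log above describes tracking which execution attempts reference a job's snapshot manager and disposing the manager once the last reference is released. A minimal, self-contained sketch of that pattern is below; all class and method names are simplified stand-ins, not Flink's actual `TaskExecutorFileMergingManager` API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch: reference-count a per-job resource by execution attempt. */
class PerJobManagerRegistry {

    /** Stand-in for FileMergingSnapshotManager. */
    static class Manager {
        boolean disposed = false;
        void dispose() { disposed = true; }
    }

    private final Map<String, Manager> managersByJob = new HashMap<>();
    private final Map<String, Set<String>> attemptsByJob = new HashMap<>();

    /** Called when a task (execution attempt) of a job starts: record the reference. */
    synchronized Manager register(String jobId, String attemptId) {
        attemptsByJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(attemptId);
        return managersByJob.computeIfAbsent(jobId, k -> new Manager());
    }

    /** Called when a task's resources are released: drop the reference and
     *  dispose the manager when the last attempt of the job is gone. */
    synchronized void release(String jobId, String attemptId) {
        Set<String> attempts = attemptsByJob.get(jobId);
        if (attempts == null) {
            return;
        }
        attempts.remove(attemptId);
        if (attempts.isEmpty()) {
            attemptsByJob.remove(jobId);
            Manager manager = managersByJob.remove(jobId);
            if (manager != null) {
                manager.dispose();
            }
        }
    }

    synchronized boolean hasManager(String jobId) {
        return managersByJob.containsKey(jobId);
    }
}
```

With this scheme a failover (all attempts released while the job stays registered) disposes the old manager, so the next attempt gets a freshly created one instead of reusing stale internal state.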
[jira] [Updated] (FLINK-35395) Fix KeyedStateStore class annotation error
[ https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35395: --- Priority: Minor (was: Major) > Fix KeyedStateStore class annotation error > -- > > Key: FLINK-35395 > URL: https://issues.apache.org/jira/browse/FLINK-35395 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: bradley >Assignee: bradley >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > > The KeyedStateStore class is annotated incorrectly, and the examples there > are obviously wrong and will mislead users.
[jira] [Resolved] (FLINK-35395) Fix KeyedStateStore class annotation error
[ https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo resolved FLINK-35395. Resolution: Fixed master(1.20) via 5f5722fe916bafc00ec05539d6c19ac28ac1c5c4. > Fix KeyedStateStore class annotation error > -- > > Key: FLINK-35395 > URL: https://issues.apache.org/jira/browse/FLINK-35395 > Project: Flink > Issue Type: Bug > Components: API / Core >Reporter: bradley >Assignee: bradley >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > > The KeyedStateStore class is annotated incorrectly, and the examples there > are obviously wrong and will mislead users.
[jira] [Updated] (FLINK-35395) Fix KeyedStateStore class annotation error
[ https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35395: --- Issue Type: Bug (was: Improvement) > Fix KeyedStateStore class annotation error > -- > > Key: FLINK-35395 > URL: https://issues.apache.org/jira/browse/FLINK-35395 > Project: Flink > Issue Type: Bug > Components: API / Core >Reporter: bradley >Assignee: bradley >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > > The KeyedStateStore class is annotated incorrectly, and the examples there > are obviously wrong and will mislead users.
[jira] [Closed] (FLINK-35395) Fix KeyedStateStore class annotation error
[ https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-35395. -- > Fix KeyedStateStore class annotation error > -- > > Key: FLINK-35395 > URL: https://issues.apache.org/jira/browse/FLINK-35395 > Project: Flink > Issue Type: Bug > Components: API / Core >Reporter: bradley >Assignee: bradley >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > > The KeyedStateStore class is annotated incorrectly, and the examples there > are obviously wrong and will mislead users.
[jira] [Assigned] (FLINK-35395) Fix KeyedStateStore class annotation error
[ https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-35395: -- Assignee: bradley > Fix KeyedStateStore class annotation error > -- > > Key: FLINK-35395 > URL: https://issues.apache.org/jira/browse/FLINK-35395 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: bradley >Assignee: bradley >Priority: Major > Labels: pull-request-available > > The KeyedStateStore class is annotated incorrectly, and the examples there > are obviously wrong and will mislead users.
Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]
reswqa commented on PR #24808: URL: https://github.com/apache/flink/pull/24808#issuecomment-2121760418 Thanks @usberkeley, merged.
[jira] [Updated] (FLINK-35395) Fix KeyedStateStore class annotation error
[ https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35395: --- Fix Version/s: 1.20.0 > Fix KeyedStateStore class annotation error > -- > > Key: FLINK-35395 > URL: https://issues.apache.org/jira/browse/FLINK-35395 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: bradley >Assignee: bradley >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > The KeyedStateStore class is annotated incorrectly, and the examples there > are obviously wrong and will mislead users.
Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]
reswqa merged PR #24808: URL: https://github.com/apache/flink/pull/24808
Re: [PR] [FLINK-34944] Use Incremental Source Framework in Flink CDC OceanBase Source Connector [flink-cdc]
whhe commented on PR #3211: URL: https://github.com/apache/flink-cdc/pull/3211#issuecomment-2121749049 > Hello, when can this feature be merged into version 3.2? I will add test cases for this PR in the next two days, and after that we can ask the committers to review it.
[jira] [Created] (FLINK-35405) Add buffer size/in-flight records metrics for AsyncExecutionController
Yanfei Lei created FLINK-35405: -- Summary: Add buffer size/in-flight records metrics for AsyncExecutionController Key: FLINK-35405 URL: https://issues.apache.org/jira/browse/FLINK-35405 Project: Flink Issue Type: Sub-task Reporter: Yanfei Lei
[jira] [Resolved] (FLINK-35030) Introduce Epoch Manager for async execution
[ https://issues.apache.org/jira/browse/FLINK-35030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei resolved FLINK-35030. Resolution: Resolved > Introduce Epoch Manager for async execution > --- > > Key: FLINK-35030 > URL: https://issues.apache.org/jira/browse/FLINK-35030 > Project: Flink > Issue Type: Sub-task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available >
[jira] [Commented] (FLINK-35030) Introduce Epoch Manager for async execution
[ https://issues.apache.org/jira/browse/FLINK-35030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848041#comment-17848041 ] Yanfei Lei commented on FLINK-35030: Merged into master via f1ecb9e > Introduce Epoch Manager for async execution > --- > > Key: FLINK-35030 > URL: https://issues.apache.org/jira/browse/FLINK-35030 > Project: Flink > Issue Type: Sub-task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available >
Re: [PR] [hotfix][ForStStateBackend] Create new copy for each SerializedCompositeKeyBuilder in ForStStateBackend [flink]
flinkbot commented on PR #24816: URL: https://github.com/apache/flink/pull/24816#issuecomment-2121707146 ## CI report: * 6a92726158cb6a53b1fb0960a242d66973d9d447 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]
mohitjain2504 commented on code in PR #24365: URL: https://github.com/apache/flink/pull/24365#discussion_r1607603000 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.binary.BinaryStringDataUtil; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */ +@Internal +public class SplitFunction extends BuiltInScalarFunction { +public SplitFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.SPLIT, context); +} + +public @Nullable ArrayData eval(@Nullable StringData string, @Nullable StringData delimiter) { +try { +if (string == null || delimiter == null) { +return null; +} +if (delimiter.toString().isEmpty()) { +List<StringData> res = new ArrayList<>(); +for (int i = 0; i < string.toString().length(); ++i) { Review Comment: You can convert the for loop into a foreach loop for better readability ``` String str = string.toString(); for (char c : str.toCharArray()) { res.add(StringData.fromString(String.valueOf(c))); } ```
Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]
mohitjain2504 commented on code in PR #24365: URL: https://github.com/apache/flink/pull/24365#discussion_r1607597612 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.binary.BinaryStringDataUtil; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */ +@Internal +public class SplitFunction extends BuiltInScalarFunction { +public SplitFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.SPLIT, context); +} + +public @Nullable ArrayData eval(@Nullable StringData string, @Nullable StringData delimiter) { +try { +if (string == null || delimiter == null) { +return null; +} +if (delimiter.toString().isEmpty()) { +List<StringData> res = new ArrayList<>(); +for (int i = 0; i < string.toString().length(); ++i) { + res.add(StringData.fromString(String.valueOf(string.toString().charAt(i)))); +} +return new GenericArrayData(res.toArray()); +} +BinaryStringData[] binaryStringData = Review Comment: Just a thought: do we need a try-catch block over the whole function, or would wrapping only the statement below be enough? If we don't expect any exception in the lines above, we can keep those statements outside the try-catch block.
Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]
mohitjain2504 commented on code in PR #24365: URL: https://github.com/apache/flink/pull/24365#discussion_r1607592570 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.binary.BinaryStringDataUtil; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */ +@Internal +public class SplitFunction extends BuiltInScalarFunction { +public SplitFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.SPLIT, context); +} + +public @Nullable ArrayData eval(@Nullable StringData string, @Nullable StringData delimiter) { +try { +if (string == null || delimiter == null) { +return null; +} +if (delimiter.toString().isEmpty()) { +List<StringData> res = new ArrayList<>(); Review Comment: The list can be initialised directly with capacity str.length(): ``` String str = string.toString(); List<StringData> res = new ArrayList<>(str.length()); ``` This will also help us reduce redundant calls to `string.toString()`.
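Pulling the review suggestions above together (preallocating the list, a single `toString()` call, a foreach over the characters), the empty-delimiter branch of the SPLIT function can be sketched as follows. This is a simplified stand-alone version using plain `String` instead of Flink's `StringData`, just to show the suggested shape, not the actual `SplitFunction` code:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch of SPLIT's empty-delimiter branch, per the review suggestions. */
class SplitSketch {
    static List<String> splitByEmptyDelimiter(String input) {
        // Call toString()-equivalent once and preallocate to the known size.
        String str = input;
        List<String> res = new ArrayList<>(str.length());
        // Foreach over the characters instead of an index-based loop.
        for (char c : str.toCharArray()) {
            res.add(String.valueOf(c));
        }
        return res;
    }
}
```

With an empty delimiter every character becomes its own element, so `splitByEmptyDelimiter("abc")` yields `["a", "b", "c"]`.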
Re: [PR] [FLINK-35037][table-planner]Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER. [flink]
libenchao commented on code in PR #24638: URL: https://github.com/apache/flink/pull/24638#discussion_r1607579249 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala: ## @@ -188,7 +188,7 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase { rank => assertEquals(toBitSet(Array(0)), mq.getUpsertKeys(rank).toSet) } -Array(logicalRowNumber, flinkLogicalRowNumber, streamRowNumber) +Array(logicalWindow, logicalRowNumber, flinkLogicalRowNumber, streamRowNumber) Review Comment: I'm wondering why the existing tests seem to already have some ability to infer `row_number` as part of a unique key? ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala: ## @@ -186,28 +186,34 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { } def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { -getUpsertKeysOnOver(rel, mq, rel.groups.map(_.keys): _*) +getUpsertKeysOnOver(rel, mq) } def getUpsertKeys( rel: BatchPhysicalOverAggregate, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { -getUpsertKeysOnOver(rel, mq, ImmutableBitSet.of(rel.partitionKeyIndices: _*)) +getUpsertKeysOnOver(rel, mq) } def getUpsertKeys( rel: StreamPhysicalOverAggregate, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { -getUpsertKeysOnOver(rel, mq, rel.logicWindow.groups.map(_.keys): _*) +getUpsertKeysOnOver(rel, mq) } private def getUpsertKeysOnOver( - rel: SingleRel, - mq: RelMetadataQuery, - distributionKeys: ImmutableBitSet*): JSet[ImmutableBitSet] = { -var inputKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput) -for (distributionKey <- distributionKeys) { + window: SingleRel, + mq: RelMetadataQuery): JSet[ImmutableBitSet] = { +var (groups, aggStartPos) = FlinkRelMdUniqueKeys.INSTANCE.getGroupsAndStartPos(window) +var inputKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(window.getInput) +for (group <- groups) { + val distributionKey = group.keys inputKeys = filterKeys(inputKeys, distributionKey) + FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, aggStartPos) match { +case Some(upsertKeys) => inputKeys.addAll(upsertKeys) +case _ => + } + aggStartPos = aggStartPos + group.aggCalls.length Review Comment: Filtering the input keys and adding the keys inferred via 'partition key and row_number' are orthogonal; can we just use two separate for loops, which would be clearer? ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala: ## @@ -439,6 +439,18 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata mq: RelMetadataQuery, columns: ImmutableBitSet, ignoreNulls: Boolean): JBoolean = { +var (groups, aggStartPos) = FlinkRelMdUniqueKeys.INSTANCE.getGroupsAndStartPos(overAgg) +for (group <- groups) { + FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, aggStartPos) match { +case Some(upsertKeys) => Review Comment: the name should be `uniqueKeys`? ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala: ## @@ -455,11 +455,56 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu getUniqueKeysOfOverAgg(rel, mq, ignoreNulls) } + def getUniqueKeysOfWindowGroup( + group: Window.Group, + startPos: Int): Option[JSet[ImmutableBitSet]] = { +val retSet = new JHashSet[ImmutableBitSet] +val aggCalls = group.aggCalls +for ((aggCall, offset) <- aggCalls.zipWithIndex) { + // If it's a ROW_NUMBER window, then the unique keys are the partition key plus the row number. + if (aggCall.getOperator.equals(SqlStdOperatorTable.ROW_NUMBER)) { +val rowNumberColumnIndex = startPos + offset +retSet.add(group.keys.union(ImmutableBitSet.of(rowNumberColumnIndex))) + } +} +if (retSet.isEmpty) { + None +} else { + Some(retSet) +} + } + + def getGroupsAndStartPos(window: SingleRel): Tuple2[JList[Window.Group], Int] = { +val groups: JList[Window.Group] = window match { + case window: Window => window.groups + case streamOverAggregate: StreamPhysicalOverAggregate => +streamOverAggregate.logicWindow.groups + case batchOverAggregate: BatchPhysicalOverAggregate => batchOverAggregate.windowGroups + case _ => throw new IllegalArgumentException("Illegal window type.") +} +val aggCounts = groups.map(_.aggCalls.length).sum +val aggStartIndex = window.getRowType.getFieldCount - aggCounts +
Re: [PR] [hotfix][ForStStateBackend] Create new copy for each SerializedCompositeKeyBuilder in ForStStateBackend [flink]
zoltar9264 commented on PR #24816: URL: https://github.com/apache/flink/pull/24816#issuecomment-2121678039 Please take a look, @masteryhx.
[PR] [hotfix][ForStStateBackend] Create new copy for each SerializedCompositeKeyBuilder in ForStStateBackend [flink]
zoltar9264 opened a new pull request, #24816: URL: https://github.com/apache/flink/pull/24816 ## What is the purpose of the change This is an internal hotfix within the development of ForStStateBackend (see: FLIP-427). ## Brief change log - As titled, create a new copy for each SerializedCompositeKeyBuilder in ForStStateBackend. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
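The motivation for a per-use copy is that a key builder holds mutable serialization state, so sharing one instance across concurrent callers can interleave their writes. The sketch below illustrates the hazard and the fix with a trivial stand-in class; it is not Flink's real `SerializedCompositeKeyBuilder` (which serializes key group, key, and namespace into bytes), just the shared-mutable-state pattern:

```java
/** Sketch: a stateful key builder that must not be shared across concurrent users. */
class CompositeKeyBuilderSketch {
    // Mutable internal buffer: interleaved setKey/build calls on a shared
    // instance would mix up the keys of different callers.
    private final StringBuilder buf = new StringBuilder();

    CompositeKeyBuilderSketch setKey(String key) {
        buf.setLength(0);
        buf.append(key);
        return this;
    }

    String build(String namespace) {
        return buf + "#" + namespace;
    }

    /** Hand each caller a fresh copy so internal state is never shared. */
    CompositeKeyBuilderSketch copy() {
        return new CompositeKeyBuilderSketch();
    }
}
```

With `copy()`, two callers can build keys independently: each copy owns its own buffer, so neither overwrites the other's partially built key.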
[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848037#comment-17848037 ] Gang Yang commented on FLINK-35296: --- Hi [~ruanhang1993], below is a screenshot of the thread dump !image-2024-05-21-11-16-01-719.png! The parallelism of my job is 2: !image-2024-05-21-11-17-13-441.png! > Flink mysql-cdc connector stops reading data > > > Key: FLINK-35296 > URL: https://issues.apache.org/jira/browse/FLINK-35296 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Gang Yang >Priority: Major > Fix For: cdc-3.2.0 > > Attachments: image-2024-05-06-17-42-19-059.png, > image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, > image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, > image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, > image-2024-05-21-10-53-54-946.png, image-2024-05-21-11-16-01-719.png, > image-2024-05-21-11-17-13-441.png > > > *Background:* > Consume sub-database and sub-table data through regular expressions, > scan.startup.mode=initial > *Problems:* > 1. The task occurs during the snapshot data synchronization phase; > 2. After the task runs normally for a period of time, no more data will be > read. In fact, there is still a lot of data in the upstream Mysql table; > 3. When the task is restarted from the state, it will read normally for a > period of time and then stop reading.
Re: [PR] [FLINK-35167][cdc-connector] Introduce MaxCompute pipeline DataSink [flink-cdc]
dingxin-tech commented on PR #3254: URL: https://github.com/apache/flink-cdc/pull/3254#issuecomment-2121641714 > I am wondering how a commercial database sink like MaxCompute can do e2e tests? I will soon be working on creating a Docker image for a `MaxCompute Emulator` that launches a mocked version of MaxCompute. This will allow end-to-end testing to be performed by initializing this image prior to regression testing.
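The emulator image described above is still planned, but the general e2e pattern it enables — boot a local stand-in service before the test, point the system under test at its address, then tear it down — can be sketched with a plain in-process HTTP stub from the JDK. Everything here (the endpoint path, the "pong" payload) is hypothetical illustration, not part of MaxCompute or the connector:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;

/** Sketch of the emulator-based e2e pattern: start a local stand-in, test against it, stop it. */
class EmulatorPatternSketch {
    static String pingEmulator() {
        try {
            // 1. Start the "emulator" on an ephemeral port before the test.
            HttpServer emulator = HttpServer.create(new InetSocketAddress(0), 0);
            emulator.createContext("/ping", exchange -> {
                byte[] body = "pong".getBytes();
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(body);
                }
            });
            emulator.start();
            try {
                // 2. The test talks to the emulator exactly as it would to the real service.
                URL url = new URL("http://127.0.0.1:" + emulator.getAddress().getPort() + "/ping");
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                try (InputStream in = conn.getInputStream()) {
                    return new String(in.readAllBytes());
                }
            } finally {
                // 3. Tear the emulator down after the test.
                emulator.stop(0);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

A Dockerized emulator follows the same lifecycle, just with the stand-in running in a container started before the regression suite instead of in-process.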
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-11-17-13-441.png > Flink mysql-cdc connector stops reading data > > > Key: FLINK-35296 > URL: https://issues.apache.org/jira/browse/FLINK-35296 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Gang Yang >Priority: Major > Fix For: cdc-3.2.0 > > Attachments: image-2024-05-06-17-42-19-059.png, > image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, > image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, > image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, > image-2024-05-21-10-53-54-946.png, image-2024-05-21-11-16-01-719.png, > image-2024-05-21-11-17-13-441.png > > > *Background:* > Consume sub-database and sub-table data through regular expressions, > scan.startup.mode=initial > *Problems:* > 1. The task occurs during the snapshot data synchronization phase; > 2. After the task runs normally for a period of time, no more data will be > read. In fact, there is still a lot of data in the upstream Mysql table; > 3. When the task is restarted from the state, it will read normally for a > period of time and then stop reading.
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-11-16-01-719.png > Flink mysql-cdc connector stops reading data > > > Key: FLINK-35296 > URL: https://issues.apache.org/jira/browse/FLINK-35296 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Gang Yang >Priority: Major > Fix For: cdc-3.2.0 > > Attachments: image-2024-05-06-17-42-19-059.png, > image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, > image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, > image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, > image-2024-05-21-10-53-54-946.png, image-2024-05-21-11-16-01-719.png > > > *Background:* > Consume sub-database and sub-table data through regular expressions, > scan.startup.mode=initial > *Problems:* > 1. The task occurs during the snapshot data synchronization phase; > 2. After the task runs normally for a period of time, no more data will be > read. In fact, there is still a lot of data in the upstream Mysql table; > 3. When the task is restarted from the state, it will read normally for a > period of time and then stop reading.
[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848033#comment-17848033 ] Gang Yang commented on FLINK-35296: --- Hi [~ruanhang1993], the following are screenshots of my customized logs: com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#pollSplitRecords !image-2024-05-21-10-51-17-196.png! !image-2024-05-21-10-53-54-946.png! com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#fetch !image-2024-05-21-10-52-02-530.png! !image-2024-05-21-10-53-05-634.png!
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-10-53-54-946.png
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-10-53-05-634.png
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-10-52-52-827.png
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-10-52-02-530.png
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-10-51-17-196.png
[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gang Yang updated FLINK-35296: -- Attachment: image-2024-05-21-10-51-08-452.png
[jira] [Comment Edited] (FLINK-35382) ChangelogCompatibilityITCase.testRestore fails with an NPE
[ https://issues.apache.org/jira/browse/FLINK-35382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848030#comment-17848030 ] Yanfei Lei edited comment on FLINK-35382 at 5/21/24 2:35 AM: - Merged into master via 26b149a. Let's observe for a while and then close it. was (Author: yanfei lei): Merged into master via 26b149a. > ChangelogCompatibilityITCase.testRestore fails with an NPE > -- > > Key: FLINK-35382 > URL: https://issues.apache.org/jira/browse/FLINK-35382 > Project: Flink > Issue Type: Bug >Affects Versions: 1.20.0 >Reporter: Ryan Skraba >Assignee: Jinzhong Li >Priority: Critical > Labels: pull-request-available, test-stability > > * 1.20 Java 8 / Test (module: tests) > https://github.com/apache/flink/actions/runs/9110398985/job/25045798401#step:10:8192 > It looks like there can be a [NullPointerException at this > line|https://github.com/apache/flink/blob/9a5a99b1a30054268bbde36d565cbb1b81018890/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java#L666] > causing a test failure: > {code} > Error: 10:36:23 10:36:23.312 [ERROR] Tests run: 9, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 19.31 s <<< FAILURE! -- in > org.apache.flink.test.state.ChangelogCompatibilityITCase > Error: 10:36:23 10:36:23.313 [ERROR] > org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore[startWithChangelog=false, > restoreWithChangelog=true, restoreFrom=CHECKPOINT, allowStore=true, > allowRestore=true] -- Time elapsed: 1.492 s <<< ERROR! 
> May 16 10:36:23 java.lang.RuntimeException: > org.opentest4j.AssertionFailedError: Graph is in globally terminal state > (FAILED) > May 16 10:36:23 at > org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:204) > May 16 10:36:23 at > org.apache.flink.test.state.ChangelogCompatibilityITCase.restoreAndValidate(ChangelogCompatibilityITCase.java:190) > May 16 10:36:23 at java.util.Optional.ifPresent(Optional.java:159) > May 16 10:36:23 at > org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore(ChangelogCompatibilityITCase.java:118) > May 16 10:36:23 at java.lang.reflect.Method.invoke(Method.java:498) > May 16 10:36:23 Caused by: org.opentest4j.AssertionFailedError: Graph is in > globally terminal state (FAILED) > May 16 10:36:23 at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:42) > May 16 10:36:23 at > org.junit.jupiter.api.Assertions.fail(Assertions.java:150) > May 16 10:36:23 at > org.apache.flink.runtime.testutils.CommonTestUtils.lambda$waitForAllTaskRunning$3(CommonTestUtils.java:214) > May 16 10:36:23 at > org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151) > May 16 10:36:23 at > org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145) > May 16 10:36:23 at > org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:209) > May 16 10:36:23 at > org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:182) > May 16 10:36:23 at > org.apache.flink.test.state.ChangelogCompatibilityITCase.submit(ChangelogCompatibilityITCase.java:284) > May 16 10:36:23 at > org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:197) > May 16 10:36:23 ... 
4 more > May 16 10:36:23 Caused by: org.apache.flink.runtime.JobException: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > May 16 10:36:23 at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219) > May 16 10:36:23 at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166) > May 16 10:36:23 at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121) > May 16 10:36:23 at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:279) > May 16 10:36:23 at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:270) > May 16 10:36:23 at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:263) > May 16 10:36:23 at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:788) > May 16 10:36:23 at >
[jira] [Resolved] (FLINK-35382) ChangelogCompatibilityITCase.testRestore fails with an NPE
[ https://issues.apache.org/jira/browse/FLINK-35382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei resolved FLINK-35382. Assignee: Jinzhong Li Resolution: Fixed
Re: [PR] [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f… [flink]
wForget commented on PR #22009: URL: https://github.com/apache/flink/pull/22009#issuecomment-2121596615 I've tried specifying the proxy user via `HADOOP_PROXY_USER`, maintaining the delegation tokens with an external service, and sending them to the JobManager, but I can't seem to update the TaskManagers' delegation tokens. Flink appears to use `DefaultDelegationTokenManager#listener` to propagate tokens to the TaskManagers. Could we allow providing a custom `DelegationTokenManager`? @venkata91 @gaborgsomogyi Could you please take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35382) ChangelogCompatibilityITCase.testRestore fails with an NPE
[ https://issues.apache.org/jira/browse/FLINK-35382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848030#comment-17848030 ] Yanfei Lei commented on FLINK-35382: Merged into master via 26b149a.
Re: [PR] [FLINK-35382][test] Disable snapshot-file-merging in ChangelogCompabilityITCase [flink]
fredia merged PR #24813: URL: https://github.com/apache/flink/pull/24813
Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]
gong commented on PR #3286: URL: https://github.com/apache/flink-cdc/pull/3286#issuecomment-2121589464 LGTM
[jira] [Comment Edited] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848028#comment-17848028 ] Gang Yang edited comment on FLINK-35296 at 5/21/24 2:19 AM: Hi [~ruanhang1993], at present I find it is stuck in com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close, mainly here: if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } was (Author: 清月): At present, I find it is stuck in com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close, mainly here: if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); }
[jira] [Comment Edited] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848028#comment-17848028 ] Gang Yang edited comment on FLINK-35296 at 5/21/24 2:18 AM: At present, I find it is stuck in com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close, mainly here: if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } was (Author: 清月): !https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5c76d9b669a1468175eb12bc271ba88b9ac1d964c3b7cbddd8fd9558551b5fe3aec177c308ebd5304aa134dd402f30bcfbd0496da3fc6ae06de22bfffe0648d0c6f0fb69a5e40eefc547fcc4b7fabaaa44fb4c8ed7016461c?tmpCode=a51ae03b-b271-4e26-82eb-741ca6a184e9! At present, I find it is stuck in com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close
[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data
[ https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848028#comment-17848028 ] Gang Yang commented on FLINK-35296: --- !https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5c76d9b669a1468175eb12bc271ba88b9ac1d964c3b7cbddd8fd9558551b5fe3aec177c308ebd5304aa134dd402f30bcfbd0496da3fc6ae06de22bfffe0648d0c6f0fb69a5e40eefc547fcc4b7fabaaa44fb4c8ed7016461c?tmpCode=a51ae03b-b271-4e26-82eb-741ca6a184e9! At present, I find it is stuck in com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close
[PR] [hotfix][cdc][docs] Supplementary answer to the Mysql CDC Q15 question [flink-cdc]
PONYLEE opened a new pull request, #3337: URL: https://github.com/apache/flink-cdc/pull/3337 Supplementary answer to the [Mysql CDC Q15] question
Re: [PR] [FLINK-35379][Checkpoint] Fix incorrect checkpoint notification handling in file merging [flink]
Zakelly merged PR #24806: URL: https://github.com/apache/flink/pull/24806
Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]
spoon-lz commented on PR #24374: URL: https://github.com/apache/flink/pull/24374#issuecomment-2121565086 @Zakelly New code has been submitted.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on PR #24754: URL: https://github.com/apache/flink/pull/24754#issuecomment-2121563878 @flinkbot run azure
Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is closed [flink]
masteryhx commented on code in PR #24768: URL: https://github.com/apache/flink/pull/24768#discussion_r1607490839 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java: ## @@ -147,15 +160,30 @@ public S createState(@Nonnull StateDescriptor stateDes @Override @Nonnull public StateExecutor createStateExecutor() { -// TODO: Make io parallelism configurable -return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +synchronized (lock) { Review Comment: Thanks for the update. It makes sense to me.
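The review above concerns guarding `StateExecutor` creation with a lock so the backend's `close()` can shut down every executor it handed out. A minimal sketch of that lifecycle pattern, with simplified stand-in types (`ExecutorLifecycle`, `SimpleExecutor` are illustrative names, not Flink's actual classes):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the pattern discussed in the review: executor creation is guarded
// by a lock so that close() can shut down every executor ever handed out, and
// no new executor can be created after the backend is closed.
class ExecutorLifecycle {
    interface StateExecutor {
        void shutdown();
        boolean isShutdown();
    }

    static class SimpleExecutor implements StateExecutor {
        private volatile boolean shutdown = false;
        public void shutdown() { shutdown = true; }
        public boolean isShutdown() { return shutdown; }
    }

    private final Object lock = new Object();
    private final List<StateExecutor> executors = new ArrayList<>();
    private boolean closed = false;

    StateExecutor createStateExecutor() {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("backend already closed");
            }
            StateExecutor executor = new SimpleExecutor();
            executors.add(executor); // remember it so close() can reach it
            return executor;
        }
    }

    void close() {
        synchronized (lock) {
            closed = true;
            for (StateExecutor e : executors) {
                e.shutdown();
            }
            executors.clear();
        }
    }
}
```

Holding the same lock in both methods is what prevents the race the thread discusses: an executor created concurrently with `close()` is either rejected or included in the shutdown loop.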
[PR] [FLINK-35354] Support host mapping in Flink tikv cdc [flink-cdc]
Mrart opened a new pull request, #3336: URL: https://github.com/apache/flink-cdc/pull/3336 In tidb production environment deployments, there are usually two kinds of network: internal network and public network. When we use pd mode in tikv, we need to do network mapping, such as `spark.tispark.host_mapping` in https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I think we need to support `host_mapping` in our Flink tikv cdc connector.
[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc
[ https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35354: --- Labels: pull-request-available (was: ) > Support host mapping in Flink tikv cdc > -- > > Key: FLINK-35354 > URL: https://issues.apache.org/jira/browse/FLINK-35354 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0, cdc-3.2.0 >Reporter: ouyangwulin >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0 > > > In tidb production environment deployments, there are usually two kinds of > network: internal network and public network. When we use pd mode in tikv, we > need to do network mapping, such as `spark.tispark.host_mapping` in > [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I > think we need to support `host_mapping` in our Flink tikv cdc connector. > > Add param: > tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9
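The proposed parameter encodes internal-to-public address pairs as `inner:outer` entries separated by `;` (e.g. `tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9`). An illustrative sketch of parsing such a value into a lookup table — not the actual connector code, class and method names are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser for a `tikv.host_mapping` value of the form
// "inner1:outer1;inner2:outer2", producing a table used to rewrite
// PD/TiKV addresses from the internal network to the public one.
class HostMappingParser {
    static Map<String, String> parse(String spec) {
        Map<String, String> mapping = new HashMap<>();
        if (spec == null || spec.isEmpty()) {
            return mapping;
        }
        for (String pair : spec.split(";")) {
            String[] kv = pair.split(":", 2); // internal address -> public address
            if (kv.length == 2) {
                mapping.put(kv[0].trim(), kv[1].trim());
            }
        }
        return mapping;
    }

    // Rewrite a host if a mapping exists; otherwise keep it unchanged.
    static String mapHost(Map<String, String> mapping, String host) {
        return mapping.getOrDefault(host, host);
    }
}
```

Hosts without an entry fall through unchanged, so a partial mapping only rewrites the addresses that actually need it.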
[jira] [Comment Edited] (FLINK-25537) [JUnit5 Migration] Module: flink-core
[ https://issues.apache.org/jira/browse/FLINK-25537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845561#comment-17845561 ] Rui Fan edited comment on FLINK-25537 at 5/21/24 1:29 AM: -- Merged to master(1.20.0) via: * 5af9acba5b9fbcdf9aadf62310cd337d508158c3 * e5398e1025ec4312bac74a8b32b98d03cb254667 was (Author: fanrui): Merged to master(1.20.0) via: 5af9acba5b9fbcdf9aadf62310cd337d508158c3 > [JUnit5 Migration] Module: flink-core > - > > Key: FLINK-25537 > URL: https://issues.apache.org/jira/browse/FLINK-25537 > Project: Flink > Issue Type: Sub-task > Components: API / Core >Reporter: Qingsheng Ren >Assignee: Aiden Gong >Priority: Minor > Labels: pull-request-available, stale-assigned > Fix For: 1.20.0 >
Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: types [flink]
1996fanrui merged PR #24613: URL: https://github.com/apache/flink/pull/24613
Re: [PR] [FLINK-35359][config] General Improvement to Configuration for Flink 2.0 [flink]
flinkbot commented on PR #24815: URL: https://github.com/apache/flink/pull/24815#issuecomment-2121526672 ## CI report: * 3fc215385500d8eb69d85c860e6e936ab103eae4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35359: --- Labels: pull-request-available (was: ) > General Improvement to Configuration for Flink 2.0 > -- > > Key: FLINK-35359 > URL: https://issues.apache.org/jira/browse/FLINK-35359 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Reporter: Xuannan Su >Priority: Major > Labels: pull-request-available > > As Flink moves toward version 2.0, we want to provide users with a better > experience with the existing configuration. In this FLIP, we outline several > general improvements to the current configuration: > * Ensure all the ConfigOptions are properly annotated > * Ensure all user-facing configurations are included in the documentation > generation process > * Make the existing ConfigOptions use the proper type > * Mark all internally used ConfigOptions with the @Internal annotation > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0 >
Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: types [flink]
1996fanrui commented on code in PR #24613: URL: https://github.com/apache/flink/pull/24613#discussion_r1598105369 ## flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java: ## @@ -51,24 +49,24 @@ public class BasicTypeInfoTest extends TestLogger { }; @Test -public void testBasicTypeInfoEquality() { +void testBasicTypeInfoEquality() { for (Class clazz : classes) { BasicTypeInfo tpeInfo1 = BasicTypeInfo.getInfoFor(clazz); BasicTypeInfo tpeInfo2 = BasicTypeInfo.getInfoFor(clazz); -assertEquals(tpeInfo1, tpeInfo2); -assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode()); +assertThat(tpeInfo2).isEqualTo(tpeInfo1); +assertThat(tpeInfo2.hashCode()).isEqualTo(tpeInfo1.hashCode()); Review Comment: hasSameHashCodeAs
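The test being migrated checks the equals/hashCode contract: two `BasicTypeInfo` instances obtained for the same class must be equal and agree on `hashCode` (which is what AssertJ's `hasSameHashCodeAs` asserts in one call). A simplified stand-in, since `BasicTypeInfo` and AssertJ are not available here:

```java
// Simplified stand-in for the migrated test: two descriptors for the same
// class must be equal and must agree on hashCode, mirroring what
// assertThat(a).hasSameHashCodeAs(b) verifies in AssertJ.
class TypeInfoContract {
    static final class TypeInfo {
        private final Class<?> clazz;
        TypeInfo(Class<?> clazz) { this.clazz = clazz; }

        @Override
        public boolean equals(Object o) {
            return o instanceof TypeInfo && ((TypeInfo) o).clazz == clazz;
        }

        @Override
        public int hashCode() { return clazz.hashCode(); }
    }

    static TypeInfo getInfoFor(Class<?> clazz) { return new TypeInfo(clazz); }
}
```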
[PR] [FLINK-35359][config] General Improvement to Configuration for Flink 2.0 [flink]
Sxnan opened a new pull request, #24815: URL: https://github.com/apache/flink/pull/24815 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
Re: [PR] [FLINK-35403] Allow Skipping Invocation of Function Calls While Constant-folding [flink]
flinkbot commented on PR #24814: URL: https://github.com/apache/flink/pull/24814#issuecomment-2121448911 ## CI report: * 78c4501b833f0860ce931ef91c1efaae71d90e09 UNKNOWN
[jira] [Updated] (FLINK-35403) FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding
[ https://issues.apache.org/jira/browse/FLINK-35403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35403: --- Labels: pull-request-available (was: ) > FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding > > > Key: FLINK-35403 > URL: https://issues.apache.org/jira/browse/FLINK-35403 > Project: Flink > Issue Type: Improvement >Reporter: Alan Sheinberg >Assignee: Alan Sheinberg >Priority: Major > Labels: pull-request-available >
[PR] [FLINK-35403] Allow Skipping Invocation of Function Calls While Constant-folding [flink]
AlanConfluent opened a new pull request, #24814: URL: https://github.com/apache/flink/pull/24814 ## What is the purpose of the change This PR adds a new method to `FunctionDefinition`, `supportsConstantFolding`, which indicates whether a function call should have constant expression reduction applied during planning time. For example, `AddOne(10)` could be reduced to `11` by invoking `AddOne` during planning time. This is the default and current Flink behavior. This addition allows overriding the default and returning false, which keeps the expression intact so that it is invoked at runtime instead. This functionality and its motivation are covered in more detail in [FLIP 452](https://cwiki.apache.org/confluence/display/FLINK/FLIP-452%3A+Allow+Skipping+Invocation+of+Function+Calls+While+Constant-folding). ## Verifying this change - Added new test cases to `ExpressionReductionRulesTest`. - These test that existing plans are unchanged - New tests show that disabling constant folding also keeps plans without reducing away the invocation ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - It shouldn't affect default behavior, which is to constant fold - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs)
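The decision the PR describes can be sketched in miniature: a planner pre-evaluates a call on constant arguments only when the function reports that it supports constant folding; otherwise the invocation stays in the plan for runtime. All names below are illustrative, not Flink's actual planner classes:

```java
import java.util.function.IntUnaryOperator;

// Minimal sketch of the described behavior: a call is pre-evaluated at
// "planning time" only when its function reports supportsConstantFolding().
class ConstantFolding {
    static final class Func {
        final IntUnaryOperator impl;
        final boolean foldable;
        Func(IntUnaryOperator impl, boolean foldable) {
            this.impl = impl;
            this.foldable = foldable;
        }
        boolean supportsConstantFolding() { return foldable; }
    }

    /** Returns either the folded constant or the original call kept for runtime. */
    static String reduce(String name, Func f, int constantArg) {
        if (f.supportsConstantFolding()) {
            return String.valueOf(f.impl.applyAsInt(constantArg)); // e.g. AddOne(10) -> 11
        }
        return name + "(" + constantArg + ")"; // invocation preserved in the plan
    }
}
```

Returning false is useful for functions whose result must not be baked into the plan, e.g. calls with side effects or values that change between planning and execution.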
Re: [PR] [FLINK-35404] Report all metrics of KafkaConsumer in new Kafka source [flink-connector-kafka]
boring-cyborg[bot] commented on PR #103: URL: https://github.com/apache/flink-connector-kafka/pull/103#issuecomment-2121252403 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
[PR] [FLINK-35404] Report all metrics of KafkaConsumer in new Kafka source [flink-connector-kafka]
hmit opened a new pull request, #103: URL: https://github.com/apache/flink-connector-kafka/pull/103 (no comment)
[jira] [Comment Edited] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source
[ https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847993#comment-17847993 ] Harshit Mittal edited comment on FLINK-35404 at 5/20/24 8:55 PM: - A better way to solve this problem is to implement Kafka's `MetricsReporter` interface using a `MetricGroup` from Apache Flink. was (Author: hmittal83): A better way to solve this problem is to implement KafkaMetrics reporter interface using a MetricGroup from apache-flink. > Report metrics of KafkaConsumer in new Kafka source > --- > > Key: FLINK-35404 > URL: https://issues.apache.org/jira/browse/FLINK-35404 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.13.0 >Reporter: Harshit Mittal >Assignee: Qingsheng Ren >Priority: Major > Labels: pull-request-available > > Currently new Kafka source only registers metrics that are available on > KafkaPartitionSplitReader initialization. However, there are metrics added > later in the lifecycle like consumer lag metrics that are missing from the > KafkaConsumer metric group. >
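The idea in the comment above is that a reporter registered with the consumer gets a callback for every metric the client adds, including ones that appear late in the lifecycle (like consumer lag), so each can be mirrored into a Flink metric group. A heavily simplified sketch with stand-in interfaces — these are not the real Kafka `MetricsReporter` or Flink `MetricGroup` APIs:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative bridge: a reporter is notified of every metric the consumer
// registers -- including late-arriving ones -- and mirrors each into a group.
class MetricBridge {
    interface MetricGroup {
        void gauge(String name, Supplier<Double> value);
    }

    static class RecordingGroup implements MetricGroup {
        final Map<String, Supplier<Double>> gauges = new HashMap<>();
        public void gauge(String name, Supplier<Double> value) {
            gauges.put(name, value);
        }
    }

    /** Mimics the reporter callback shape: metricChange fires for late metrics too. */
    static class FlinkReporter {
        private final MetricGroup group;
        FlinkReporter(MetricGroup group) { this.group = group; }

        void metricChange(String metricName, Supplier<Double> metricValue) {
            group.gauge(metricName, metricValue); // register/overwrite on every change
        }
    }
}
```

Because registration happens in the callback rather than once at reader initialization, metrics created after startup are not missed.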
[jira] [Updated] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source
[ https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Harshit Mittal updated FLINK-35404: --- Description: Currently new Kafka source only registers metrics that are available on KafkaPartitionSplitReader initialization. However, there are metrics added later in the lifecycle like consumer lag metrics that are missing from the KafkaConsumer metric group. was:Currently Kafka new source doesn't register metrics of KafkaConsumer in KafkaPartitionSplitReader. These metrics should be added for debugging and monitoring purpose. > Report metrics of KafkaConsumer in new Kafka source > --- > > Key: FLINK-35404 > URL: https://issues.apache.org/jira/browse/FLINK-35404 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.13.0 >Reporter: Harshit Mittal >Assignee: Qingsheng Ren >Priority: Major > Labels: pull-request-available > > Currently new Kafka source only registers metrics that are available on > KafkaPartitionSplitReader initialization. However, there are metrics added > later in the lifecycle like consumer lag metrics that are missing from the > KafkaConsumer metric group. >
[jira] [Commented] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source
[ https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847993#comment-17847993 ] Harshit Mittal commented on FLINK-35404: A better way to solve this problem is to implement KafkaMetrics reporter interface using a MetricGroup from apache-flink. > Report metrics of KafkaConsumer in new Kafka source > --- > > Key: FLINK-35404 > URL: https://issues.apache.org/jira/browse/FLINK-35404 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.13.0 >Reporter: Harshit Mittal >Assignee: Qingsheng Ren >Priority: Major > Labels: pull-request-available > > Currently new Kafka source only registers metrics that are available on > KafkaPartitionSplitReader initialization. However, there are metrics added > later in the lifecycle like consumer lag metrics that are missing from the > KafkaConsumer metric group. >
[jira] [Updated] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source
[ https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Harshit Mittal updated FLINK-35404: --- Fix Version/s: (was: 1.14.0) (was: 1.13.2) > Report metrics of KafkaConsumer in new Kafka source > --- > > Key: FLINK-35404 > URL: https://issues.apache.org/jira/browse/FLINK-35404 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.13.0 >Reporter: Harshit Mittal >Assignee: Qingsheng Ren >Priority: Major > Labels: pull-request-available > > Currently Kafka new source doesn't register metrics of KafkaConsumer in > KafkaPartitionSplitReader. These metrics should be added for debugging and > monitoring purpose.
[jira] [Created] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source
Harshit Mittal created FLINK-35404: -- Summary: Report metrics of KafkaConsumer in new Kafka source Key: FLINK-35404 URL: https://issues.apache.org/jira/browse/FLINK-35404 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.13.0 Reporter: Harshit Mittal Assignee: Qingsheng Ren Fix For: 1.14.0, 1.13.2 Currently Kafka new source doesn't register metrics of KafkaConsumer in KafkaPartitionSplitReader. These metrics should be added for debugging and monitoring purposes.
[jira] [Created] (FLINK-35403) FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding
Alan Sheinberg created FLINK-35403: -- Summary: FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding Key: FLINK-35403 URL: https://issues.apache.org/jira/browse/FLINK-35403 Project: Flink Issue Type: Improvement Reporter: Alan Sheinberg Assignee: Alan Sheinberg
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on PR #24784: URL: https://github.com/apache/flink/pull/24784#issuecomment-2120974575 Sorry to bother you again, but I made a separate [pull request with the hotfix](https://github.com/apache/flink/pull/24811)
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1607065866 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ## @@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment) stateAssignment.oldState.get(stateAssignment.inputOperatorID); final List> inputOperatorState = splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState); -if (inputState.getParallelism() == executionJobVertex.getParallelism()) { + +boolean noNeedRescale = + stateAssignment.executionJobVertex.getJobVertex().getInputs().stream() +.map(JobEdge::getDownstreamSubtaskStateMapper) +.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL)) +&& stateAssignment.executionJobVertex.getInputs().stream() +.map(IntermediateResult::getProducer) +.map(vertexAssignments::get) +.anyMatch( +taskStateAssignment -> { +final int oldParallelism = +stateAssignment +.oldState + .get(stateAssignment.inputOperatorID) +.getParallelism(); +return oldParallelism +== taskStateAssignment.executionJobVertex +.getParallelism(); +}); Review Comment: Thanks for the analysis. I think you can add a test with incomplete FULL, you are right. I hope I didn't make a mistake with the error note last time.
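The condition being debated in this thread boils down to a predicate: re-distribution of input-channel state can be skipped only when the vertex keeps its old parallelism and no input edge relies on the `FULL` subtask-state mapper. A simplified sketch of that predicate — not the exact Flink logic, which operates on `JobEdge`/`TaskStateAssignment` objects:

```java
import java.util.List;

// Simplified form of the condition under review: skipping channel-state
// re-distribution is safe only when parallelism is unchanged AND none of the
// input edges uses the FULL mapper (which broadcasts old state to every subtask).
class RescaleCheck {
    enum Mapper { FULL, RANGE, ROUND_ROBIN }

    static boolean noNeedRescale(List<Mapper> inputMappers, int oldParallelism, int newParallelism) {
        boolean sameParallelism = oldParallelism == newParallelism;
        boolean anyFull = inputMappers.stream().anyMatch(m -> m == Mapper.FULL);
        return sameParallelism && !anyFull;
    }
}
```

Note the review's concern maps onto this shape: using `anyMatch` where `allMatch`/`noneMatch` semantics are intended changes which mixed-mapper cases are treated as "no rescale needed".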
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1607061596 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; + +import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment; +import static org.junit.Assert.fail; + +/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */ +public class UnalignedCheckpointCustomRescaleITCase { Review Comment: A tricky way out of the situation, but quite feasible. Thank you
[jira] [Updated] (FLINK-35402) [flink-operator][Deployment] add labels to metadata
[ https://issues.apache.org/jira/browse/FLINK-35402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luís Costa updated FLINK-35402: --- Description: Greetings dear team, I would like to add labels to flink-operator Deployment metadata - https://github.com/apache/flink-kubernetes-operator/pull/829 Best regards, Luís Costa was: Greetings dear team, I would like to add labels to flink-operator Deployment metadata > [flink-operator][Deployment] add labels to metadata > --- > > Key: FLINK-35402 > URL: https://issues.apache.org/jira/browse/FLINK-35402 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: Luís Costa >Priority: Minor > Labels: pull-request-available > > Greetings dear team, > I would like to add labels to flink-operator Deployment metadata - > https://github.com/apache/flink-kubernetes-operator/pull/829 > Best regards, > Luís Costa
[PR] FLINK-35402 [flink-operator][Deployment] add labels to metadata [flink-kubernetes-operator]
luismacosta opened a new pull request, #829: URL: https://github.com/apache/flink-kubernetes-operator/pull/829 FLINK-35402 [flink-operator][Deployment] add labels to metadata - https://issues.apache.org/jira/browse/FLINK-35402
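For context on what the PR aims at, a hedged example of a Deployment whose metadata carries extra user-supplied labels; the values key and label names below are hypothetical, not the operator chart's actual schema:

```yaml
# Hypothetical rendered output, assuming a values-driven label map
# (e.g. operatorDeployment.labels in values.yaml) is merged into metadata:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-kubernetes-operator
  labels:
    app.kubernetes.io/name: flink-kubernetes-operator  # chart-provided label
    team: data-platform                                # user-supplied label
```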
[jira] [Updated] (FLINK-35402) [flink-operator][Deployment] add labels to metadata
[ https://issues.apache.org/jira/browse/FLINK-35402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35402: --- Labels: pull-request-available (was: ) > [flink-operator][Deployment] add labels to metadata > --- > > Key: FLINK-35402 > URL: https://issues.apache.org/jira/browse/FLINK-35402 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: Luís Costa >Priority: Minor > Labels: pull-request-available > > Greetings dear team, > I would like to add labels to flink-operator Deployment metadata
[jira] [Created] (FLINK-35402) [flink-operator][Deployment] add labels to metadata
Luís Costa created FLINK-35402: -- Summary: [flink-operator][Deployment] add labels to metadata Key: FLINK-35402 URL: https://issues.apache.org/jira/browse/FLINK-35402 Project: Flink Issue Type: Improvement Components: flink-contrib Reporter: Luís Costa Greetings dear team, I would like to add labels to flink-operator Deployment metadata -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]
hanyuzheng7 commented on code in PR #24365: URL: https://github.com/apache/flink/pull/24365#discussion_r1606922419 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.binary.BinaryStringDataUtil; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. 
*/
+@Internal
+public class SplitFunction extends BuiltInScalarFunction {
+    public SplitFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.SPLIT, context);
+    }
+
+    public @Nullable ArrayData eval(@Nullable StringData string, @Nullable StringData delimiter) {
+        try {
+            if (string == null || delimiter == null) {
+                return null;
+            }
+            if (delimiter.toString().isEmpty()) {
+                List<StringData> res = new ArrayList<>();
+                for (int i = 0; i < string.toString().length(); ++i) {
+                    res.add(StringData.fromString(String.valueOf(string.toString().charAt(i))));
+                }
+                return new GenericArrayData(res.toArray());

Review Comment: handle the empty delimiter first.
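For readers following the review, the requested semantics — treating an empty delimiter as a per-character split, checked before the general case — can be sketched in plain Java. This is an illustrative stand-in only: it uses `String` instead of Flink's `StringData`, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch; Flink's SplitFunction operates on StringData, not String.
public class SplitSketch {

    /** Splits s by delimiter; an empty delimiter yields one element per character. */
    public static List<String> split(String s, String delimiter) {
        if (s == null || delimiter == null) {
            return null; // SQL-style semantics: NULL in, NULL out
        }
        List<String> res = new ArrayList<>();
        if (delimiter.isEmpty()) {
            // Handle the empty delimiter first, as the review comment asks.
            for (int i = 0; i < s.length(); i++) {
                res.add(String.valueOf(s.charAt(i)));
            }
            return res;
        }
        // General case: scan for each delimiter occurrence.
        int from = 0;
        int idx;
        while ((idx = s.indexOf(delimiter, from)) >= 0) {
            res.add(s.substring(from, idx));
            from = idx + delimiter.length();
        }
        res.add(s.substring(from));
        return res;
    }
}
```

For example, `split("a,b,c", ",")` yields three elements, while `split("abc", "")` yields one element per character.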
Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]
hanyuzheng7 commented on PR #24365: URL: https://github.com/apache/flink/pull/24365#issuecomment-2120677719 > > When the delimiter is empty, do we need to handle this situation specially? > > Yes, we do. Do you mind updating the PR? I have already updated the PR.
Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to [flink]
reswqa commented on PR #24741: URL: https://github.com/apache/flink/pull/24741#issuecomment-2120663487 We should include it, since Flink's nightly CI also checks the JDK 17 build.
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1606882746 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; + +import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment; +import static org.junit.Assert.fail; + +/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */ +public class UnalignedCheckpointCustomRescaleITCase { Review Comment: > You could always most likely change the record type for UnalignedCheckpointRescaleITCase. String there should work just as fine, but that's probably more work vs option 1. I've just realised that I don't see any place in the `UnalignedCheckpointRescaleITCase` that hardcodes the type of the record. You can add a new `Topology` (`UnalignedCheckpointRescaleITCase.Topology`) that creates any JobGraph, so you can keep your proposed records format. 
Also I wouldn't mind if you re-used the same `LongSource`, but changed the record type from `Long` to some POJO of `Long` and `String payload`:
```
public static class Record {
    private final long value;
    private final @Nullable String payload;
    (...)
}
```
where the `payload` length/size would be configurable, from 0 (`payload == null`) up to whatever value you configure (`3713`?). This way you won't be duplicating all of the setup code and you will leverage the same
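Filling in the `(...)` from the suggestion above, a completed version of the proposed record type might look like this. It is only a sketch: the two fields come from the snippet, while the constructor, accessors, and the `payloadLength` helper are assumptions added for illustration.

```java
import java.io.Serializable;

// Sketch of the record type suggested in the review; accessor names are assumptions.
public class PayloadRecord implements Serializable {

    private final long value;
    private final String payload; // null when the configured payload size is 0

    public PayloadRecord(long value, String payload) {
        this.value = value;
        this.payload = payload;
    }

    public long getValue() {
        return value;
    }

    public String getPayload() {
        return payload;
    }

    /** Payload size in characters; 0 when there is no payload. */
    public int payloadLength() {
        return payload == null ? 0 : payload.length();
    }
}
```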
[jira] [Updated] (FLINK-35401) Add SQS Table API support
[ https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-35401: Fix Version/s: aws-connector-4.4.0 > Add SQS Table API support > - > > Key: FLINK-35401 > URL: https://issues.apache.org/jira/browse/FLINK-35401 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > Fix For: aws-connector-4.4.0 > > > # Add Table API support for Amazon SQS sink as per > [FLIP-438|https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]
[jira] [Updated] (FLINK-35401) Add SQS Table API support
[ https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-35401: Description: Add Table API support for Amazon SQS (was: This is an umbrella task for FLIP-438. FLIP-438: https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector) > Add SQS Table API support > - > > Key: FLINK-35401 > URL: https://issues.apache.org/jira/browse/FLINK-35401 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > > Add Table API support for Amazon SQS -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35401) Add SQS Table API support
[ https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-35401: Labels: (was: pull-request-available) > Add SQS Table API support > - > > Key: FLINK-35401 > URL: https://issues.apache.org/jira/browse/FLINK-35401 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > > Add Table API support for Amazon SQS -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35401) Add SQS Table API support
[ https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-35401: Description: # Add Table API support for Amazon SQS sink as per [FLIP-481|https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector] (was: # Add Table API support for Amazon SQS sink as per [FLIP-481|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]]) > Add SQS Table API support > - > > Key: FLINK-35401 > URL: https://issues.apache.org/jira/browse/FLINK-35401 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > > # Add Table API support for Amazon SQS sink as per > [FLIP-481|https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35401) Add SQS Table API support
[ https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-35401: Description: # Add Table API support for Amazon SQS sink as per [FLIP-481|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]] (was: Add Table API support for Amazon SQS sink as per FLIP-481) > Add SQS Table API support > - > > Key: FLINK-35401 > URL: https://issues.apache.org/jira/browse/FLINK-35401 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > > # Add Table API support for Amazon SQS sink as per > [FLIP-481|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35401) Add SQS Table API support
[ https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-35401: Description: Add Table API support for Amazon SQS sink as per FLIP-481 (was: Add Table API support for Amazon SQS) > Add SQS Table API support > - > > Key: FLINK-35401 > URL: https://issues.apache.org/jira/browse/FLINK-35401 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > > Add Table API support for Amazon SQS sink as per FLIP-481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35401) Add SQS Table API support
Ahmed Hamdy created FLINK-35401: --- Summary: Add SQS Table API support Key: FLINK-35401 URL: https://issues.apache.org/jira/browse/FLINK-35401 Project: Flink Issue Type: New Feature Components: Connectors / AWS Reporter: Ahmed Hamdy This is an umbrella task for FLIP-438. FLIP-438: https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to [flink]
davidradl commented on PR #24741: URL: https://github.com/apache/flink/pull/24741#issuecomment-2120541110 @reswqa If you have strong views I can include it in this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]
usberkeley commented on PR #24808: URL: https://github.com/apache/flink/pull/24808#issuecomment-2120534284 > Oh, `/doc` means flink documentation, while java doc means the doc/comments on java class/interface/method/field. > > But It doesn't matter, this fix is good enough. Would you mind changing the commit message to `[FLINK-35395][core] Fix the example code of KeyedStateStore#getState`? Yes, it's just a java doc error, thank you. Okay, the commit message has been modified. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]
usberkeley commented on PR #24808: URL: https://github.com/apache/flink/pull/24808#issuecomment-2120526422 > Oh, `/doc` means flink documentation, while java doc means the doc/comments on java class/interface/method/field. > > But It doesn't matter, this fix is good enough. Would you mind changing the commit message to `[FLINK-35395][core] Fix the example code of KeyedStateStore#getState`? I understand, it's just a java doc error, thank you. Okay, the commit message has been modified. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski commented on code in PR #24784: URL: https://github.com/apache/flink/pull/24784#discussion_r1606820986 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ## @@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment) stateAssignment.oldState.get(stateAssignment.inputOperatorID); final List> inputOperatorState = splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState); -if (inputState.getParallelism() == executionJobVertex.getParallelism()) { + +boolean noNeedRescale = + stateAssignment.executionJobVertex.getJobVertex().getInputs().stream() +.map(JobEdge::getDownstreamSubtaskStateMapper) +.anyMatch(m -> !m.equals(SubtaskStateMapper.FULL)) +&& stateAssignment.executionJobVertex.getInputs().stream() +.map(IntermediateResult::getProducer) +.map(vertexAssignments::get) +.anyMatch( +taskStateAssignment -> { +final int oldParallelism = +stateAssignment +.oldState + .get(stateAssignment.inputOperatorID) +.getParallelism(); +return oldParallelism +== taskStateAssignment.executionJobVertex +.getParallelism(); +}); Review Comment: Bumping: > Maybe in StateAssignmentOperationTest create a unit test that has one FULL and one something else, and assert that the assigned states are as they should be? > Does this bug have a test coverage? I mean, either was there some test failing or have you added a new test to cover for a future regression? sorry to bother you again, but the unit test that you have added still doesn't have test coverage. 
When I try running your previous version of the code:
```
boolean noNeedRescale =
        stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
                        .map(JobEdge::getDownstreamSubtaskStateMapper)
                        .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
                && stateAssignment.executionJobVertex.getInputs().stream()
                        .map(IntermediateResult::getProducer)
                        .map(vertexAssignments::get)
                        .anyMatch(
                                taskStateAssignment -> {
                                    final int oldParallelism =
                                            stateAssignment
                                                    .oldState
                                                    .get(stateAssignment.inputOperatorID)
                                                    .getParallelism();
                                    return oldParallelism
                                            == taskStateAssignment.executionJobVertex
                                                    .getParallelism();
                                });

if (inputState.getParallelism() == executionJobVertex.getParallelism() && !noNeedRescale) {
    stateAssignment.inputChannelStates.putAll(
            toInstanceMap(stateAssignment.inputOperatorID, inputOperatorState));
    return;
}
```
the tests in `StateAssignmentOperationTest` are still green.
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
superdiaodiao commented on code in PR #24773: URL: https://github.com/apache/flink/pull/24773#discussion_r1606797248 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; + +import javax.annotation.Nullable; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. 
*/
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+    public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.URL_DECODE, context);
+    }
+
+    public @Nullable StringData eval(StringData value) {
+        final Charset charset = StandardCharsets.UTF_8;
+        try {
+            return StringData.fromString(URLDecoder.decode(value.toString(), charset.name()));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(
+                    "Failed to decode value: " + value + " with charset: " + charset.name(), e);
+        } catch (RuntimeException e) {
+            return value;
+        }

Review Comment: @davidradl In Calcite, the author didn't explain the reasoning; the original thread is: https://github.com/apache/calcite/pull/3318#discussion_r1372603727 Spark, however, behaves differently from Calcite, even though Calcite claims this behavior serves Spark (which at first made me think they behaved the same). I read Spark's code and tested it: Spark throws an exception instead of returning the original value in this case. So what is the Flink team's expectation? I will follow your guidance.
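The two behaviors under discussion — Calcite-style fall-back to the input versus Spark-style failure — can be contrasted in a small stand-alone sketch. This is plain Java outside Flink; the class and method names are hypothetical, and only `java.net.URLDecoder` is real API.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch contrasting the two error-handling strategies discussed above.
public class UrlDecodeSketch {

    /** Calcite-style: a malformed escape sequence falls back to the original value. */
    public static String decodeLenient(String value) {
        try {
            return URLDecoder.decode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // cannot happen: UTF-8 is always supported
        } catch (RuntimeException e) {
            return value; // e.g. a stray '%': return the input unchanged
        }
    }

    /** Spark-style: a malformed escape sequence propagates as IllegalArgumentException. */
    public static String decodeStrict(String value) {
        try {
            return URLDecoder.decode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // cannot happen: UTF-8 is always supported
        }
    }
}
```

For a malformed input such as `"50%"` (an incomplete trailing escape), `decodeLenient` returns the input unchanged while `decodeStrict` throws; for well-formed input both decode normally.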
[PR] Fix ingress example in docs [flink-kubernetes-operator]
yardenshoham opened a new pull request, #828: URL: https://github.com/apache/flink-kubernetes-operator/pull/828 In the "Simple domain based routing" example, there's no `/` so the ingress doesn't work by default -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606776660

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlEncodeFunction.java:
## @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_ENCODE}. */
+@Internal
+public class UrlEncodeFunction extends BuiltInScalarFunction {
+
+    public UrlEncodeFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.URL_ENCODE, context);
+    }
+
+    public @Nullable StringData eval(StringData url) {
+        if (url == null) {
+            return null;
+        }
+        final Charset charset = StandardCharsets.UTF_8;
+        String value;
+
+        try {
+            value = URLEncoder.encode(url.toString(), charset.name());
+        } catch (UnsupportedEncodingException e) {
+            throw new FlinkRuntimeException(
+                    "Failed to encode URL: " + url + " with charset: " + charset.name(), e);

Review Comment:
   OK, I will add it.
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
davidradl commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606752091

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlEncodeFunction.java:
## @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_ENCODE}. */
+@Internal
+public class UrlEncodeFunction extends BuiltInScalarFunction {
+
+    public UrlEncodeFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.URL_ENCODE, context);
+    }
+
+    public @Nullable StringData eval(StringData url) {
+        if (url == null) {
+            return null;
+        }
+        final Charset charset = StandardCharsets.UTF_8;
+        String value;
+
+        try {
+            value = URLEncoder.encode(url.toString(), charset.name());
+        } catch (UnsupportedEncodingException e) {
+            throw new FlinkRuntimeException(
+                    "Failed to encode URL: " + url + " with charset: " + charset.name(), e);

Review Comment:
   Nit: I suggest making the exception message a little more explicit, changing "...with charset:" to "...due to an unsupported encoding with charset:".
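As background to the exception-message nit above: a minimal, self-contained sketch (not from the PR) of the encode path, illustrating that when the charset name comes from `StandardCharsets.UTF_8` the checked `UnsupportedEncodingException` branch can never actually fire, since every JVM is required to support UTF-8. The class name and sample input are hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UrlEncodeSketch {
    public static void main(String[] args) {
        String url = "a b&c=d";
        String value;
        try {
            // StandardCharsets.UTF_8.name() is "UTF-8", which every JVM must
            // support, so this catch branch is effectively unreachable here.
            value = URLEncoder.encode(url, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(
                    "Failed to encode URL: " + url
                            + " due to an unsupported encoding with charset: "
                            + StandardCharsets.UTF_8.name(), e);
        }
        System.out.println(value); // a+b%26c%3Dd
    }
}
```

On Java 10+ the `URLEncoder.encode(String, Charset)` overload avoids the checked exception entirely, which sidesteps the question of how to word its message.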
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
davidradl commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606749183

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
## @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+    public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.URL_DECODE, context);
+    }
+
+    public @Nullable StringData eval(StringData value) {
+        final Charset charset = StandardCharsets.UTF_8;
+        try {
+            return StringData.fromString(URLDecoder.decode(value.toString(), charset.name()));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(
+                    "Failed to decode value: " + value + " with charset: " + charset.name(), e);
+        } catch (RuntimeException e) {
+            return value;
+        }

Review Comment:
   @superdiaodiao Do we know why Calcite/Spark do this? It seems a strange design choice: we would go forward with the string not decoded, but subsequent code would expect it to be decoded and is likely to get unexpected results, which would be tricky to debug. I notice that encode does not do this.
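To make the behavior under debate concrete, here is a small standalone sketch (not from the PR; class name is hypothetical) showing what `RuntimeException` the `catch (RuntimeException e) { return value; }` fallback actually swallows: `java.net.URLDecoder` throws `IllegalArgumentException` on malformed percent-encoding, such as an incomplete trailing escape.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

public class UrlDecodeSketch {
    public static void main(String[] args) {
        // Well-formed input decodes as expected.
        System.out.println(URLDecoder.decode("a%20b", StandardCharsets.UTF_8)); // a b

        // Malformed input: URLDecoder throws IllegalArgumentException
        // ("Incomplete trailing escape (%) pattern"). This is the exception
        // that the eval() under review catches, returning the input unchanged
        // instead of failing the query.
        try {
            URLDecoder.decode("a%2", StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            System.out.println("threw IllegalArgumentException");
        }
    }
}
```

So the design question in the thread amounts to: should a malformed input surface this `IllegalArgumentException` to the user (Spark's behavior, per the comment above), or be silently returned undecoded (Calcite's behavior)?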