[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1120140585 I have done some manual tests, and it works well for recovering the missing deployment. I have one last question which needs more discussion. When `SHUTDOWN_ON_APPLICATION_FINISH` is configured to false, it is the operator's responsibility to clean up the K8s resources. However, I do not find such logic. It means that if the job reached a globally terminal state (e.g. failed, finished) or was suspended manually (by changing the CR `.spec.job.state`), we will have residual K8s resources. In my opinion, the operator needs to do the clean-up after fetching the job status and storing it into the CR status successfully. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
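The ordering proposed in the comment above — first record the observed job status in the CR status, and only then delete the cluster resources — can be sketched as a toy model. All names below are illustrative, not the operator's actual API:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the proposed clean-up order; names are illustrative only. */
public class CleanupOrderDemo {
    enum JobState { RUNNING, FINISHED, FAILED, SUSPENDED }

    static boolean needsCleanup(JobState s) {
        // Globally terminal states and manual suspension leave residual resources.
        return s == JobState.FINISHED || s == JobState.FAILED || s == JobState.SUSPENDED;
    }

    /** Persist the fetched status into the CR status first, then clean up. */
    static List<String> reconcile(JobState observed) {
        List<String> actions = new ArrayList<>();
        actions.add("store-status:" + observed);   // step 1: update CR status
        if (needsCleanup(observed)) {
            actions.add("delete-k8s-resources");   // step 2: only after status is stored
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(reconcile(JobState.FINISHED));
        System.out.println(reconcile(JobState.RUNNING));
    }
}
```

Doing the status update before the deletion means a crash between the two steps leaves a recorded terminal status rather than a lost one.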
[GitHub] [flink-table-store] openinx commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.
openinx commented on code in PR #110: URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867300460 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.writer; + +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +/** + * Writer to roll over to a new file if the current size exceed the target file size. + * + * @param record data type. + */ +public class RollingFileWriter implements FileWriter> { Review Comment: Yes, will remove the `RollingFile` in the next updating PR, and re-implement the Manifest writer by using the new writer API. ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.writer; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * The abstracted base file writer implementation for {@link FileWriter}. + * + * @param record data type. 
+ */ +public abstract class BaseFileWriter implements FileWriter { + +private final Path path; + +private long recordCount; +private FSDataOutputStream currentOut; +private BulkWriter currentWriter; + +private boolean closed = false; + +public BaseFileWriter(BulkWriter.Factory writerFactory, Path path) throws IOException { +this.path = path; + +this.recordCount = 0; +this.currentOut = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE); +this.currentWriter = writerFactory.create(currentOut); +} + +@Override +public void write(T row) throws IOException { +currentWriter.addElement(row); +recordCount += 1; +} + +@Override +public long recordCount() { +return recordCount; +} + +@Override +public long length() throws IOException { +return currentOut.getPos(); +} + +@Override +public void flush() throws IOException { +currentWriter.flush(); +} + +protected abstract DataFileMeta createDataFileMeta(Path path) throws IOException; + +@Override +public void abort() { Review Comment: It's true. The abort method should always call the close() inside it. That's why I designed the idempotent close() implementation... ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0
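The idempotent `close()` pattern mentioned in this review — `abort()` can unconditionally call `close()` because a second close is a no-op — can be illustrated with a minimal stand-in (this is not the actual `BaseFileWriter`; the counter exists only for the demo):

```java
/** Minimal stand-in illustrating an idempotent close() that abort() can always call. */
public class IdempotentCloseDemo implements AutoCloseable {
    private boolean closed = false;
    int closeCount = 0; // exposed only so the demo can show close() runs once

    @Override
    public void close() {
        if (closed) {
            return;        // idempotent: repeated calls are no-ops
        }
        closed = true;
        closeCount++;      // a real writer would flush and close its output stream here
    }

    public void abort() {
        close();           // safe even if the caller already closed
        // a real writer would delete the partially written file here
    }

    public static void main(String[] args) {
        IdempotentCloseDemo w = new IdempotentCloseDemo();
        w.close();
        w.abort();                        // second close() is a no-op
        System.out.println(w.closeCount); // prints 1
    }
}
```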
[GitHub] [flink] ChengkaiYang2022 commented on pull request #19498: [FLINK-26588][docs]Translate the new CAST documentation to Chinese
ChengkaiYang2022 commented on PR #19498: URL: https://github.com/apache/flink/pull/19498#issuecomment-1120129625 @flinkbot run azure
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867299875 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java: ## @@ -255,4 +259,50 @@ public static boolean shouldRollBack( .minus(readinessTimeout) .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp())); } + +public static boolean deploymentRecoveryEnabled(Configuration conf) { +return conf.getOptional( + KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED) +.orElse( +conf.get(FlinkConfigBuilder.FLINK_VERSION) Review Comment: nit: `? true : false` is unnecessary.
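The nit refers to the redundant `? true : false` wrapped around a boolean expression; the expression can be returned directly. A generic before/after (the operator's config names are omitted here; this shows only the shape of the fix):

```java
import java.util.Optional;

public class TernaryNitDemo {
    // Before (the pattern flagged in review): redundant ternary on a boolean.
    static boolean before(Optional<Boolean> flag) {
        return flag.orElse(false) ? true : false;
    }

    // After: return the boolean expression itself.
    static boolean after(Optional<Boolean> flag) {
        return flag.orElse(false);
    }

    public static void main(String[] args) {
        // Both variants are equivalent for every input.
        System.out.println(before(Optional.of(true)) == after(Optional.of(true)));
        System.out.println(before(Optional.empty()) == after(Optional.empty()));
    }
}
```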
[GitHub] [flink] ChengkaiYang2022 commented on pull request #19498: [FLINK-26588][docs]Translate the new CAST documentation to Chinese
ChengkaiYang2022 commented on PR #19498: URL: https://github.com/apache/flink/pull/19498#issuecomment-1120128815 @flinkbot
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867299797 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) { && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED; } +private void onMissingDeployment(FlinkDeployment deployment) { +String err = "Missing JobManager deployment"; +logger.error(err); Review Comment: My bad. We might not need to add the namespace and name here; MDC adds this. We also do not need this for the event, since it is duplicated there: ``` wangyang-pc:flink-kubernetes-operator danrtsey.wy$ kubectl get events | grep 'flinkdeployment/flink-example-statemachine' 11h Error Missing JobManager deployment for default/flink-example-statemachine flinkdeployment/flink-example-statemachine Missing JobManager deployment for default/flink-example-statemachine ```
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867299527 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) { && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED; } +private void onMissingDeployment(FlinkDeployment deployment) { +String err = "Missing JobManager deployment"; +logger.error(err); +Event event = +DeploymentFailedException.asEvent( +new DeploymentFailedException( + DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err), +deployment); +kubernetesClient Review Comment: I think it is because `lastTransitionTime` and `lastUpdateTime` are null.
[GitHub] [flink] chucheng92 commented on pull request #19654: [FLINK-27529][connector/common] Replace HybridSourceSplitEnumerator readerSourceIndex to safely Integer check
chucheng92 commented on PR #19654: URL: https://github.com/apache/flink/pull/19654#issuecomment-1120124506 Hi @tweise, could you help review this PR? Thanks.
[GitHub] [flink] chucheng92 commented on pull request #19654: [FLINK-27529][connector/common] Replace HybridSourceSplitEnumerator readerSourceIndex to safely Integer check
chucheng92 commented on PR #19654: URL: https://github.com/apache/flink/pull/19654#issuecomment-1120124033 @flinkbot run azure
[jira] [Updated] (FLINK-27529) HybridSourceSplitEnumerator sourceIndex using error Integer check
[ https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ran Tao updated FLINK-27529: Description: Currently HybridSourceSplitEnumerator check readerSourceIndex using Integer type but == operator. In some case, it will cause error(Integer == only works fine in [-128,127]) we can use Integer.equals instead. {code:java} @Override public Map registeredReaders() { // TODO: not start enumerator until readers are ready? Map readers = realContext.registeredReaders(); if (readers.size() != readerSourceIndex.size()) { return filterRegisteredReaders(readers); } Integer lastIndex = null; for (Integer sourceIndex : readerSourceIndex.values()) { if (lastIndex != null && lastIndex != sourceIndex) { return filterRegisteredReaders(readers); } lastIndex = sourceIndex; } return readers; } private Map filterRegisteredReaders(Map readers) { Map readersForSource = new HashMap<>(readers.size()); for (Map.Entry e : readers.entrySet()) { if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) { readersForSource.put(e.getKey(), e.getValue()); } } return readersForSource; } {code} {code:java} private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) { readerSourceIndex.put(subtaskId, sourceIndex); .. } {code} was: Currently HybridSourceSplitEnumerator check readerSourceIndex using Integer type but == operator. In some case, it will cause error(Integer == only works fine in [-128,127]) we can use Integer.equals instead. But actually readerSourceIndex is primitive int intrinsically,so we can change Integer to int to check sourceIndex instead of Integer.equals method. it will be more elegant. {code:java} @Override public Map registeredReaders() { // TODO: not start enumerator until readers are ready? 
Map readers = realContext.registeredReaders(); if (readers.size() != readerSourceIndex.size()) { return filterRegisteredReaders(readers); } Integer lastIndex = null; for (Integer sourceIndex : readerSourceIndex.values()) { if (lastIndex != null && lastIndex != sourceIndex) { return filterRegisteredReaders(readers); } lastIndex = sourceIndex; } return readers; } private Map filterRegisteredReaders(Map readers) { Map readersForSource = new HashMap<>(readers.size()); for (Map.Entry e : readers.entrySet()) { if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) { readersForSource.put(e.getKey(), e.getValue()); } } return readersForSource; } {code} {code:java} private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) { readerSourceIndex.put(subtaskId, sourceIndex); .. } {code} > HybridSourceSplitEnumerator sourceIndex using error Integer check > - > > Key: FLINK-27529 > URL: https://issues.apache.org/jira/browse/FLINK-27529 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.15.0, 1.14.4, 1.15.1 >Reporter: Ran Tao >Assignee: Ran Tao >Priority: Major > Labels: pull-request-available > > Currently HybridSourceSplitEnumerator check readerSourceIndex using Integer > type but == operator. In some case, it will cause error(Integer == only > works fine in [-128,127]) we can use Integer.equals instead. > {code:java} > @Override > public Map registeredReaders() { > // TODO: not start enumerator until readers are ready? 
> Map readers = > realContext.registeredReaders(); > if (readers.size() != readerSourceIndex.size()) { > return filterRegisteredReaders(readers); > } > Integer lastIndex = null; > for (Integer sourceIndex : readerSourceIndex.values()) { > if (lastIndex != null && lastIndex != sourceIndex) { > return filterRegisteredReaders(readers); > } > lastIndex = sourceIndex; > } > return readers; > } > private Map filterRegisteredReaders(Map ReaderInfo> readers) { > Map readersForSource = new > HashMap<>(readers.size()); > for (Map.Entry e : readers.entrySet()) { >
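The `==` pitfall described in this issue stems from the `Integer` cache guaranteed by the Java Language Specification: autoboxing goes through `Integer.valueOf`, which reuses instances for values in [-128, 127], so reference comparison happens to work in that range and silently fails outside it. A minimal demonstration:

```java
public class IntegerCacheDemo {
    public static void main(String[] args) {
        // Autoboxing goes through Integer.valueOf, which caches [-128, 127].
        Integer a = 127, b = 127;
        System.out.println(a == b);      // true: same cached instance

        // Outside the cache range, each boxing produces a distinct object.
        Integer c = 128, d = 128;
        System.out.println(c == d);      // false: different instances
        System.out.println(c.equals(d)); // true: value comparison is always safe
    }
}
```

This is why `readerSourceIndex` values compared with `==` only misbehave for source indexes beyond 127 — rare in practice, which makes the bug easy to miss.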
[GitHub] [flink] baisui1981 commented on pull request #17521: [FLINK-24558][API/DataStream]make parent ClassLoader variable which c…
baisui1981 commented on PR #17521: URL: https://github.com/apache/flink/pull/17521#issuecomment-1120122341 @RocMarshal @zentol @AHeise could you please give me some suggestions on how to proceed with this PR?
[GitHub] [flink] ChengkaiYang2022 commented on pull request #19498: [FLINK-26588][docs]Translate the new CAST documentation to Chinese
ChengkaiYang2022 commented on PR #19498: URL: https://github.com/apache/flink/pull/19498#issuecomment-1120121765 > The CI test fails. Could you rebase to the latest master and trigger the test again? Okay, I will try to rebase.
[jira] [Created] (FLINK-27538) Change flink.version 1.15-SNAPSHOT to 1.15.0 in table store
Jingsong Lee created FLINK-27538: Summary: Change flink.version 1.15-SNAPSHOT to 1.15.0 in table store Key: FLINK-27538 URL: https://issues.apache.org/jira/browse/FLINK-27538 Project: Flink Issue Type: Improvement Components: Table Store Reporter: Jingsong Lee Fix For: table-store-0.2.0 * change flink.version * Use flink docker in E2eTestBase -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] fsk119 commented on pull request #19498: [FLINK-26588][docs]Translate the new CAST documentation to Chinese
fsk119 commented on PR #19498: URL: https://github.com/apache/flink/pull/19498#issuecomment-1120116520 The CI test fails. Could you rebase to the latest master and trigger the test again?
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #110: [FLINK-27517] Introduce rolling file writer to write one record each time for append-only table.
JingsongLi commented on code in PR #110: URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867288653 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.writer; + +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +/** + * Writer to roll over to a new file if the current size exceed the target file size. + * + * @param record data type. + */ +public class RollingFileWriter implements FileWriter> { Review Comment: It is better to rename `RollingFile` to `RollingFileWriter`? RollingFileWriter is here to replace RollingFile, and we should delete RollingFile ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.writer; + +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +/** + * Writer to roll over to a new file if the current size exceed the target file size. + * + * @param record data type. + */ +public class RollingFileWriter implements FileWriter> { + +private final Supplier> writerFactory; +private final long targetFileSize; +private final List results; + +private BaseFileWriter currentWriter = null; +private long recordCount = 0; +private boolean closed = false; + +public RollingFileWriter(Supplier> writerFactory, long targetFileSize) { +this.writerFactory = writerFactory; +this.targetFileSize = targetFileSize; +this.results = new ArrayList<>(); +} + +@Override +public void write(T row) throws IOException { +// Open the current writer if write the first record or roll over happen before. 
+if (currentWriter == null) { +currentWriter = writerFactory.get(); +} + +currentWriter.write(row); +recordCount += 1; + +if (currentWriter.length() >= targetFileSize) { +currentWriter.close(); +results.add(currentWriter.result()); + +currentWriter = null; +} +} + +@Override +public long recordCount() { +return recordCount; +} + +@Override +public long length() throws IOException { +long lengthOfClosedFiles = results.stream().mapToLong(DataFileMeta::fileSize).sum(); +if (currentWriter != null) { +lengthOfClosedFiles += currentWriter.length(); Review Comment: `lengthOfClosedFiles` -> `totalLength`? Because it adds the unclosed writer. ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java: ## @@ -97,172 +99,176 @@ public DataFilePathFactory pathFactory() { */ public List write(CloseableIterator iterator, int level) throws Exception { -DataRollingFile rollingFile = -fileStatsExtractor == null -
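The roll-over rule in the diff above — close the current file once its length reaches the target size, and report total length as the closed files plus the still-open one — reduces to the following toy model (sizes are plain longs here; the real `RollingFileWriter` tracks bytes through the underlying output stream):

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the roll-over rule; not the actual RollingFileWriter. */
public class RollOverDemo {
    private final long targetFileSize;
    private final List<Long> closedFileSizes = new ArrayList<>();
    private long currentSize = 0;

    RollOverDemo(long targetFileSize) {
        this.targetFileSize = targetFileSize;
    }

    void write(long recordBytes) {
        currentSize += recordBytes;              // append to the current "file"
        if (currentSize >= targetFileSize) {     // roll over: close it, start fresh
            closedFileSizes.add(currentSize);
            currentSize = 0;
        }
    }

    /** Length of all closed files plus the still-open one, as discussed in review. */
    long length() {
        long total = currentSize;
        for (long size : closedFileSizes) {
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        RollOverDemo w = new RollOverDemo(10);
        for (int i = 0; i < 5; i++) {
            w.write(4);                          // rolls after the third record (12 >= 10)
        }
        System.out.println(w.length());          // prints 20 (closed 12 + open 8)
    }
}
```

Note that a file can exceed the target size by up to one record, since the check runs after the write; this matches the `>=` check in the diff.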
[GitHub] [flink] Myasuka commented on pull request #19600: [FLINK-27433][tests] Relocate RocksDB's log back to its own database dir
Myasuka commented on PR #19600: URL: https://github.com/apache/flink/pull/19600#issuecomment-1120113084 @flinkbot run azure
[GitHub] [flink] lsyldliu commented on pull request #19410: [FLINK-15635][table] Now EnvironmentSettings accepts the user ClassLoader
lsyldliu commented on PR #19410: URL: https://github.com/apache/flink/pull/19410#issuecomment-1120109452 ping @slinkydeveloper
[GitHub] [flink] JackWangCS commented on pull request #19666: [FLINK-25950][Runtime] Delete retry mechanism from ZooKeeperUtils.deleteZNode
JackWangCS commented on PR #19666: URL: https://github.com/apache/flink/pull/19666#issuecomment-1120109399 > @JackWangCS It looks good to me Thank you for the code review, let us wait for the CI report.
[GitHub] [flink] liuyongvs commented on pull request #19666: [FLINK-25950][Runtime] Delete retry mechanism from ZooKeeperUtils.deleteZNode
liuyongvs commented on PR #19666: URL: https://github.com/apache/flink/pull/19666#issuecomment-1120108825 @JackWangCS It looks good to me
[GitHub] [flink] JackWangCS commented on pull request #19666: [FLINK-25950][Runtime] Delete retry mechanism from ZooKeeperUtils.deleteZNode
JackWangCS commented on PR #19666: URL: https://github.com/apache/flink/pull/19666#issuecomment-1120108403 Hi @XComp, could you help review this PR?
[GitHub] [flink] flinkbot commented on pull request #19666: [FLINK-25950][Runtime] Delete retry mechanism from ZooKeeperUtils.deleteZNode
flinkbot commented on PR #19666: URL: https://github.com/apache/flink/pull/19666#issuecomment-1120108334 ## CI report: * eeef4606a89d9137721fe4e43284167a76f911ed UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-25950) Delete retry mechanism from ZooKeeperUtils.deleteZNode
[ https://issues.apache.org/jira/browse/FLINK-25950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-25950: --- Labels: pull-request-available (was: ) > Delete retry mechanism from ZooKeeperUtils.deleteZNode > -- > > Key: FLINK-25950 > URL: https://issues.apache.org/jira/browse/FLINK-25950 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Affects Versions: 1.15.0 >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available > > {{ZooKeeperUtils.deleteZNode}} implements a retry loop that is not necessary > for curator version 4.0.1+. This code can be cleaned up
[GitHub] [flink] JackWangCS opened a new pull request, #19666: [FLINK-25950][Runtime] Delete retry mechanism from ZooKeeperUtils.deleteZNode
JackWangCS opened a new pull request, #19666: URL: https://github.com/apache/flink/pull/19666 ## What is the purpose of the change ZooKeeperUtils.deleteZNode implements a retry loop that is not necessary for curator version 4.0.1+. We can use idempotent delete since we already upgraded Curator to 5.2.0. ## Brief change log - *Delete retry mechanism from ZooKeeperUtils.deleteZNode* ## Verifying this change This change is already covered by existing tests, such as ZooKeeperUtilsWriteLeaderInformationTest#testDeleteZNode(). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[GitHub] [flink-ml] zhipeng93 commented on pull request #93: [FLINK-27091] Add Transformer and Estimator for LinearSVC
zhipeng93 commented on PR #93: URL: https://github.com/apache/flink-ml/pull/93#issuecomment-1120098160 @lindong28 @yunfengzhou-hub Can you help review this PR?
[GitHub] [flink] RocMarshal commented on pull request #18928: [FLINK-25545][flink-clients][JUnit5 Migration] Module: flink-clients.
RocMarshal commented on PR #18928: URL: https://github.com/apache/flink/pull/18928#issuecomment-1120094338 @flinkbot run azure
[GitHub] [flink-ml] dependabot[bot] commented on pull request #80: Bump checkstyle from 8.14 to 8.29
dependabot[bot] commented on PR #80: URL: https://github.com/apache/flink-ml/pull/80#issuecomment-1120089918 OK, I won't notify you again about this release, but will get in touch when a new version is available. If you'd rather skip all updates until the next major or minor version, let me know by commenting `@dependabot ignore this major version` or `@dependabot ignore this minor version`. If you change your mind, just re-open this PR and I'll resolve any conflicts on it.
[GitHub] [flink-ml] lindong28 commented on pull request #80: Bump checkstyle from 8.14 to 8.29
lindong28 commented on PR #80: URL: https://github.com/apache/flink-ml/pull/80#issuecomment-1120089909 apache/flink-ml should use the same checkstyle as apache/flink, which is still using checkstyle 8.14. We will bump the checkstyle version when apache/flink bumps its checkstyle version in the future.
[GitHub] [flink-ml] lindong28 closed pull request #80: Bump checkstyle from 8.14 to 8.29
lindong28 closed pull request #80: Bump checkstyle from 8.14 to 8.29 URL: https://github.com/apache/flink-ml/pull/80
[jira] [Closed] (FLINK-27496) Add test for rollback during application upgrade
[ https://issues.apache.org/jira/browse/FLINK-27496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-27496. Resolution: Won't Do

> Add test for rollback during application upgrade
> Key: FLINK-27496
> URL: https://issues.apache.org/jira/browse/FLINK-27496
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.0.0
> Reporter: Gyula Fora
> Priority: Major
> Labels: Starter
>
> With the improvements contained in FLINK-27468, it should now be possible to trigger a rollback during an ongoing savepoint upgrade (when we are waiting for the cluster to become upgradeable).
> We should add a test case to cover this after FLINK-27468 is merged.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r867148302 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) { && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED; } +private void onMissingDeployment(FlinkDeployment deployment) { +String err = "Missing JobManager deployment"; +logger.error(err); +Event event = +DeploymentFailedException.asEvent( +new DeploymentFailedException( + DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err), +deployment); +kubernetesClient Review Comment: I am not really sure how to fix this one
[GitHub] [flink-playgrounds] fredthomsen opened a new pull request, #28: Build pyflink image with compatible python version
fredthomsen opened a new pull request, #28: URL: https://github.com/apache/flink-playgrounds/pull/28 The underlying base image is now Debian bullseye, which ships with Python 3.9, and this version of Python is not compatible with PyFlink. Thus, build and add Python 3.7 to the image. This should be temporary until https://issues.apache.org/jira/browse/FLINK-27058 is resolved.
[GitHub] [flink-docker] afedulov commented on a diff in pull request #116: Publish snapshot images
afedulov commented on code in PR #116: URL: https://github.com/apache/flink-docker/pull/116#discussion_r866998834 ## utils.sh: ## @@ -0,0 +1,59 @@ +#!/bin/bash -e + +IMAGE_REPO=$1 +echo "Image repository: $IMAGE_REPO" + +function image_tag() { +local dockerfile +dockerfile="$1" + +local variant minor_version +variant="$(basename "$(dirname "$dockerfile")")" + +echo "${variant}" +} + +function image_name() { +local image_tag +image_tag="$1" + +echo "${IMAGE_REPO}:${image_tag}" +} + +function build_image() { +local dockerfile +dockerfile="$1" + +local image_tag image_name dockerfile_dir +image_tag="$(image_tag "$dockerfile")" +image_name="$(image_name "$image_tag")" +dockerfile_dir="$(dirname "$dockerfile")" + +echo >&2 "===> Building ${image_tag} image..." +docker build -t "$image_name" "$dockerfile_dir" +} + +function publish_snapshots() { Review Comment: I guess you are right, I'll inline this logic into the pipeline.
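The `image_tag`/`image_name` helpers in the diff above derive the tag from the Dockerfile's parent directory name (the "variant") and prefix it with the repository. A minimal sketch of the same derivation in Java; the paths and repository values below are made-up examples, not taken from the actual pipeline:

```java
import java.nio.file.Path;

class ImageNaming {
    // Tag = name of the directory that contains the Dockerfile,
    // mirroring image_tag() in utils.sh: basename(dirname(dockerfile)).
    static String imageTag(Path dockerfile) {
        return dockerfile.getParent().getFileName().toString();
    }

    // Full image name = "<repo>:<tag>", mirroring image_name() in utils.sh.
    static String imageName(String imageRepo, String tag) {
        return imageRepo + ":" + tag;
    }
}
```

This keeps the tag purely a function of the checkout layout, so adding a new variant directory automatically yields a new tag without touching the publishing logic.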
[GitHub] [flink-docker] afedulov commented on a diff in pull request #116: Publish snapshot images
afedulov commented on code in PR #116: URL: https://github.com/apache/flink-docker/pull/116#discussion_r866937358 ## .github/workflows/snapshot.yml: ## @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "Publish SNAPSHOTs" + +on: + # schedule: + # - cron: '0 0 * * *' # Deploy every day + push: +branches: + - snapshot-dev-master + +env: + REGISTRY: ghcr.io + VERSION: 1.16-SNAPSHOT + +jobs: + ci: +uses: ./.github/workflows/ci.yml + snapshot: +needs: ci +runs-on: ubuntu-latest +permissions: + contents: read + packages: write +strategy: + matrix: +config: + - java_version: 8 + - java_version: 11 +steps: + - name: Set env +run: | + repo=${{ github.repository }} # apache/flink-docker + echo "IMAGE_REPO=$(echo ${repo##*/})" >> $GITHUB_ENV # flink-docker + echo "OWNER=$(echo ${repo%%/*})" >> $GITHUB_ENV # apache + - uses: actions/checkout@v3 + - name: Variables +run: | + echo "OWNER: $OWNER" + echo "IMAGE_REPO: $IMAGE_REPO" + - name: Prepare Dockerfiles +run: | + ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-${VERSION}-bin-scala_2.12.tgz" -j ${{ matrix.config.java_version }} -n ${VERSION}-scala_2.12-java${{ matrix.config.java_version }} Review Comment: I thought that we were going to add snapshot.yaml to the dev-master branch. If I get you correctly, this won't work with the scheduled runs, because they can only be triggered from master. Is my understanding correct?
[jira] [Updated] (FLINK-27537) Remove requirement for Async Sink's RequestEntryT to be serializable
[ https://issues.apache.org/jira/browse/FLINK-27537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-27537:

Description:
Currently, in AsyncSinkBase and its dependent classes, e.g. the sink writer, element converter etc., the RequestEntryT generic type is required to be serializable. However, this requirement no longer holds and there is nothing that actually requires this. Proposed approach:
* Remove the extends serializable from the generic type RequestEntryT

was:
Currently, in `AsyncSinkBase` and its dependent classes, e.g. the sink writer, element converter etc., the `RequestEntryT` generic type is required to be `serializable`. However, this requirement no longer holds and there is nothing that actually requires this. Proposed approach:
* Remove the `extends serializable` from the generic type `RequestEntryT`

> Remove requirement for Async Sink's RequestEntryT to be serializable
> Key: FLINK-27537
> URL: https://issues.apache.org/jira/browse/FLINK-27537
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.15.0
> Reporter: Zichen Liu
> Priority: Major
>
> Currently, in AsyncSinkBase and its dependent classes, e.g. the sink writer, element converter etc., the RequestEntryT generic type is required to be serializable.
> However, this requirement no longer holds and there is nothing that actually requires this.
> Proposed approach:
> * Remove the extends serializable from the generic type RequestEntryT
[jira] [Created] (FLINK-27537) Remove requirement for Async Sink's RequestEntryT to be serializable
Zichen Liu created FLINK-27537:

Summary: Remove requirement for Async Sink's RequestEntryT to be serializable
Key: FLINK-27537
URL: https://issues.apache.org/jira/browse/FLINK-27537
Project: Flink
Issue Type: Improvement
Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu

Currently, in `AsyncSinkBase` and its dependent classes, e.g. the sink writer, element converter etc., the `RequestEntryT` generic type is required to be `serializable`. However, this requirement no longer holds and there is nothing that actually requires this. Proposed approach:
* Remove the `extends serializable` from the generic type `RequestEntryT`
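The proposed change is purely a relaxation of the generic bound. A minimal self-contained sketch of what dropping `extends Serializable` permits; the classes below are made-up mocks, not Flink's actual `AsyncSinkBase` hierarchy:

```java
import java.util.ArrayList;
import java.util.List;

// Mock of a sink-style buffer whose element type no longer needs to be
// Serializable once the bound is dropped.
// Old declaration would have been: class RequestBuffer<RequestEntryT extends Serializable>
class RequestBuffer<RequestEntryT> {
    private final List<RequestEntryT> entries = new ArrayList<>();

    void add(RequestEntryT entry) {
        entries.add(entry);
    }

    int size() {
        return entries.size();
    }
}

// An entry type that does NOT implement Serializable: it compiles against
// the relaxed bound, but the old bound would have rejected it.
class PutRequest {
    final byte[] payload;

    PutRequest(byte[] payload) {
        this.payload = payload;
    }
}
```

With the old bound, `new RequestBuffer<PutRequest>()` would fail to compile unless `PutRequest implements Serializable`; with the bound removed, any entry type works and nothing else changes.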
[GitHub] [flink] JingGe commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors
JingGe commented on code in PR #19664: URL: https://github.com/apache/flink/pull/19664#discussion_r866974112 ## docs/content/release-notes/flink-1.15.md: ## @@ -406,6 +406,13 @@ updating the client dependency to a version >= 7.14.0 is required due to interna The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been removed. If not done already, users need to upgrade to the newer stack (indicated by `connector=jdbc` in DDL). + [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126) Review Comment: Excellent! I forgot that. Thanks!
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119777605 @wangyang0918 of course, I really appreciate it
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#issuecomment-1119769786 If you do not mind, I will do some manual tests tomorrow and share the feedback then.
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866949459 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java: ## @@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig)); } catch (Exception e) { LOG.error("Exception while listing jobs", e); -jobStatus.setState(JOB_STATE_UNKNOWN); + jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); Review Comment: We could keep it as is for now and change it later if necessary.
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866948431 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java: ## @@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() { } // Adapt default rest service type from 1.15+ -if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) { -effectiveConfig.set( -REST_SERVICE_EXPOSED_TYPE, -KubernetesConfigOptions.ServiceExposedType.ClusterIP); -} +setDefaultConf( +REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP); if (spec.getJob() != null) { -if (!effectiveConfig.contains(CANCEL_ENABLE)) { -// Set 'web.cancel.enable' to false for application deployments to avoid users -// accidentally cancelling jobs. -effectiveConfig.set(CANCEL_ENABLE, false); -} +// Set 'web.cancel.enable' to false for application deployments to avoid users +// accidentally cancelling jobs. +setDefaultConf(CANCEL_ENABLE, false); + // With last-state upgrade mode, set the default value of // 'execution.checkpointing.interval' // to 5 minutes when HA is enabled. -if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE -&& !effectiveConfig.contains( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) { -effectiveConfig.set( +if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) { +setDefaultConf( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, DEFAULT_CHECKPOINTING_INTERVAL); } + +// We need to keep the application clusters around for proper operator behaviour +effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false); +if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) { Review Comment: I got it. Multiple jobs could be submitted when HA is disabled.
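The refactoring in the diff above replaces repeated `contains`/`set` checks with a `setDefaultConf` helper that only applies a default when the user has not configured the option. A minimal standalone sketch of that pattern; `MiniConf` is a simplified mock, not the operator's actual `Configuration` class:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified mock of a configuration object supporting "set default only
// if absent", mirroring the setDefaultConf helper introduced in the diff.
class MiniConf {
    private final Map<String, Object> values = new HashMap<>();

    boolean contains(String key) {
        return values.containsKey(key);
    }

    <T> void set(String key, T value) {
        values.put(key, value);
    }

    Object get(String key) {
        return values.get(key);
    }

    // Only set the option if the user hasn't configured it already.
    <T> void setDefault(String key, T value) {
        if (!contains(key)) {
            set(key, value);
        }
    }
}
```

This keeps user-supplied values authoritative while letting the builder layer in operator defaults (such as disabling `web.cancel.enable` for application deployments) without clobbering anything explicit.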
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866948228 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java: ## @@ -255,4 +259,40 @@ public static boolean shouldRollBack( .minus(readinessTimeout) .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp())); } + +public static boolean deploymentRecoveryEnabled(Configuration conf) { +return conf.getOptional( + KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED) +.orElse( +conf.get(FlinkConfigBuilder.FLINK_VERSION) +.isNewerVersionThan(FlinkVersion.v1_14) +? true +: false); +} + +public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) { +return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState() +== JobState.RUNNING +&& status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING; +} + +public static boolean isJobInTerminalState(FlinkDeploymentStatus status) { +JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus(); +if (deploymentStatus == JobManagerDeploymentStatus.MISSING) { +return true; +} + +String jobState = status.getJobStatus().getState(); + +return deploymentStatus == JobManagerDeploymentStatus.READY +&& org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState(); Review Comment: Hm I see, I wasn't sure about that. I can change to globallyTerminal
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866945827 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java: ## @@ -255,4 +259,40 @@ public static boolean shouldRollBack( .minus(readinessTimeout) .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp())); } + +public static boolean deploymentRecoveryEnabled(Configuration conf) { +return conf.getOptional( + KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED) +.orElse( +conf.get(FlinkConfigBuilder.FLINK_VERSION) +.isNewerVersionThan(FlinkVersion.v1_14) +? true +: false); +} + +public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) { +return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState() +== JobState.RUNNING +&& status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING; +} + +public static boolean isJobInTerminalState(FlinkDeploymentStatus status) { +JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus(); +if (deploymentStatus == JobManagerDeploymentStatus.MISSING) { +return true; +} + +String jobState = status.getJobStatus().getState(); + +return deploymentStatus == JobManagerDeploymentStatus.READY +&& org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState(); Review Comment: IIUC, `SUSPENDED` is not a persistent state, and it will transition to other states (e.g. `RUNNING`, `FAILED`) eventually. For example, the JobManager lost leadership and gained it back.
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866941979 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) { && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED; } +private void onMissingDeployment(FlinkDeployment deployment) { +String err = "Missing JobManager deployment"; +logger.error(err); Review Comment: will do
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866941717 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java: ## @@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception { flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient); } +private void recoverJmDeployment(FlinkDeployment deployment) throws Exception { +LOG.info("Missing Flink Cluster deployment, trying to recover..."); Review Comment: the name is included in the MDC already
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866940778 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java: ## @@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig)); } catch (Exception e) { LOG.error("Exception while listing jobs", e); -jobStatus.setState(JOB_STATE_UNKNOWN); + jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); Review Comment: There are benefits to using one of the Flink JobStatus states. We don't really care whether the Flink job is actually reconciling (whatever that means) or the operator is reconciling the Flink job; that's why I chose it. The name fits :) We can change this later if we feel like it. We could also use simply `null` in these cases.
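The fallback discussed here (reporting `RECONCILING` instead of a free-form `"UNKNOWN"` string when job listing fails) can be sketched as follows; the enum and observer below are simplified mocks with hypothetical names, not the operator's real classes:

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for a job-state enum; the point is that the
// fallback stays a valid enum name rather than an ad-hoc string.
enum FallbackJobState { RUNNING, RECONCILING, FAILED }

class MockJobStatusObserver {
    // Returns the observed state name, or RECONCILING if listing fails,
    // mirroring the catch block in the diff above.
    static String observeState(Callable<List<String>> listJobs) {
        try {
            List<String> jobs = listJobs.call();
            return jobs.isEmpty() ? FallbackJobState.RECONCILING.name() : jobs.get(0);
        } catch (Exception e) {
            // The listing error would be logged here; the reported state
            // falls back to RECONCILING.name() instead of "UNKNOWN".
            return FallbackJobState.RECONCILING.name();
        }
    }
}
```

Keeping the status within the enum's value set means downstream code can safely call `valueOf` on it, which a custom `"UNKNOWN"` string would break.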
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
morhidi commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866939928 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java: ## @@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() { } // Adapt default rest service type from 1.15+ -if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) { -effectiveConfig.set( -REST_SERVICE_EXPOSED_TYPE, -KubernetesConfigOptions.ServiceExposedType.ClusterIP); -} +setDefaultConf( +REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP); if (spec.getJob() != null) { -if (!effectiveConfig.contains(CANCEL_ENABLE)) { -// Set 'web.cancel.enable' to false for application deployments to avoid users -// accidentally cancelling jobs. -effectiveConfig.set(CANCEL_ENABLE, false); -} +// Set 'web.cancel.enable' to false for application deployments to avoid users +// accidentally cancelling jobs. +setDefaultConf(CANCEL_ENABLE, false); + // With last-state upgrade mode, set the default value of // 'execution.checkpointing.interval' // to 5 minutes when HA is enabled. -if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE -&& !effectiveConfig.contains( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) { -effectiveConfig.set( +if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) { +setDefaultConf( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, DEFAULT_CHECKPOINTING_INTERVAL); } + +// We need to keep the application clusters around for proper operator behaviour +effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false); +if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) { Review Comment:
```
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Submission of failed job in case of an application error ('execution.submit-failed-job-on-application-error') is not supported in non-HA setups.
```
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
morhidi commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866938596 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java: ## @@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception { flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient); } +private void recoverJmDeployment(FlinkDeployment deployment) throws Exception { +LOG.info("Missing Flink Cluster deployment, trying to recover..."); Review Comment: MDC adds this I guess, or you meant something else?
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
morhidi commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866938313 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) { && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED; } +private void onMissingDeployment(FlinkDeployment deployment) { +String err = "Missing JobManager deployment"; +logger.error(err); Review Comment: MDC adds this I guess, or you meant something else?
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866937604 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java: ## @@ -255,4 +259,40 @@ public static boolean shouldRollBack( .minus(readinessTimeout) .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp())); } + +public static boolean deploymentRecoveryEnabled(Configuration conf) { +return conf.getOptional( + KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED) +.orElse( +conf.get(FlinkConfigBuilder.FLINK_VERSION) +.isNewerVersionThan(FlinkVersion.v1_14) +? true +: false); +} + +public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) { +return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState() +== JobState.RUNNING +&& status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING; +} + +public static boolean isJobInTerminalState(FlinkDeploymentStatus status) { +JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus(); +if (deploymentStatus == JobManagerDeploymentStatus.MISSING) { +return true; +} + +String jobState = status.getJobStatus().getState(); + +return deploymentStatus == JobManagerDeploymentStatus.READY +&& org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState(); Review Comment: The only LOCALLY terminal state is SUSPENDED. Which we can also use for updates. So I think `isTerminalState` is correct here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
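The distinction gyfora relies on here can be made concrete with a simplified stand-in for Flink's `JobStatus` enum (the real enum is `org.apache.flink.api.common.JobStatus`; the boolean flags below are a sketch of its documented semantics, not the actual implementation):

```java
// Simplified stand-in for org.apache.flink.api.common.JobStatus, illustrating the
// point above: SUSPENDED is terminal, but it is the only state that is NOT
// *globally* terminal -- a suspended job can be resumed, so it is safe to use
// isTerminalState() when deciding whether an update/upgrade may proceed.
enum JobStatusSketch {
    RUNNING(false, false),
    FINISHED(true, true),
    CANCELED(true, true),
    FAILED(true, true),
    SUSPENDED(true, false); // locally terminal only

    private final boolean terminal;
    private final boolean globallyTerminal;

    JobStatusSketch(boolean terminal, boolean globallyTerminal) {
        this.terminal = terminal;
        this.globallyTerminal = globallyTerminal;
    }

    boolean isTerminalState() {
        return terminal;
    }

    boolean isGloballyTerminalState() {
        return globallyTerminal;
    }
}
```

With these semantics, a check based on `isTerminalState()` also covers the manually-suspended case, which is exactly why it is used for updates.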
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
gyfora commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866934861 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java: ## @@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() { } // Adapt default rest service type from 1.15+ -if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) { -effectiveConfig.set( -REST_SERVICE_EXPOSED_TYPE, -KubernetesConfigOptions.ServiceExposedType.ClusterIP); -} +setDefaultConf( +REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP); if (spec.getJob() != null) { -if (!effectiveConfig.contains(CANCEL_ENABLE)) { -// Set 'web.cancel.enable' to false for application deployments to avoid users -// accidentally cancelling jobs. -effectiveConfig.set(CANCEL_ENABLE, false); -} +// Set 'web.cancel.enable' to false for application deployments to avoid users +// accidentally cancelling jobs. +setDefaultConf(CANCEL_ENABLE, false); + // With last-state upgrade mode, set the default value of // 'execution.checkpointing.interval' // to 5 minutes when HA is enabled. -if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE -&& !effectiveConfig.contains( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) { -effectiveConfig.set( +if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) { +setDefaultConf( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, DEFAULT_CHECKPOINTING_INTERVAL); } + +// We need to keep the application clusters around for proper operator behaviour +effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false); +if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) { Review Comment: This feature is only enabled when HA is on. setting this config gives an exception otherwise on submission -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
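The `setDefaultConf` refactoring in the diff above boils down to one pattern: apply a default only when the user has not explicitly set the option. A minimal map-based sketch of that pattern (Flink's real `Configuration`/`ConfigOption` types are replaced here by plain strings and objects to keep the example self-contained):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "set a default only if unset" helper factored out as setDefaultConf().
// A plain map stands in for Flink's Configuration; keys stand in for ConfigOptions.
class ConfigDefaults {

    private final Map<String, Object> conf = new HashMap<>();

    // Explicit user setting: always wins.
    public void set(String key, Object value) {
        conf.put(key, value);
    }

    // Operator default: applied only when the user has not set the key.
    public void setDefaultConf(String key, Object defaultValue) {
        conf.putIfAbsent(key, defaultValue);
    }

    public Object get(String key) {
        return conf.get(key);
    }
}
```

For example, `setDefaultConf("web.cancel.enable", false)` leaves a user-provided `true` untouched, while an unconfigured deployment gets the safer default.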
[GitHub] [flink-docker] afedulov commented on a diff in pull request #116: Publish snapshot images
afedulov commented on code in PR #116: URL: https://github.com/apache/flink-docker/pull/116#discussion_r866934213 ## .github/workflows/snapshot.yml: ## @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "Publish SNAPSHOTs" + +on: + # schedule: + # - cron: '0 0 * * *' # Deploy every day + push: +branches: + - snapshot-dev-master + +env: + REGISTRY: ghcr.io + VERSION: 1.16-SNAPSHOT + +jobs: + ci: +uses: ./.github/workflows/ci.yml Review Comment: @zentol I did not know about this limitation. Do you propose to skip the CI step? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-27520) Use admission-controller-framework in Webhook
[ https://issues.apache.org/jira/browse/FLINK-27520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532922#comment-17532922 ] Matyas Orhidi commented on FLINK-27520: --- The released version doesn't appear to be healthy: [https://repo1.maven.org/maven2/io/javaoperatorsdk/admission-controller-framework/0.1.0/] I'll talk to the developers to see what's going on. > Use admission-controller-framework in Webhook > - > > Key: FLINK-27520 > URL: https://issues.apache.org/jira/browse/FLINK-27520 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-0.1.0 >Reporter: Matyas Orhidi >Assignee: Matyas Orhidi >Priority: Major > > Use the released > [https://github.com/java-operator-sdk/admission-controller-framework] > instead of borrowed source codes in the Webhook module. > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27499) Bump base Flink version to 1.15.0
[ https://issues.apache.org/jira/browse/FLINK-27499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532920#comment-17532920 ] Yang Wang commented on FLINK-27499: --- What I mean is we could improve the e2e tests to cover all the supported versions first. This will guard against the behavior being broken after bumping the bundled Flink version to 1.15. > Bump base Flink version to 1.15.0 > - > > Key: FLINK-27499 > URL: https://issues.apache.org/jira/browse/FLINK-27499 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Gyula Fora >Priority: Blocker > > With the 1.15.0 release out, we should bump our Flink dependency to 1.15 if > this does not interfere with the 1.14 compatibility. > [~wangyang0918] what do you think?
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #195: [FLINK-27468] Recover missing deployments and other cancel/upgrade improvements for 1.15
wangyang0918 commented on code in PR #195: URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866602275 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java: ## @@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig)); } catch (Exception e) { LOG.error("Exception while listing jobs", e); -jobStatus.setState(JOB_STATE_UNKNOWN); + jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); Review Comment: I still hesitate to replace the `UNKNOWN` with `RECONCILING` since we could not differentiate whether the job is really reconciling or just failing to get the job status. ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java: ## @@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception { flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient); } +private void recoverJmDeployment(FlinkDeployment deployment) throws Exception { +LOG.info("Missing Flink Cluster deployment, trying to recover..."); Review Comment: We might also need to print the FlinkDeployment name here. 
## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java: ## @@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() { } // Adapt default rest service type from 1.15+ -if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) { -effectiveConfig.set( -REST_SERVICE_EXPOSED_TYPE, -KubernetesConfigOptions.ServiceExposedType.ClusterIP); -} +setDefaultConf( +REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP); if (spec.getJob() != null) { -if (!effectiveConfig.contains(CANCEL_ENABLE)) { -// Set 'web.cancel.enable' to false for application deployments to avoid users -// accidentally cancelling jobs. -effectiveConfig.set(CANCEL_ENABLE, false); -} +// Set 'web.cancel.enable' to false for application deployments to avoid users +// accidentally cancelling jobs. +setDefaultConf(CANCEL_ENABLE, false); + // With last-state upgrade mode, set the default value of // 'execution.checkpointing.interval' // to 5 minutes when HA is enabled. -if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE -&& !effectiveConfig.contains( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) { -effectiveConfig.set( +if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) { +setDefaultConf( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, DEFAULT_CHECKPOINTING_INTERVAL); } + +// We need to keep the application clusters around for proper operator behaviour +effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false); +if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) { Review Comment: Why do we not need to submit a failed job when HA disabled? 
## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) { && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED; } +private void onMissingDeployment(FlinkDeployment deployment) { +String err = "Missing JobManager deployment"; +logger.error(err); +Event event = +DeploymentFailedException.asEvent( +new DeploymentFailedException( + DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err), +deployment); +kubernetesClient Review Comment: The age of the missing deployment event is unknown. ``` Events: Type   Reason                         Age   From                  Message Error  Missing JobManager deployment        JobManagerDeployment  Missing JobManager deployment ``` ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -211,6 +220,22 @@
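The "unknown Age" wangyang0918 points out happens when a Kubernetes Event is created without its timestamp fields populated. Event objects carry `firstTimestamp`/`lastTimestamp` as RFC3339 strings, and `kubectl` derives the Age column from them. A small sketch of producing such a timestamp and deriving an age from it (the operator's actual client/builder calls are omitted; this only illustrates the timestamp handling):

```java
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Kubernetes Events show "<unknown>" Age when firstTimestamp/lastTimestamp are unset.
// This sketch produces an RFC3339 timestamp suitable for those fields and shows how
// an age would be computed from it, as kubectl does.
class EventTimestamps {

    static final DateTimeFormatter RFC3339 = DateTimeFormatter.ISO_INSTANT;

    // Value to put into Event.firstTimestamp / Event.lastTimestamp.
    static String nowRfc3339() {
        return RFC3339.format(Instant.now().truncatedTo(ChronoUnit.SECONDS));
    }

    // Age as displayed by kubectl: now minus lastTimestamp.
    static Duration age(String lastTimestamp, Instant now) {
        return Duration.between(Instant.parse(lastTimestamp), now);
    }
}
```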
[GitHub] [flink] TennyZhuang commented on pull request #19665: [hotfix][doc] Fix the mismatch result in Window Join example
TennyZhuang commented on PR #19665: URL: https://github.com/apache/flink/pull/19665#issuecomment-1119711845 > Could you help to update the Chinese documentation which is located in `docs/content.zh/docs/dev/table/sql/queries/window-join.md`? fixed
[GitHub] [flink] flinkbot commented on pull request #19665: [hotfix] [doc] Fix the mismatch result in Window Join example
flinkbot commented on PR #19665: URL: https://github.com/apache/flink/pull/19665#issuecomment-1119689445 ## CI report: * 0b519b6fa7ebd3a6fa33e9393ac29355f27cf01e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] TennyZhuang opened a new pull request, #19665: [hotfix] [doc] Fix the mismatch result in Window Join example
TennyZhuang opened a new pull request, #19665: URL: https://github.com/apache/flink/pull/19665 ## What is the purpose of the change The result of the FULL JOIN example is incorrect: `L.window_start` and `L.window_end` become NULL when the join condition is not matched. Use `COALESCE` to ensure that every entry of the result set has non-null `window_start` and `window_end` values. ## Brief change log ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: No ## Documentation - Does this pull request introduce a new feature? (yes / **no**)
[GitHub] [flink] fapaul commented on a diff in pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors
fapaul commented on code in PR #19664: URL: https://github.com/apache/flink/pull/19664#discussion_r866877842 ## docs/content/release-notes/flink-1.15.md: ## @@ -406,6 +406,13 @@ updating the client dependency to a version >= 7.14.0 is required due to interna The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been removed. If not done already, users need to upgrade to the newer stack (indicated by `connector=jdbc` in DDL). + [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126) + +New metric `numRecordsSend` has been introduced to denote the number of records sent to the external system +while the numRecordsOut will be used to denote the number of records transferred between tasks. Review Comment: ```suggestion while the numRecordsOut will be used to denote the number of records transferred between sink tasks. ``` ## docs/content/release-notes/flink-1.15.md: ## @@ -406,6 +406,13 @@ updating the client dependency to a version >= 7.14.0 is required due to interna The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been removed. If not done already, users need to upgrade to the newer stack (indicated by `connector=jdbc` in DDL). + [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126) Review Comment: I think it would be good to add a separate headline as "Extensible unified Sink uses new metric to capture outgoing records". ## docs/content/release-notes/flink-1.15.md: ## @@ -406,6 +406,13 @@ updating the client dependency to a version >= 7.14.0 is required due to interna The old JDBC connector (indicated by `connector.type=jdbc` in DDL) has been removed. If not done already, users need to upgrade to the newer stack (indicated by `connector=jdbc` in DDL). 
+ [FLINK-26126](https://issues.apache.org/jira/browse/FLINK-26126) + +New metric `numRecordsSend` has been introduced to denote the number of records sent to the external system +while the numRecordsOut will be used to denote the number of records transferred between tasks. + +Considering naming consistency, `numRecordsOutErrors` is deprecated, please use `numRecordsSendErrors` instead. Review Comment: WDYT about adding two different paragraphs? The first describes the perspective of a connector user that now needs to use `numRecordsSend` `numRecordsSendErrors` to monitor the records going to the external system. The second paragraph holds information for connector developers that have to eventually update their connectors to use the new metrics because `numRecordsOutErrors` is deprecated and `numRecordsOut` has a different meaning.
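For readers updating dashboards or alerts, the rename discussed above can be captured as a simple old-to-new table. The metric names come from the release note; the helper class itself is purely illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative migration table for sink metrics per FLINK-26126: records delivered to
// the external system are counted by numRecordsSend from 1.15 on, and
// numRecordsOutErrors is deprecated in favor of numRecordsSendErrors.
class SinkMetricMigration {

    static Map<String, String> replacements() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("numRecordsOut", "numRecordsSend");             // records sent to the external system
        m.put("numRecordsOutErrors", "numRecordsSendErrors"); // failed deliveries
        return m;
    }
}
```

Note that `numRecordsOut` itself still exists, but now counts records transferred between tasks, so sink dashboards should switch to the `*Send*` metrics.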
[GitHub] [flink-ml] dependabot[bot] commented on pull request #81: Bump log4j-core from 2.17.0 to 2.17.1
dependabot[bot] commented on PR #81: URL: https://github.com/apache/flink-ml/pull/81#issuecomment-1119681778 OK, I won't notify you again about this release, but will get in touch when a new version is available. If you'd rather skip all updates until the next major or minor version, let me know by commenting `@dependabot ignore this major version` or `@dependabot ignore this minor version`. If you change your mind, just re-open this PR and I'll resolve any conflicts on it.
[GitHub] [flink-ml] lindong28 commented on pull request #81: Bump log4j-core from 2.17.0 to 2.17.1
lindong28 commented on PR #81: URL: https://github.com/apache/flink-ml/pull/81#issuecomment-1119681735 The change has been made in https://github.com/apache/flink-ml/pull/81.
[GitHub] [flink-ml] lindong28 closed pull request #81: Bump log4j-core from 2.17.0 to 2.17.1
lindong28 closed pull request #81: Bump log4j-core from 2.17.0 to 2.17.1 URL: https://github.com/apache/flink-ml/pull/81
[GitHub] [flink-ml] dependabot[bot] closed pull request #84: Bump hadoop-common from 2.4.1 to 3.2.3 in /flink-ml-iteration
dependabot[bot] closed pull request #84: Bump hadoop-common from 2.4.1 to 3.2.3 in /flink-ml-iteration URL: https://github.com/apache/flink-ml/pull/84
[GitHub] [flink-ml] dependabot[bot] commented on pull request #84: Bump hadoop-common from 2.4.1 to 3.2.3 in /flink-ml-iteration
dependabot[bot] commented on PR #84: URL: https://github.com/apache/flink-ml/pull/84#issuecomment-1119678290 Looks like org.apache.hadoop:hadoop-common is no longer updatable, so this is no longer needed.
[GitHub] [flink-ml] dependabot[bot] closed pull request #85: Bump hadoop-common from 2.4.1 to 3.2.3 in /flink-ml-core
dependabot[bot] closed pull request #85: Bump hadoop-common from 2.4.1 to 3.2.3 in /flink-ml-core URL: https://github.com/apache/flink-ml/pull/85
[GitHub] [flink-ml] dependabot[bot] commented on pull request #85: Bump hadoop-common from 2.4.1 to 3.2.3 in /flink-ml-core
dependabot[bot] commented on PR #85: URL: https://github.com/apache/flink-ml/pull/85#issuecomment-1119678250 Looks like org.apache.hadoop:hadoop-common is no longer updatable, so this is no longer needed.
[GitHub] [flink-ml] zhipeng93 commented on pull request #94: [hotfix] Bump dependency versions
zhipeng93 commented on PR #94: URL: https://github.com/apache/flink-ml/pull/94#issuecomment-1119673384 Thanks for the PR. LGTM.
[GitHub] [flink-ml] zhipeng93 merged pull request #94: [hotfix] Bump dependency versions
zhipeng93 merged PR #94: URL: https://github.com/apache/flink-ml/pull/94
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #192: [FLINK-27097] Document custom validator implementations
gyfora commented on code in PR #192: URL: https://github.com/apache/flink-kubernetes-operator/pull/192#discussion_r866867020 ## docs/content/docs/operations/validator.md: ## @@ -0,0 +1,85 @@ +--- +title: "Validator" +weight: 5 +type: docs +aliases: +- /operations/validator.html +--- + + +# Custom `FlinkResourceValidator` implementation + +`FlinkResourceValidator`, an interface for validating the resources of `FlinkDeployment` and `FlinkSessionJob`, is a pluggable component based on the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism. During development, we can custom the implementation of `FlinkResourceValidator` and make sure to retain the service definition in `META-INF/services`. Review Comment: typo: `we can custom the implementation` -> `we can customize the implementation` ## docs/content/docs/operations/validator.md: ## @@ -0,0 +1,85 @@ +--- +title: "Validator" +weight: 5 +type: docs +aliases: +- /operations/validator.html +--- + + +# Custom `FlinkResourceValidator` implementation + +`FlinkResourceValidator`, an interface for validating the resources of `FlinkDeployment` and `FlinkSessionJob`, is a pluggable component based on the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism. During development, we can custom the implementation of `FlinkResourceValidator` and make sure to retain the service definition in `META-INF/services`. +The following steps demonstrate how to develop and use a custom validator. + +1. Implement `FlinkResourceValidator` interface: +```java +package org.apache.flink.kubernetes.operator.validation; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; + +import java.util.Optional; + +/** Custom validator implementation of {@link FlinkResourceValidator}. 
*/ +public class CustomValidator implements FlinkResourceValidator { + +@Override +public Optional<String> validateDeployment(FlinkDeployment deployment) { +return Optional.empty(); Review Comment: Can we demonstrate some actual custom validation here? A simple if check with an error would be enough
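The "simple if check" requested in the review could look like the sketch below. The real interface is `org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator` and operates on the operator's `FlinkDeployment` CRD class; the tiny `Deployment` stand-in (and its `parallelism` field) is hypothetical, only there to keep the example self-contained:

```java
import java.util.Optional;

// Sketch of a custom validation check, mirroring the validateDeployment contract in
// the docs above: return an error message when validation fails, Optional.empty()
// when the spec is valid. Deployment is a hypothetical stand-in for the real CRD type.
class CustomValidatorSketch {

    static class Deployment {
        final int parallelism;

        Deployment(int parallelism) {
            this.parallelism = parallelism;
        }
    }

    static Optional<String> validateDeployment(Deployment deployment) {
        if (deployment.parallelism <= 0) {
            return Optional.of("Job parallelism must be larger than 0");
        }
        return Optional.empty();
    }
}
```

Returning `Optional.empty()` signals a valid resource; any present value is surfaced to the user as the validation error.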
[GitHub] [flink] fapaul commented on pull request #19658: [FLINK-27480][kafka] Explain possible metrics InstanceAlreadyExistsException in docs
fapaul commented on PR #19658: URL: https://github.com/apache/flink/pull/19658#issuecomment-1119659178 @flinkbot run azure
[jira] [Closed] (FLINK-27262) Enrich validator for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-27262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-27262. -- Resolution: Fixed merged to main: 6ff7cf31c61d50a1459b518cfae86fc65e3c5810 > Enrich validator for FlinkSessionJob > > > Key: FLINK-27262 > URL: https://issues.apache.org/jira/browse/FLINK-27262 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > We need to enrich the validator to cover FlinkSesionJob. > At least we could have the following rules. > * Upgrade mode is savepoint, then {{state.savepoints.dir}} should be > configured in session FlinkDeployment -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #184: [FLINK-27262] Enrich validator for FlinkSessionJob
gyfora merged PR #184: URL: https://github.com/apache/flink-kubernetes-operator/pull/184
[GitHub] [flink] RocMarshal commented on a diff in pull request #18983: [FLINK-25543][flink-yarn] [JUnit5 Migration] Module: flink-yarn
RocMarshal commented on code in PR #18983: URL: https://github.com/apache/flink/pull/18983#discussion_r866856860 ## flink-yarn/src/test/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourcePriorityAdapterTest.java: ## @@ -206,40 +199,49 @@ public void testExternalResource() { final Map resultExternalResources = ResourceInformationReflector.INSTANCE.getExternalResources(resource); -assertThat(resultExternalResources.size(), is(1)); -assertThat( - resultExternalResources.get(SUPPORTED_EXTERNAL_RESOURCE_CONFIG_KEY), -is(SUPPORTED_EXTERNAL_RESOURCE_MAX)); +assertThat(resultExternalResources).hasSize(1); + assertThat(resultExternalResources.get(SUPPORTED_EXTERNAL_RESOURCE_CONFIG_KEY)) +.isEqualTo(SUPPORTED_EXTERNAL_RESOURCE_MAX); } -@Test(expected = IllegalStateException.class) -public void testExternalResourceFailExceedMax() { +@Test +void testExternalResourceFailExceedMax() { assumeTrue(isExternalResourceSupported()); - -getAdapterWithExternalResources( -SUPPORTED_EXTERNAL_RESOURCE_NAME, SUPPORTED_EXTERNAL_RESOURCE_CONFIG_KEY) -.getPriorityAndResource( - TASK_EXECUTOR_PROCESS_SPEC_WITH_EXTERNAL_RESOURCE_EXCEED_MAX); +assertThatThrownBy( +() -> +getAdapterWithExternalResources( + SUPPORTED_EXTERNAL_RESOURCE_NAME, + SUPPORTED_EXTERNAL_RESOURCE_CONFIG_KEY) Review Comment: Done. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] CrynetLogistics commented on a diff in pull request #517: [FLINK-24370] Addition of blog post describing features and usage of the Async Sink…
CrynetLogistics commented on code in PR #517: URL: https://github.com/apache/flink-web/pull/517#discussion_r866853715 ## _posts/2022-03-16-async-sink-base.md: ## @@ -0,0 +1,169 @@ +--- +layout: post +title: "The Generic Asynchronous Base Sink" +date: 2022-04-30 16:00:00 +authors: +- CrynetLogistics: + name: "Zichen Liu" +excerpt: An overview of the new AsyncBaseSink and how to use it for building your own concrete sink +--- + +Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing. + +This is why for Flink 1.15 we have decided to create the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink with a number of common functionalities extracted. + +This is a base implementation for asynchronous sinks, which you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the specific interfaces of the destination using a client that supports async requests. + +This common abstraction will reduce the effort required to maintain individual sinks that extend from this abstract sink, with bug fixes and improvements to the sink core benefiting all implementations that extend it. The design of `AsyncSinkBase` focuses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies. + +The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests. + +In this post, we will go over the details of the AsyncSinkBase so that you can start using it to build your own concrete sink. 
+ +{% toc %} + +# Adding the base sink as a dependency + +In order to use the base sink, you will need to add the following dependency to your project. The example below follows the Maven syntax: + +```xml + + org.apache.flink + flink-connector-base + ${flink.version} + +``` + +# The Public Interfaces of AsyncSinkBase + +## Generic Types + +`` – type of elements in a DataStream that should be passed to the sink + +`` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination + + +## Element Converter Interface + +[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java) + +```java +public interface ElementConverter extends Serializable { +RequestEntryT apply(InputT element, SinkWriter.Context context); +} +``` +The concrete sink implementation should provide a way to convert from an element in the DataStream to the payload type that contains all the additional metadata required to submit that element to the destination by the sink. Ideally, this would be encapsulated from the end user since it allows concrete sink implementers to adapt to changes in the destination API without breaking end user code. + +## Sink Writer Interface + +[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java) + +```java +public abstract class AsyncSinkWriter +implements StatefulSink.StatefulSinkWriter> { +// ... +protected abstract void submitRequestEntries( +List requestEntries, Consumer> requestResult); Review Comment: https://issues.apache.org/jira/browse/FLINK-27536 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
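The `ElementConverter` contract quoted in this excerpt can be illustrated with a minimal, dependency-free sketch. The real interface lives in `flink-connector-base` and also receives a `SinkWriter.Context`; the `KinesisRecord` payload type and the partition-key derivation here are hypothetical, purely for illustration.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for the ElementConverter contract: turn a DataStream
// element into the request-entry payload the destination needs.
public class ElementConverterSketch {

    public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
        RequestEntryT apply(InputT element);
    }

    // Hypothetical payload: the serialized element plus destination metadata.
    public static final class KinesisRecord implements Serializable {
        public final byte[] data;
        public final String partitionKey;

        KinesisRecord(byte[] data, String partitionKey) {
            this.data = data;
            this.partitionKey = partitionKey;
        }
    }

    // A concrete converter: serialize the String and derive a partition key from it.
    public static final ElementConverter<String, KinesisRecord> CONVERTER =
            element -> new KinesisRecord(
                    element.getBytes(StandardCharsets.UTF_8),
                    String.valueOf(element.hashCode()));

    public static void main(String[] args) {
        KinesisRecord record = CONVERTER.apply("hello");
        System.out.println(record.partitionKey);
    }
}
```

As the excerpt notes, end users of a concrete sink would never see this conversion; only sink implementers touch it.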
[jira] [Created] (FLINK-27536) Rename method parameter in AsyncSinkWriter
Zichen Liu created FLINK-27536: -- Summary: Rename method parameter in AsyncSinkWriter Key: FLINK-27536 URL: https://issues.apache.org/jira/browse/FLINK-27536 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Zichen Liu Change the abstract method's parameter naming in AsyncSinkWriter. From Consumer<List<RequestEntryT>> requestResult to Consumer<List<RequestEntryT>> requestToRetry or something similar. This is because the consumer here is supposed to accept a list of requests that need to be retried. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] rovboyko commented on pull request #19648: [FLINK-27438][table-planner] Fix the constructing a map array
rovboyko commented on PR #19648: URL: https://github.com/apache/flink/pull/19648#issuecomment-1119639236 > @MartijnVisser Would be great to backport it to 1.15 and I guess 1.14 as well. Yes, we discussed it, I'll do it.
[jira] [Closed] (FLINK-27528) Introduce a new configuration option 'compact.rescale-bucket' for FileStore
[ https://issues.apache.org/jira/browse/FLINK-27528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-27528. Assignee: Jane Chan Resolution: Fixed master: d0576e1bbd7f798d68db398593c8368376743203 > Introduce a new configuration option 'compact.rescale-bucket' for FileStore > --- > > Key: FLINK-27528 > URL: https://issues.apache.org/jira/browse/FLINK-27528 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.2.0 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > > > This config key controls the behavior of {{ALTER TABLE ... COMPACT}}. > When {{compact.rescale-bucket}} is false, the compaction rewrites data according to the bucket number read from the manifest meta, and the commit only adds/deletes files; otherwise, the compaction reads the bucket number from the catalog meta, and the commit overwrites the whole partition directory.
[GitHub] [flink] fapaul commented on pull request #19649: [FLINK-27487][kafka] Only forward measurable Kafka metrics and ignore others
fapaul commented on PR #19649: URL: https://github.com/apache/flink/pull/19649#issuecomment-1119628108 @flinkbot run azure
[GitHub] [flink-table-store] JingsongLi merged pull request #109: [FLINK-27528] Introduce a new configuration option 'compact.rescale-bucket' for FileStore
JingsongLi merged PR #109: URL: https://github.com/apache/flink-table-store/pull/109
[GitHub] [flink] hililiwei commented on pull request #17798: [hotfix] Fix comment typos in AsyncIOExample
hililiwei commented on PR #17798: URL: https://github.com/apache/flink/pull/17798#issuecomment-1119617629 I edited the description and title of the PR again. Please take a look. Thank you.
[GitHub] [flink] flinkbot commented on pull request #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors
flinkbot commented on PR #19664: URL: https://github.com/apache/flink/pull/19664#issuecomment-1119609351 ## CI report: * d7a71c91e7c3ec0799b12a33694715ac8ccedfcb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] JingGe opened a new pull request, #19664: [hotfix][doc] add content for using numRecordsSend and deprecating numRecordsOutErrors
JingGe opened a new pull request, #19664: URL: https://github.com/apache/flink/pull/19664 ## What is the purpose of the change This PR will let Flink be aware of changes of metrics e.g. `numRecordsSend`, `numRecordsOutErrors`, etc. ## Brief change log - add the description of using `numRecordsSend` and deprecating `numRecordsOutErrors` GitHub PR ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**)
[GitHub] [flink] alpreu commented on a diff in pull request #19552: [FLINK-27185][connectors][formats] Convert format modules to assertj
alpreu commented on code in PR #19552: URL: https://github.com/apache/flink/pull/19552#discussion_r866808922 ## flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java: ## @@ -55,8 +57,8 @@ public void customPropertiesSurviveSerializationDeserialization() final SerializableHadoopConfiguration deserializableConfigUnderTest = deserializeAndGetConfiguration(serializedConfigUnderTest); -Assert.assertThat( -deserializableConfigUnderTest.get(), hasTheSamePropertiesAs(configuration)); +assertThat(deserializableConfigUnderTest.get()) +.satisfies(matching(hasTheSamePropertiesAs(configuration))); Review Comment: As discussed offline, we decided to keep it as is because converting the Matchers into AssertJ Assertion classes introduces a lot of boilerplate and feels very clunky
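The diff above reuses an existing Hamcrest-style matcher inside an AssertJ assertion via `satisfies(matching(...))` rather than rewriting it as an AssertJ `Condition`. The adapter idea can be sketched dependency-free; `Matcher`, `matching`, and `matcherOf` below are simplified stand-ins, not the real Hamcrest or AssertJ APIs.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Sketch of the adapter pattern: wrap a Matcher into the Consumer shape that
// a satisfies(...)-style assertion expects, so existing matchers keep working.
public class MatcherAdapterSketch {

    // Minimal stand-in for a Hamcrest Matcher.
    public interface Matcher<T> {
        boolean matches(T actual);
        String description();
    }

    // The adapter: a failing match becomes an AssertionError, which is what
    // a satisfies(Consumer) call turns into a regular test failure.
    public static <T> Consumer<T> matching(Matcher<T> matcher) {
        return actual -> {
            if (!matcher.matches(actual)) {
                throw new AssertionError(
                        "Expected: " + matcher.description() + " but was: " + actual);
            }
        };
    }

    // Helper to build a Matcher from a predicate and a description.
    public static <T> Matcher<T> matcherOf(String description, Predicate<T> p) {
        return new Matcher<T>() {
            public boolean matches(T actual) { return p.test(actual); }
            public String description() { return description; }
        };
    }

    public static void main(String[] args) {
        Matcher<String> nonEmpty = matcherOf("a non-empty string", s -> !s.isEmpty());
        matching(nonEmpty).accept("ok"); // passes; "" would throw AssertionError
    }
}
```

This is why keeping the matchers "as is" costs only one adapter call per assertion instead of a rewritten `Condition` class per matcher.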
[GitHub] [flink-web] dannycranmer commented on a diff in pull request #517: [FLINK-24370] Addition of blog post describing features and usage of the Async Sink…
dannycranmer commented on code in PR #517: URL: https://github.com/apache/flink-web/pull/517#discussion_r866808673 ## _posts/2022-03-16-async-sink-base.md: ## @@ -0,0 +1,169 @@ [quoted blog excerpt identical to the one above, continuing:]
+
+In this method, sink implementers should use the destination clients to submit `requestEntries` asynchronously to be written.
+
+Should any elements fail to be persisted, they should be requeued back in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, retrying any element that is known to be faulty and consistently failing will result in that element being requeued forever, so a sensible strategy for determining what should be retried is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
+If at any point it is determined that a fatal error has occurred and that we should throw a runtime exception from the sink, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
```
Review Comment: Java serialization is not actually used in the `AsyncSinkWriterStateSerializer`. It was in the original PR, which was rejected and subsequently reimplemented. Looks like `extends Serializable` was then not removed, which it should have been. This is a smell in the interface, since this
[GitHub] [flink] onyourhead commented on pull request #19628: [FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
onyourhead commented on PR #19628: URL: https://github.com/apache/flink/pull/19628#issuecomment-1119595202 Because the release branches are some commits behind master, the backport code differs slightly. Also, `KafkaSinkBuilderTest` only exists in the master branch.
[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] [FLINK-27457] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions and support flush
echauchot commented on PR #19586: URL: https://github.com/apache/flink/pull/19586#issuecomment-1119594598 > Would you mind moving the flushing changes into another PR? I could, but in that case the uncovered race condition won't be fixed, so the output format tests would fail on Cassandra 4.0 between the two PR merges.
[GitHub] [flink] flinkbot commented on pull request #19663: [BP-1.15][FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
flinkbot commented on PR #19663: URL: https://github.com/apache/flink/pull/19663#issuecomment-1119593719 ## CI report: * 02a7c0597eab389212fe01fc4ece234e26ee9add UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #19662: [BP-1.14][FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
flinkbot commented on PR #19662: URL: https://github.com/apache/flink/pull/19662#issuecomment-1119593626 ## CI report: * 7284f2cf67175ec15e51bb096c3db9985a4b87b8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] matriv commented on pull request #19648: [FLINK-27438][table-planner] Fix the constructing a map array
matriv commented on PR #19648: URL: https://github.com/apache/flink/pull/19648#issuecomment-1119593074 @MartijnVisser Would be great to backport it to 1.15 and I guess 1.14 as well.
[GitHub] [flink] onyourhead opened a new pull request, #19663: [BP-1.15][FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
onyourhead opened a new pull request, #19663: URL: https://github.com/apache/flink/pull/19663 This is a backport PR for #19628
[GitHub] [flink] onyourhead opened a new pull request, #19662: [BP-1.14][FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
onyourhead opened a new pull request, #19662: URL: https://github.com/apache/flink/pull/19662 This is a backport PR for #19628
[GitHub] [flink] HuangZhenQiu commented on pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager
HuangZhenQiu commented on PR #14678: URL: https://github.com/apache/flink/pull/14678#issuecomment-1119584546 @flinkbot run azure
[GitHub] [flink] rovboyko commented on pull request #19648: [FLINK-27438][table-planner] Fix the constructing a map array
rovboyko commented on PR #19648: URL: https://github.com/apache/flink/pull/19648#issuecomment-1119581469 @matriv Thank you for such a thorough review! I've fixed everything and resolved all discussions. Could you please take a look once again?
[jira] [Assigned] (FLINK-27492) Flink table scala example does not including the scala-api jars
[ https://issues.apache.org/jira/browse/FLINK-27492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-27492: Assignee: Timo Walther > Flink table scala example does not including the scala-api jars > --- > > Key: FLINK-27492 > URL: https://issues.apache.org/jira/browse/FLINK-27492 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.0, 1.16.0 >Reporter: Yun Gao >Assignee: Timo Walther >Priority: Critical > Fix For: 1.16.0, 1.15.1 > > > Currently it seems flink-scala-api and flink-table-api-scala-bridge are not > included in the binary package [1]. However, the scala table examples do not seem to > include the scala-api classes in the generated jar either. If we > start a standalone cluster from the binary distribution package and then > submit a table / sql job in scala, it fails because the > StreamTableEnvironment class cannot be found. > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#anatomy-of-table-dependencies
[GitHub] [flink] rovboyko commented on a diff in pull request #19648: [FLINK-27438][table-planner] Fix the constructing a map array
rovboyko commented on code in PR #19648: URL: https://github.com/apache/flink/pull/19648#discussion_r866791617 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala: ## @@ -187,4 +187,13 @@ class CalcTest extends TableTestBase { def testDecimalMapWithDifferentPrecision(): Unit = { util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]") } + + @Test + def testCastMapValueInsideArray(): Unit = { +util.verifyExecPlan( Review Comment: added ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala: ## @@ -181,4 +181,13 @@ class CalcTest extends TableTestBase { def testDecimalMapWithDifferentPrecision(): Unit = { util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]") } + + @Test + def testCastMapValueInsideArray(): Unit = { +util.verifyExecPlan( Review Comment: added
[GitHub] [flink-web] zentol merged pull request #517: [FLINK-24370] Addition of blog post describing features and usage of the Async Sink…
zentol merged PR #517: URL: https://github.com/apache/flink-web/pull/517
[GitHub] [flink] rovboyko commented on a diff in pull request #19648: [FLINK-27438][table-planner] Fix the constructing a map array
rovboyko commented on code in PR #19648: URL: https://github.com/apache/flink/pull/19648#discussion_r866791321 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java: ## @@ -76,9 +80,27 @@ private static void adjustTypeForMultisetConstructor( } private static SqlNode castTo(SqlNode node, RelDataType type) { -return SqlStdOperatorTable.CAST.createCall( -SqlParserPos.ZERO, -node, - SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable())); +SqlDataTypeSpec dataTypeSpec; Review Comment: fixed ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java: ## @@ -76,9 +80,27 @@ private static void adjustTypeForMultisetConstructor( } private static SqlNode castTo(SqlNode node, RelDataType type) { -return SqlStdOperatorTable.CAST.createCall( -SqlParserPos.ZERO, -node, - SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable())); +SqlDataTypeSpec dataTypeSpec; +if (SqlTypeUtil.isMap(type)) { +dataTypeSpec = getMapSqlDataTypeSpec(type); +} else { +dataTypeSpec = SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable()); +} + +return SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, dataTypeSpec); +} + +private static SqlDataTypeSpec getMapSqlDataTypeSpec(RelDataType type) { +SqlDataTypeSpec dataTypeSpec; Review Comment: fixed
[jira] [Updated] (FLINK-27535) Optimize the unit test execution time
[ https://issues.apache.org/jira/browse/FLINK-27535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-27535: - Affects Version/s: (was: ml-2.1.0) > Optimize the unit test execution time > - > > Key: FLINK-27535 > URL: https://issues.apache.org/jira/browse/FLINK-27535 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Yunfeng Zhou >Priority: Critical > > Currently `mvn package` takes 10 minutes to complete in Github actions. A lot > of time is spent in running unit tests for algorithms. For example, > LogisticRegressionTest takes 82 seconds and KMeansTest takes 43 seconds in > [1]. > This time appears to be more than expected. And it will considerably reduce > developer velocity if a developer needs to wait for hours to get test results > once we have 100+ algorithms in Flink ML. > We should understand why it takes 82 seconds to run e.g. > LogisticRegressionTest and see if there is a way to optimize the test > execution time. > [1] https://github.com/apache/flink-ml/runs/6319402103?check_suite_focus=true.
[jira] [Updated] (FLINK-27535) Optimize the unit test execution time
[ https://issues.apache.org/jira/browse/FLINK-27535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-27535: - Fix Version/s: ml-2.1.0 > Optimize the unit test execution time > - > > Key: FLINK-27535 > URL: https://issues.apache.org/jira/browse/FLINK-27535 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Yunfeng Zhou >Priority: Critical > Fix For: ml-2.1.0 > > > Currently `mvn package` takes 10 minutes to complete in Github actions. A lot > of time is spent in running unit tests for algorithms. For example, > LogisticRegressionTest takes 82 seconds and KMeansTest takes 43 seconds in > [1]. > This time appears to be more than expected. And it will considerably reduce > developer velocity if a developer needs to wait for hours to get test results > once we have 100+ algorithms in Flink ML. > We should understand why it takes 82 seconds to run e.g. > LogisticRegressionTest and see if there is a way to optimize the test > execution time. > [1] https://github.com/apache/flink-ml/runs/6319402103?check_suite_focus=true.