[jira] [Comment Edited] (FLINK-22663) Release YARN resource very slow when cancel the job after some NodeManagers shutdown
[ https://issues.apache.org/jira/browse/FLINK-22663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346599#comment-17346599 ]

Jinhong Liu edited comment on FLINK-22663 at 5/18/21, 5:53 AM:
---

[~fly_in_gis]

Firstly, this issue occurs only when at least one TaskManager is running on the dead NodeManager.

Secondly, when the issue occurs, none of the containers, including the AppMaster, can exit, not only the containers on the dead NodeManager.

Finally, I found a configuration option that helps containers exit quickly: _taskmanager.registration.timeout_, whose default value is 5 min. If I set it to 1 min, the containers can exit one minute later, but the AppMaster still needs about 10 mins to exit.

was (Author: jinhongliu):
[~fly_in_gis] Firstly, this issue occurs just at least one TaskManger is running on the Dead NoManager. Secondly, when the issue occurs, all the containers include the AppMaster cannot exit, not only the containers on the Dead NodeManager. Finally, I find a configuration that can help containers exit quickly, _taskmanager.registration.timeout_, the default value is 5 min. If I set it to 1 min, the containers exit after one minute later, but the AppMaster still need about 10 mins to exit.

> Release YARN resource very slow when cancel the job after some NodeManagers
> shutdown
>
>
> Key: FLINK-22663
> URL: https://issues.apache.org/jira/browse/FLINK-22663
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.12.2
> Reporter: Jinhong Liu
> Priority: Major
> Labels: YARN
>
>
> When I test Flink on YARN, there is a case that may cause problems.
> Hadoop Version: 2.7.3
> Flink Version: 1.12.2
> I deploy a Flink job on YARN. While the job is running, I stop one NodeManager;
> after one or two minutes, the job recovers automatically. But in this situation,
> if I cancel the job, the containers are not released immediately: some
> containers, including the app master, are still running. About 5
> minutes later, these containers exit, and about 10 minutes later the app
> master exits.
> I checked the app master log; it seems to be trying to stop the containers on the
> NodeManager that I have already stopped.
> {code:java}
> 2021-05-14 06:15:17,389 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job class tv.freewheel.reporting.fastlane.Fastlane$ (da883ab39a7a82e4d45a3803bc77dd6f) switched from state CANCELLING to CANCELED.
> 2021-05-14 06:15:17,389 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping checkpoint coordinator for job da883ab39a7a82e4d45a3803bc77dd6f.
> 2021-05-14 06:15:17,390 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
> 2021-05-14 06:15:17,408 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Job da883ab39a7a82e4d45a3803bc77dd6f reached globally terminal state CANCELED.
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Shutting down cluster with state CANCELED, jobCancelled: true, executionMode: DETACHED
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting YarnJobClusterEntrypoint down with application status CANCELED. Diagnostics null.
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
> 2021-05-14 06:15:17,420 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job class tv.freewheel.reporting.fastlane.Fastlane$(da883ab39a7a82e4d45a3803bc77dd6f).
> 2021-05-14 06:15:17,422 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-af72a00c-0ddd-4e5e-a62c-8244d6caa552/flink-web-ui
> 2021-05-14 06:15:17,432 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://ip-10-23-19-197.ec2.internal:43811 lost leadership
> 2021-05-14 06:15:17,432 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
> 2021-05-14 06:15:17,436 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Shut down cluster because application is in CANCELED, diagnostics null.
> 2021-05-14 06:15:17,436 INFO org.apache.flink.yarn.YarnResourceManagerDriver [] - Unregister application from the YARN Resource Manager with final status KILLED.
> 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
> 2021-05-14 06:15:17,458 INFO
[jira] [Commented] (FLINK-22663) Release YARN resource very slow when cancel the job after some NodeManagers shutdown
[ https://issues.apache.org/jira/browse/FLINK-22663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346599#comment-17346599 ]

Jinhong Liu commented on FLINK-22663:
-

[~fly_in_gis]

Firstly, this issue occurs only when at least one TaskManager is running on the dead NodeManager.

Secondly, when the issue occurs, none of the containers, including the AppMaster, can exit, not only the containers on the dead NodeManager.

Finally, I found a configuration option that helps containers exit quickly: _taskmanager.registration.timeout_, whose default value is 5 min. If I set it to 1 min, the containers exit one minute later, but the AppMaster still needs about 10 mins to exit.

> Release YARN resource very slow when cancel the job after some NodeManagers
> shutdown
>
>
> Key: FLINK-22663
> URL: https://issues.apache.org/jira/browse/FLINK-22663
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.12.2
> Reporter: Jinhong Liu
> Priority: Major
> Labels: YARN
>
>
> When I test Flink on YARN, there is a case that may cause problems.
> Hadoop Version: 2.7.3
> Flink Version: 1.12.2
> I deploy a Flink job on YARN. While the job is running, I stop one NodeManager;
> after one or two minutes, the job recovers automatically. But in this situation,
> if I cancel the job, the containers are not released immediately: some
> containers, including the app master, are still running. About 5
> minutes later, these containers exit, and about 10 minutes later the app
> master exits.
> I checked the app master log; it seems to be trying to stop the containers on the
> NodeManager that I have already stopped.
> {code:java}
> 2021-05-14 06:15:17,389 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job class tv.freewheel.reporting.fastlane.Fastlane$ (da883ab39a7a82e4d45a3803bc77dd6f) switched from state CANCELLING to CANCELED.
> 2021-05-14 06:15:17,389 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping checkpoint coordinator for job da883ab39a7a82e4d45a3803bc77dd6f.
> 2021-05-14 06:15:17,390 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
> 2021-05-14 06:15:17,408 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Job da883ab39a7a82e4d45a3803bc77dd6f reached globally terminal state CANCELED.
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Shutting down cluster with state CANCELED, jobCancelled: true, executionMode: DETACHED
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting YarnJobClusterEntrypoint down with application status CANCELED. Diagnostics null.
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
> 2021-05-14 06:15:17,420 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job class tv.freewheel.reporting.fastlane.Fastlane$(da883ab39a7a82e4d45a3803bc77dd6f).
> 2021-05-14 06:15:17,422 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-af72a00c-0ddd-4e5e-a62c-8244d6caa552/flink-web-ui
> 2021-05-14 06:15:17,432 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://ip-10-23-19-197.ec2.internal:43811 lost leadership
> 2021-05-14 06:15:17,432 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
> 2021-05-14 06:15:17,436 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Shut down cluster because application is in CANCELED, diagnostics null.
> 2021-05-14 06:15:17,436 INFO org.apache.flink.yarn.YarnResourceManagerDriver [] - Unregister application from the YARN Resource Manager with final status KILLED.
> 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
> 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 493862ba148679a4f16f7de5ffaef665: Stopping JobMaster for job class tv.freewheel.reporting.fastlane.Fastlane$(da883ab39a7a82e4d45a3803bc77dd6f)..
> 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping SlotPool.
> 2021-05-14 06:15:17,482 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl[] - Waiting for application to be successfully unregistered.
> 2021-05-14 06:15:17,566 INFO
[GitHub] [flink] lirui-apache commented on a change in pull request #15653: [FLINK-22329][hive] Inject current ugi credentials into jobconf when getting file split in hive connector
lirui-apache commented on a change in pull request #15653:
URL: https://github.com/apache/flink/pull/15653#discussion_r634060636

## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/JobConfUtilsTest.java ##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/** Test for {@link JobConfUtils}. */
+public class JobConfUtilsTest {

Review comment:
Yeah. I think we don't need it.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Assigned] (FLINK-22552) Rebase StateFun on Flink 1.13
[ https://issues.apache.org/jira/browse/FLINK-22552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reassigned FLINK-22552:
---

Assignee: Tzu-Li (Gordon) Tai

> Rebase StateFun on Flink 1.13
> -
>
> Key: FLINK-22552
> URL: https://issues.apache.org/jira/browse/FLINK-22552
> Project: Flink
> Issue Type: Improvement
> Components: Stateful Functions
> Reporter: Igal Shilman
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: statefun-3.1.0
>
>
> Following the recent release of Flink 1.13, StateFun main branch needs to be
> rebased on that version.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15940: [FLINK-12295][table] Fix comments in MinAggFunction and MaxAggFunction
flinkbot edited a comment on pull request #15940:
URL: https://github.com/apache/flink/pull/15940#issuecomment-842844923

## CI report:

* 34caeffd3991e4526a4b04c367abf37d12b9c523 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18061)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] tweise commented on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
tweise commented on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-842849181

@SteNicholas thanks for the feedback! I will update the PR once we gather some more input.
[GitHub] [flink] tweise commented on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
tweise commented on pull request #15924:
URL: https://github.com/apache/flink/pull/15924#issuecomment-842848588

> @tweise, thanks for creating pull request for `HybridSource` basic implementation. IMO, `SwitchableSource` and `SwitchableSplitEnumerator` mentioned in FLIP-150 could be introduced here to control the switching behavior more appropriately and add switchable source more conveniently.

Please see discussion on the mailing list. In short, I don't think it is reasonable to mandate special "Switchable" interfaces for a source to be used with HybridSource since many use cases don't benefit from it, including the most basic scenario with fixed start positions.

> What's more, does `HybridSourceITCase` need to verify the switching between FileSource and KafkaSource?

That might be better achieved with an end to end test?
[GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
tweise commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r634044007 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. 
*/ +public class HybridSourceReader +implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +private long lastReaderFinishedMs; + +public HybridSourceReader( +SourceReaderContext readerContext, List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( +"End of input subtask={} sourceIndex={} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +currentReader); +if (currentSourceIndex + 1 < realReaders.size()) { +// signal coordinator to advance readers +long currentMillis = System.currentTimeMillis(); +if (lastReaderFinishedMs + SOURCE_READER_FINISHED_EVENT_DELAY < currentMillis) { +lastReaderFinishedMs = currentMillis; +readerContext.sendSourceEventToCoordinator( +new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); +} +// more data will be available from the next reader +return InputStatus.MORE_AVAILABLE; +} +} +return status; +} + +@Override +public List> snapshotState(long checkpointId) { +this.lastCheckpointId = checkpointId; +List state = currentReader.snapshotState(checkpointId); +return wrappedSplits(currentSourceIndex, state); +} + +public static List> wrappedSplits( +int readerIndex, List state) { +List> wrappedSplits = new ArrayList<>(state.size()); +for (SplitT split : state) { +wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split)); +} +return wrappedSplits; +} + +public static List 
unwrappedSplits( +List> splits) { +List unwrappedSplits = new ArrayList<>(splits.size()); +for (HybridSourceSplit split : splits) { +unwrappedSplits.add(split.getWrappedSplit()); +} +return unwrappedSplits; +} + +@Override +public CompletableFuture isAvailable() { +return currentReader.isAvailable(); +} + +@Override +public void addSplits(List> splits) { +LOG.info( +"### Adding splits subtask={} sourceIndex={} {} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +
[GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
tweise commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r634043488 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. 
*/ +public class HybridSourceReader +implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; Review comment: Delay is required to not flood the coordinator with finished events when the current reader reaches end of input. Yes, it can be made configurable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
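The throttling described in the review comment above can be sketched in isolation. This is a minimal, hypothetical illustration and not the PR's code: the class name `FinishedEventThrottle` and its `shouldSend` method are invented here, and the delay is passed to the constructor only to show one way the hard-coded 250 ms could be made configurable.

```java
/**
 * Hypothetical sketch (not the PR's code): lets an event through at most once
 * per delay window, similar in spirit to the lastReaderFinishedMs check in
 * HybridSourceReader.pollNext().
 */
public class FinishedEventThrottle {
    private final long delayMs;
    private long lastSentMs;
    private boolean sentBefore = false;

    public FinishedEventThrottle(long delayMs) {
        if (delayMs < 0) {
            throw new IllegalArgumentException("delayMs must be >= 0");
        }
        this.delayMs = delayMs;
    }

    /**
     * Returns true if an event may be sent at time nowMs, and records the send.
     * The first call always returns true; later calls return true only when
     * more than delayMs has elapsed since the last recorded send.
     */
    public boolean shouldSend(long nowMs) {
        if (!sentBefore || nowMs - lastSentMs > delayMs) {
            sentBefore = true;
            lastSentMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FinishedEventThrottle throttle = new FinishedEventThrottle(250);
        // Simulated poll times in milliseconds; a real reader would pass
        // System.currentTimeMillis() on each END_OF_INPUT poll.
        long[] pollTimes = {0, 100, 250, 251, 500, 502};
        for (long t : pollTimes) {
            System.out.println("t=" + t + " send=" + throttle.shouldSend(t));
        }
    }
}
```

Passing the time in as an argument, rather than reading the clock inside the method, keeps the sketch deterministic and easy to test.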
[GitHub] [flink] flinkbot commented on pull request #15940: [FLINK-12295][table] Fix comments in MinAggFunction and MaxAggFunction
flinkbot commented on pull request #15940:
URL: https://github.com/apache/flink/pull/15940#issuecomment-842844923

## CI report:

* 34caeffd3991e4526a4b04c367abf37d12b9c523 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
tweise commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r634042830 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. */ Review comment: The `HybridSourceReader` references the reader for the currently active underlying source. That reader changes when sources are switched. Will update javadoc. 
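The delegation described in this review comment can be illustrated with a small standalone sketch. It is only an approximation under stated assumptions: `SimpleReader`, `DelegatingReader`, `of` and `drain` are hypothetical names, a `null` return stands in for `InputStatus.END_OF_INPUT`, and the reader advances to the next source by itself, whereas in the PR the switch is coordinated through a `SourceReaderFinishedEvent` sent to the enumerator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of a reader that delegates to the currently active reader. */
public class DelegatingReaderSketch {

    /** Simplified stand-in for a source reader: returns null at end of input. */
    interface SimpleReader {
        String poll();
    }

    /** Builds a bounded reader over fixed records (illustration only). */
    static SimpleReader of(String... records) {
        Iterator<String> it = Arrays.asList(records).iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    /** Delegates to the current reader and traps end of input unless it is the last one. */
    static final class DelegatingReader implements SimpleReader {
        private final List<SimpleReader> readers;
        private int currentIndex = 0;

        DelegatingReader(List<SimpleReader> readers) {
            if (readers.isEmpty()) {
                throw new IllegalArgumentException("at least one reader is required");
            }
            this.readers = readers;
        }

        @Override
        public String poll() {
            while (true) {
                String record = readers.get(currentIndex).poll();
                if (record != null) {
                    return record;
                }
                if (currentIndex + 1 >= readers.size()) {
                    // The final reader is exhausted: propagate end of input.
                    return null;
                }
                // Not the final reader: switch to the next underlying reader.
                currentIndex++;
            }
        }
    }

    /** Reads until end of input and collects all records. */
    static List<String> drain(SimpleReader reader) {
        List<String> out = new ArrayList<>();
        for (String r = reader.poll(); r != null; r = reader.poll()) {
            out.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        SimpleReader hybrid =
                new DelegatingReader(Arrays.asList(of("file-1", "file-2"), of("kafka-1")));
        System.out.println(drain(hybrid));
    }
}
```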
[GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
tweise commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r634042326 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. */ +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { +this.sourceChain = sourceChain; +} + +@Override +public Boundedness getBoundedness() { +for (Tuple2, ?> t : sourceChain.sources) { Review comment: Will fix that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
tweise commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r634041985 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. */ +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { Review comment: `SourceChain` is intended for the user to optionally provide a way to define sources with runtime position handover. It provides type safety for the checkpoint conversion function. An example how that would look like is here: https://github.com/apache/flink/pull/15924/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R50 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-15733) Add min number memory pages check for BytesHashMap
[ https://issues.apache.org/jira/browse/FLINK-15733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed FLINK-15733.

Resolution: Invalid

> Add min number memory pages check for BytesHashMap
> --
>
> Key: FLINK-15733
> URL: https://issues.apache.org/jira/browse/FLINK-15733
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Reporter: Jingsong Lee
> Priority: Major
>
>
> We should check the minimum number of memory pages and throw a clear exception, instead of
> a confusing NullPointerException.
[GitHub] [flink] flinkbot commented on pull request #15940: [FLINK-12295][table] Fix comments in MinAggFunction and MaxAggFunction
flinkbot commented on pull request #15940:
URL: https://github.com/apache/flink/pull/15940#issuecomment-842815707

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit 34caeffd3991e4526a4b04c367abf37d12b9c523 (Tue May 18 04:28:24 UTC 2021)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12295) avoid to call retractExpressions method of max/min function in retractable aggregate code-gen
[ https://issues.apache.org/jira/browse/FLINK-12295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12295: --- Labels: auto-unassigned pull-request-available (was: auto-unassigned) > avoid to call retractExpressions method of max/min function in retractable > aggregate code-gen > - > > Key: FLINK-12295 > URL: https://issues.apache.org/jira/browse/FLINK-12295 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: godfrey he >Assignee: Jingsong Lee >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.14.0 > > > after {{FlinkRelMdModifiedMonotonicity}} introduced, a max/min function whose > result value is modified increasing/decreasing could ignore retraction > message. We could choose regular max/min function instead of retract max/min > function in code-gen. Currently, this requires the regular max/min function > must implements {{retractExpressions}} method and do not throw any > exceptions. A better approach is the retractable aggregate operator does not > call {{retractExpressions}} method for max/min function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] JingsongLi opened a new pull request #15940: [FLINK-12295][table] Fix comments in MinAggFunction and MaxAggFunction
JingsongLi opened a new pull request #15940: URL: https://github.com/apache/flink/pull/15940 ## What is the purpose of the change Fix comments in `MaxAggFunction.retractExpressions`, return `Expression[0]` is right. ``` See optimization in FlinkRelMdModifiedMonotonicity. MaxAggFunction can ignore retraction message: SQL: SELECT MAX(cnt), SUM(cnt) FROM (SELECT count(a) as cnt FROM T GROUP BY b) The cnt is modified increasing, so the MAX(cnt) can ignore retraction message. But this doesn't mean that the node won't receive the retraction message, because there are other aggregate operators that need retraction message, such as SUM(cnt). ``` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## Documentation - Does this pull request introduce a new feature? (yes / no) no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
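The reasoning in the code comment above (MAX over a modified-increasing `cnt` can ignore retractions, while SUM cannot) can be illustrated with a minimal sketch in plain Java. It does not use any Flink API: `MonotonicMaxSketch`, `RetractableMax`, `replay`, and the sample input are hypothetical names and data made up for this illustration, and the changelog is simplified to "retract the old count, then accumulate the new count" per incoming row.

```java
import java.util.TreeMap;

public class MonotonicMaxSketch {

    /** Exact MAX: honors retractions by counting how often each value is present. */
    static final class RetractableMax {
        private final TreeMap<Long, Integer> counts = new TreeMap<>();

        void accumulate(long v) {
            counts.merge(v, 1, Integer::sum);
        }

        void retract(long v) {
            // Decrement the count; returning null removes the entry.
            counts.computeIfPresent(v, (k, c) -> c == 1 ? null : c - 1);
        }

        long value() {
            return counts.lastKey(); // throws if nothing has been accumulated
        }
    }

    /**
     * Replays the changelog produced by "SELECT count(a) AS cnt ... GROUP BY b":
     * each incoming row retracts the group's old count and emits the new one.
     * Returns {exactMax, maxIgnoringRetraction, exactSum, sumIgnoringRetraction}.
     * Assumes at least one input row.
     */
    public static long[] replay(int[] groupOfEachRow, int groups) {
        long[] cnt = new long[groups];
        RetractableMax exactMax = new RetractableMax();
        long ignoringMax = Long.MIN_VALUE;
        long exactSum = 0;
        long ignoringSum = 0;
        for (int g : groupOfEachRow) {
            long old = cnt[g];
            long updated = ++cnt[g];
            if (old > 0) {
                // Retraction message: only the exact aggregates react to it.
                exactMax.retract(old);
                exactSum -= old;
            }
            // Accumulate message: every aggregate sees the new count.
            exactMax.accumulate(updated);
            exactSum += updated;
            ignoringMax = Math.max(ignoringMax, updated);
            ignoringSum += updated;
        }
        return new long[] {exactMax.value(), ignoringMax, exactSum, ignoringSum};
    }

    public static void main(String[] args) {
        long[] r = replay(new int[] {0, 1, 0, 0, 2, 1}, 3);
        System.out.println("MAX with retraction:     " + r[0]);
        System.out.println("MAX ignoring retraction: " + r[1]);
        System.out.println("SUM with retraction:     " + r[2]);
        System.out.println("SUM ignoring retraction: " + r[3]);
    }
}
```

Because each group's count only grows, the largest value ever emitted equals the largest current count, so both MAX variants agree. The two SUM variants diverge, which is why the operator as a whole still receives retraction messages.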
[jira] [Assigned] (FLINK-12295) avoid to call retractExpressions method of max/min function in retractable aggregate code-gen
[ https://issues.apache.org/jira/browse/FLINK-12295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-12295: Assignee: Jingsong Lee > avoid to call retractExpressions method of max/min function in retractable > aggregate code-gen > - > > Key: FLINK-12295 > URL: https://issues.apache.org/jira/browse/FLINK-12295 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: godfrey he >Assignee: Jingsong Lee >Priority: Major > Labels: auto-unassigned > > after {{FlinkRelMdModifiedMonotonicity}} introduced, a max/min function whose > result value is modified increasing/decreasing could ignore retraction > message. We could choose regular max/min function instead of retract max/min > function in code-gen. Currently, this requires the regular max/min function > must implements {{retractExpressions}} method and do not throw any > exceptions. A better approach is the retractable aggregate operator does not > call {{retractExpressions}} method for max/min function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-12295) avoid to call retractExpressions method of max/min function in retractable aggregate code-gen
[ https://issues.apache.org/jira/browse/FLINK-12295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-12295: - Fix Version/s: 1.14.0 > avoid to call retractExpressions method of max/min function in retractable > aggregate code-gen > - > > Key: FLINK-12295 > URL: https://issues.apache.org/jira/browse/FLINK-12295 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: godfrey he >Assignee: Jingsong Lee >Priority: Major > Labels: auto-unassigned > Fix For: 1.14.0 > > > after {{FlinkRelMdModifiedMonotonicity}} introduced, a max/min function whose > result value is modified increasing/decreasing could ignore retraction > message. We could choose regular max/min function instead of retract max/min > function in code-gen. Currently, this requires the regular max/min function > must implements {{retractExpressions}} method and do not throw any > exceptions. A better approach is the retractable aggregate operator does not > call {{retractExpressions}} method for max/min function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] JackWangCS commented on pull request #14841: [FLINK-21232][YARN][Common] Introduce pluggable Hadoop delegation token providers
JackWangCS commented on pull request #14841: URL: https://github.com/apache/flink/pull/14841#issuecomment-842810784 Hi @zuston, `TokenCache.obtainTokensForNamenodes` requires a set of paths to be passed in. In the context of `HadoopFSDelegationTokenProvider`, constructing those paths (both from the uploader and the extra paths to access) is a bit complex and unnecessary. So `HadoopFSDelegationTokenProvider` uses `fs.addDelegationTokens` to obtain the tokens directly. In fact, Spark also uses this method, and `TokenCache.obtainTokensForNamenodes` itself calls `fs.addDelegationTokens` to obtain tokens in its implementation. See: https://github.com/hanborq/hadoop/blob/99babec26a5f42021c0ec2a39ee7dbad4387b176/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java#L115 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zuston commented on a change in pull request #15653: [FLINK-22329][hive] Inject current ugi credentials into jobconf when getting file split in hive connector
zuston commented on a change in pull request #15653: URL: https://github.com/apache/flink/pull/15653#discussion_r634020126 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java ## @@ -114,17 +117,19 @@ public DynamicTableSource createDynamicTableSource(Context context) { STREAMING_SOURCE_PARTITION_INCLUDE.key(), STREAMING_SOURCE_PARTITION_INCLUDE .defaultValue())); +JobConf jobConf = new JobConf(hiveConf); +addCredentialsIntoJobConf(jobConf); Review comment: Got it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zuston commented on a change in pull request #15653: [FLINK-22329][hive] Inject current ugi credentials into jobconf when getting file split in hive connector
zuston commented on a change in pull request #15653: URL: https://github.com/apache/flink/pull/15653#discussion_r634020059 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/JobConfUtilsTest.java ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive.util; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +/** Test for {@link JobConfUtils}. */ +public class JobConfUtilsTest { Review comment: Do I need to remove JobConfUtilsTest test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zuston commented on pull request #14841: [FLINK-21232][YARN][Common] Introduce pluggable Hadoop delegation token providers
zuston commented on pull request #14841: URL: https://github.com/apache/flink/pull/14841#issuecomment-842804878 @JackWangCS Thanks for your prompt reply. Sorry, I don't understand why `TokenCache.obtainTokensForNamenodes` is not reused to obtain the FS delegation tokens. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-22663) Release YARN resource very slow when cancel the job after some NodeManagers shutdown
[ https://issues.apache.org/jira/browse/FLINK-22663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346568#comment-17346568 ] Yang Wang commented on FLINK-22663: --- Thanks for sharing more information. This issue occurs only when the JobManager and at least one TaskManager are running on the dead NodeManager. The root cause is that {{NMClientImpl#stop}} tries to clean up all running containers, including the ones on the dead NodeManager. Actually, I am afraid this is not a Flink bug, because Flink has deregistered the application successfully but YARN did not kill all the containers. A possible improvement might be to not clean up the running containers when stopping the NM client. However, that also has side effects. Moreover, the reason why "yarn application -kill appid" works is that the JobManager receives a shutdown request from the YARN ResourceManager and then kills itself. I believe you could find some logs like "ResourceManagerException: Received shutdown request from YARN ResourceManager". > Release YARN resource very slow when cancel the job after some NodeManagers > shutdown > > > Key: FLINK-22663 > URL: https://issues.apache.org/jira/browse/FLINK-22663 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.12.2 >Reporter: Jinhong Liu >Priority: Major > Labels: YARN > > When I test flink on YARN, there is a case that may cause some problems. > Hadoop Version: 2.7.3 > Flink Version: 1.12.2 > I deploy a flink job on YARN, when the job is running I stop one NodeManager, > after one or two minutes, the job is auto recovered. But in this situation, > if I cancel the job, the containers cannot be released immediately, there are > still some containers that are running include the app master. About 5 > minutes later, these containers exit, and about 10 minutes later the app > master exit. 
> I check the log of app master, seems it try to stop the containers on the > NodeManger which I have already stopped. > {code:java} > 2021-05-14 06:15:17,389 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job class > tv.freewheel.reporting.fastlane.Fastlane$ (da883ab39a7a82e4d45a3803bc77dd6f) > switched from state CANCELLING to CANCELED. > 2021-05-14 06:15:17,389 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping > checkpoint coordinator for job da883ab39a7a82e4d45a3803bc77dd6f. > 2021-05-14 06:15:17,390 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - > Shutting down > 2021-05-14 06:15:17,408 INFO > org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Job > da883ab39a7a82e4d45a3803bc77dd6f reached globally terminal state CANCELED. > 2021-05-14 06:15:17,409 INFO > org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Shutting > down cluster with state CANCELED, jobCancelled: true, executionMode: DETACHED > 2021-05-14 06:15:17,409 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting > YarnJobClusterEntrypoint down with application status CANCELED. Diagnostics > null. > 2021-05-14 06:15:17,409 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting > down rest endpoint. > 2021-05-14 06:15:17,420 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Stopping the JobMaster for job class > tv.freewheel.reporting.fastlane.Fastlane$(da883ab39a7a82e4d45a3803bc77dd6f). > 2021-05-14 06:15:17,422 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing > cache directory > /tmp/flink-web-af72a00c-0ddd-4e5e-a62c-8244d6caa552/flink-web-ui > 2021-05-14 06:15:17,432 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - > http://ip-10-23-19-197.ec2.internal:43811 lost leadership > 2021-05-14 06:15:17,432 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down > complete. 
> 2021-05-14 06:15:17,436 INFO > org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - > Shut down cluster because application is in CANCELED, diagnostics null. > 2021-05-14 06:15:17,436 INFO org.apache.flink.yarn.YarnResourceManagerDriver > [] - Unregister application from the YARN Resource Manager with > final status KILLED. > 2021-05-14 06:15:17,458 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending > SlotPool. > 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Close ResourceManager connection > 493862ba148679a4f16f7de5ffaef665: Stopping JobMaster for job class >
[GitHub] [flink] zuston commented on a change in pull request #14841: [FLINK-21232][YARN][Common] Introduce pluggable Hadoop delegation token providers
zuston commented on a change in pull request #14841: URL: https://github.com/apache/flink/pull/14841#discussion_r634018947 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProvider.java ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigUtils; +import org.apache.flink.runtime.security.delegationtokens.HadoopDelegationTokenConfiguration; +import org.apache.flink.runtime.security.delegationtokens.HadoopDelegationTokenProvider; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.FunctionUtils; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.Master; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Delegation token provider implementation for Hadoop FileSystems. 
*/ +public class HadoopFSDelegationTokenProvider implements HadoopDelegationTokenProvider { + +private static final Logger LOG = +LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class); + +private HadoopDelegationTokenConfiguration hadoopDelegationTokenConf; + +@Override +public String serviceName() { +return "hadoopfs"; +} + +@Override +public void init(final HadoopDelegationTokenConfiguration conf) { +this.hadoopDelegationTokenConf = conf; +} + +@Override +public boolean delegationTokensRequired() { +return UserGroupInformation.isSecurityEnabled(); +} + +@Override +public Optional obtainDelegationTokens(final Credentials credentials) { +try { +Set fileSystemsToAccess = getFileSystemsToAccess(); + +final String renewer = getTokenRenewer(hadoopDelegationTokenConf.getHadoopConf()); +fileSystemsToAccess.forEach( +fs -> { +try { +LOG.info("Getting FS token for: {} with renewer {}", fs, renewer); +fs.addDelegationTokens(renewer, credentials); +} catch (IOException e) { +LOG.warn("Failed to get token for {}.", fs); Review comment: But it looks like you catch the IOException. Besides, I don't understand why the original code for obtaining delegation tokens, such as `TokenCache.obtainTokensForNamenodes`, is not reused. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22691) On Aliyun ECS, docker deployed flink (1.13.0), which was scanned to Mirai DDOS Trojan
Magina.Shan created FLINK-22691: --- Summary: On Aliyun ECS, docker deployed flink (1.13.0), which was scanned to Mirai DDOS Trojan Key: FLINK-22691 URL: https://issues.apache.org/jira/browse/FLINK-22691 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.13.0 Reporter: Magina.Shan Attachments: image-2021-05-18-11-45-19-572.png, image-2021-05-18-11-46-26-463.png A Flink 1.13.0 instance deployed with Docker on Aliyun ECS was flagged by a scan as containing the Mirai DDoS Trojan. The reported path is /opt/flink/Winbox. According to the Aliyun ticket, this is a Flink vulnerability. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15703: [FLINK-22275][table] Support random past for timestamp types in datagen connector
flinkbot edited a comment on pull request #15703: URL: https://github.com/apache/flink/pull/15703#issuecomment-823980125 ## CI report: * 6b16c2c8b503361eb3da3fd01edd7d1f1aafed62 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16957) * c6072f7b0900838affe40092f8798fe11d14a7b4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18060) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-21986) taskmanager native memory not release timely after restart
[ https://issues.apache.org/jira/browse/FLINK-21986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346558#comment-17346558 ] Yun Tang commented on FLINK-21986: -- [~Feifan Wang], could you tell me the docker image, the parallelism, the related memory configuration, and the operations you perform when you run flink-21986-regular-join-test-case? I could not reproduce the problem of memory continuing to grow after failover restore when running flink-21986-regular-join-test-case. > taskmanager native memory not release timely after restart > -- > > Key: FLINK-21986 > URL: https://issues.apache.org/jira/browse/FLINK-21986 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.11.3, 1.12.1, 1.13.0 > Environment: flink version:1.12.1 > run :yarn session > job type:mock source -> regular join > > checkpoint interval: 3m > Taskmanager memory : 16G > >Reporter: Feifan Wang >Assignee: Feifan Wang >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.4, 1.13.0, 1.12.3 > > Attachments: 82544.svg, image-2021-03-25-15-53-44-214.png, > image-2021-03-25-16-07-29-083.png, image-2021-03-26-11-46-06-828.png, > image-2021-03-26-11-47-21-388.png > > > I run a regular join job with flink_1.12.1 , and find taskmanager native > memory not release timely after restart cause by exceeded checkpoint > tolerable failure threshold. > *problem job information:* > # job first restart cause by exceeded checkpoint tolerable failure threshold. > # then taskmanager be killed by yarn many times > # in this case,tm heap is set to 7.68G,bug all tm heap size is under 4.2G > !image-2021-03-25-15-53-44-214.png|width=496,height=103! > # nonheap size increase after restart,but still under 160M. > > !https://km.sankuai.com/api/file/cdn/706284607/716474606?contentType=1=false=false|width=493,height=102! 
> # taskmanager process memory increase 3-4G after restart(this figure show > one of taskmanager) > !image-2021-03-25-16-07-29-083.png|width=493,height=107! > > *my guess:* > [RocksDB > wiki|https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management] > mentioned :Many of the Java Objects used in the RocksJava API will be backed > by C++ objects for which the Java Objects have ownership. As C++ has no > notion of automatic garbage collection for its heap in the way that Java > does, we must explicitly free the memory used by the C++ objects when we are > finished with them. > So, is it possible that RocksDBStateBackend not call > AbstractNativeReference#close() to release memory use by RocksDB C++ Object ? > *I make a change:* > Actively call System.gc() and System.runFinalization() every minute. > *And run this test again:* > # taskmanager process memory no obvious increase > !image-2021-03-26-11-46-06-828.png|width=495,height=93! > # job run for several days,and restart many times,but no taskmanager killed > by yarn like before > > *Summary:* > # first,there is some native memory can not release timely after restart in > this situation > # I guess it maybe RocksDB C++ object,but I hive not check it from source > code of RocksDBStateBackend > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15939: [FLINK-15390][Connectors/ORC][Connectors/Hive]List/Map/Struct types support for vectorized orc reader
flinkbot edited a comment on pull request #15939: URL: https://github.com/apache/flink/pull/15939#issuecomment-842780777 ## CI report: * 605cdc74e9e601e47c941403361470384dafeaf0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18059) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15936: [FLINK-22683][runtime] Fix the null or incorrect value of total Flink…
flinkbot edited a comment on pull request #15936: URL: https://github.com/apache/flink/pull/15936#issuecomment-842214419 ## CI report: * a1feff3c7c34e4ef0ef939d70e1f389f030de721 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18031) * e46df619d0b485fea75e9c36514be22887514314 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18058) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15703: [FLINK-22275][table] Support random past for timestamp types in datagen connector
flinkbot edited a comment on pull request #15703: URL: https://github.com/apache/flink/pull/15703#issuecomment-823980125 ## CI report: * 6b16c2c8b503361eb3da3fd01edd7d1f1aafed62 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16957) * c6072f7b0900838affe40092f8798fe11d14a7b4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-statefun] tzulitai commented on pull request #230: [FLINK-22468] Fix Java 11 build by adding dependency to javax.annotations
tzulitai commented on pull request #230: URL: https://github.com/apache/flink-statefun/pull/230#issuecomment-842790470 @knaufk thanks for the review and trying this out. I'm not merging this just yet because we should make this a profile that is enabled specifically for Java 11 builds. Or, the other way around, where we build with Java 11 by default and remove the javax dependency when building with Java 8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JasonLeeCoding commented on pull request #15926: [FLINK-22655]When using -i option to initialize SQL Client session It should be possible to annotate the script with --
JasonLeeCoding commented on pull request #15926: URL: https://github.com/apache/flink/pull/15926#issuecomment-842786948 I got a `java.lang.AssertionError`: `testCancelExecutionInNonInteractiveMode` throws an exception at line 303 of the code. Can anyone help me take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-22689) Table API Documentation Row-Based Operations Example Fails
[ https://issues.apache.org/jira/browse/FLINK-22689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunfeng Zhou updated FLINK-22689: - Description: I wrote the following program according to the example code provided in [Documentation/Table API/Row-based operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations] {code:java} public class TableUDF { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table input = tEnv.fromValues( DataTypes.of("ROW"), Row.of("name") ); ScalarFunction func = new MyMapFunction(); tEnv.registerFunction("func", func); Table table = input .map(call("func", $("c")).as("a", "b")); // exception occurs here table.execute().print(); } public static class MyMapFunction extends ScalarFunction { public Row eval(String a) { return Row.of(a, "pre-" + a); } @Override public TypeInformation getResultType(Class[] signature) { return Types.ROW(Types.STRING, Types.STRING); } } } {code} The code above would throw an exception like this: {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: Only a scalar function can be used in the map operator. at org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480) at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519) at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29) {code} The core of the program above is identical to that provided in flink documentation, but it cannot function correctly. This might affect users who want to use custom function with table API. 
was: I wrote the following program according to the example code provided in [Documentation/Table API/Row-based operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations] public class TableUDF { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); Table input = tEnv.fromValues( DataTypes.of("ROW"), Row.of("name") ); ScalarFunction func = new MyMapFunction(); tEnv.registerFunction("func", func); Table table = input .map(call("func", $("c")).as("a", "b")); // exception occurs here table.execute().print(); } public static class MyMapFunction extends ScalarFunction { public Row eval(String a) { return Row.of(a, "pre-" + a); } @Override public TypeInformation getResultType(Class[] signature) { return Types.ROW(Types.STRING, Types.STRING); } } } The code above would throw an exception like this: Exception in thread "main" org.apache.flink.table.api.ValidationException: Only a scalar function can be used in the map operator. at org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480) at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519) at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29) The core of the program above is identical to that provided in flink documentation, but it cannot function correctly. This might affect users who want to use custom function with table API. 
> Table API Documentation Row-Based Operations Example Fails > -- > > Key: FLINK-22689 > URL: https://issues.apache.org/jira/browse/FLINK-22689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Ecosystem >Affects Versions: 1.12.1 >Reporter: Yunfeng Zhou >Priority: Major > > I wrote the following program according to the example code provided in > [Documentation/Table API/Row-based > operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations] > {code:java} > public class TableUDF { > public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > > Table input = tEnv.fromValues( > DataTypes.of("ROW"), > Row.of("name") > ); > > ScalarFunction func = new MyMapFunction(); > tEnv.registerFunction("func", func); >
[jira] [Commented] (FLINK-22686) Incompatible subtask mappings while resuming from unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346548#comment-17346548 ] ChangZhuo Chen (陳昌倬) commented on FLINK-22686: -- [~arvid] * The topology is in attachments topology_1.png, topology_2.png, topology_3.png (From top to bottom) * All operators are replaced with their base class name due to company policy. * The `EnrichedData` in topology is a `map` to change data format from Kafka. > Incompatible subtask mappings while resuming from unaligned checkpoints > --- > > Key: FLINK-22686 > URL: https://issues.apache.org/jira/browse/FLINK-22686 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.13.0 >Reporter: Arvid Heise >Priority: Blocker > Fix For: 1.13.1 > > Attachments: topology_1.png, topology_2.png, topology_3.png > > > A user > [reported|https://lists.apache.org/x/list.html?u...@flink.apache.org:lte=1M:Flink%201.13.0%20reactive%20mode:%20Job%20stop%20and%20cannot%20restore%20from%20checkpoint] > that he encountered an internal error while resuming during reactive mode. > There isn't an immediate connection to reactive mode, so it's safe to assume > that one rescaling case was not covered. 
> {noformat} > Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: > are multiple operators ingesting/producing intermediate results with varying > degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, > 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, > 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, > 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, > 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, > 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, > 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, > 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, > 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, > 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, > 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, > 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, > 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, > 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, > 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, > 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, > 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, > 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, > 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, > 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, > 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, > 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, > 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, > 75, 82, 89, 96, 103, 
110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, > 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, > 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, > 209]]}. > at > org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at >
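The two `RescaleMappings` printed in the exception above follow two different index layouts: the first assigns contiguous blocks of 30 indices to each of 7 groups, the second assigns every 7th index. The sketch below only reproduces those two layouts so the mismatch is easier to see. It is not Flink's implementation; the class name `SubtaskLayouts`, the method names, and the parameters `sourceCount` and `targetCount` are hypothetical, and the indices are treated as plain integers because the message does not say what they denote inside Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: rebuilds the two index layouts from the message.
final class SubtaskLayouts {

    /** Contiguous blocks: group i holds indices [i * size, (i + 1) * size). */
    static List<List<Integer>> range(int sourceCount, int targetCount) {
        if (targetCount <= 0 || sourceCount % targetCount != 0) {
            // Assumption for this sketch only: blocks are of equal size.
            throw new IllegalArgumentException("sourceCount must be a multiple of targetCount");
        }
        int size = sourceCount / targetCount;
        List<List<Integer>> mappings = new ArrayList<>();
        for (int i = 0; i < targetCount; i++) {
            List<Integer> group = new ArrayList<>();
            for (int index = i * size; index < (i + 1) * size; index++) {
                group.add(index);
            }
            mappings.add(group);
        }
        return mappings;
    }

    /** Strided layout: group i holds indices i, i + targetCount, i + 2 * targetCount, ... */
    static List<List<Integer>> roundRobin(int sourceCount, int targetCount) {
        if (targetCount <= 0) {
            throw new IllegalArgumentException("targetCount must be positive");
        }
        List<List<Integer>> mappings = new ArrayList<>();
        for (int i = 0; i < targetCount; i++) {
            List<Integer> group = new ArrayList<>();
            for (int index = i; index < sourceCount; index += targetCount) {
                group.add(index);
            }
            mappings.add(group);
        }
        return mappings;
    }

    public static void main(String[] args) {
        // 210 indices in 7 groups, the sizes that appear in the exception message.
        List<List<Integer>> byRange = range(210, 7);
        List<List<Integer>> byStride = roundRobin(210, 7);
        System.out.println("range[0]  = " + byRange.get(0));
        System.out.println("stride[0] = " + byStride.get(0));
        System.out.println("same layout = " + byRange.equals(byStride));
    }
}
```

Both layouts cover the same 210 indices, but they group them differently, which is the kind of disagreement the `Incompatible subtask mappings` check reports.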
[jira] [Created] (FLINK-22690) kerberos integration with flink,kerberos tokens will expire
kevinsun created FLINK-22690:

Summary: kerberos integration with flink,kerberos tokens will expire
Key: FLINK-22690
URL: https://issues.apache.org/jira/browse/FLINK-22690
Project: Flink
Issue Type: Bug
Components: Deployment / YARN
Reporter: kevinsun

A Flink job runs on YARN and writes to sinks such as HDFS, Hive, and HBase. After some time, the Kerberos tokens expire and the job reports the error: Failed to find any Kerberos tgt. Example: *StreamingFileSink*.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22686) Incompatible subtask mappings while resuming from unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-22686: - Attachment: topology_3.png topology_2.png topology_1.png
[jira] [Created] (FLINK-22689) Table API Documentation Row-Based Operations Example Fails
Yunfeng Zhou created FLINK-22689:

Summary: Table API Documentation Row-Based Operations Example Fails
Key: FLINK-22689
URL: https://issues.apache.org/jira/browse/FLINK-22689
Project: Flink
Issue Type: Bug
Components: Table SQL / Ecosystem
Affects Versions: 1.12.1
Reporter: Yunfeng Zhou

I wrote the following program according to the example code provided in [Documentation/Table API/Row-based operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations]

    public class TableUDF {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            Table input = tEnv.fromValues(
                DataTypes.of("ROW"),
                Row.of("name")
            );

            ScalarFunction func = new MyMapFunction();
            tEnv.registerFunction("func", func);

            Table table = input
                .map(call("func", $("c")).as("a", "b")); // exception occurs here

            table.execute().print();
        }

        public static class MyMapFunction extends ScalarFunction {
            public Row eval(String a) {
                return Row.of(a, "pre-" + a);
            }

            @Override
            public TypeInformation getResultType(Class[] signature) {
                return Types.ROW(Types.STRING, Types.STRING);
            }
        }
    }

The code above throws an exception like this:

    Exception in thread "main" org.apache.flink.table.api.ValidationException: Only a scalar function can be used in the map operator.
        at org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480)
        at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519)
        at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29)

The core of the program above is identical to the example in the Flink documentation, but it does not run correctly. This might affect users who want to use custom functions with the Table API.
[GitHub] [flink] beyond1920 commented on a change in pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only in
beyond1920 commented on a change in pull request #14830:
URL: https://github.com/apache/flink/pull/14830#discussion_r634001661

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

## @@ -470,20 +470,21 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
    -       _: StreamPhysicalPythonGroupTableAggregate =>
    -    // Aggregate, TableAggregate and Limit requires update_before if there are updates
    +       _: StreamPhysicalPythonGroupTableAggregate |

Review comment: I kept the three sub-classes, as in the previous version. I assumed the author may have had a reason to use three sub-classes instead of `StreamPhysicalGroupWindowAggregateBase`, for example to prevent misuse of the parent class when a new child class is added in the future?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
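The trade-off raised in the review comment above can be sketched in a few lines. The example is in Java rather than the Scala of the pull request, and every class name in it is a hypothetical stand-in, not a Flink planner class. It only shows the general effect: a match on the parent class silently accepts a child class added later, while a match that lists the child classes does not.

```java
// Hypothetical stand-ins for the planner nodes discussed in the review; not Flink classes.
abstract class WindowAggBase {}
final class WindowAgg extends WindowAggBase {}
final class WindowTableAgg extends WindowAggBase {}
final class PythonWindowAgg extends WindowAggBase {}

// A child class added later, after the match below was written.
final class LaterAddedWindowAgg extends WindowAggBase {}

final class MatchStyles {

    /** Matching on the parent class: any future child class is accepted automatically. */
    static boolean matchesByBase(Object node) {
        return node instanceof WindowAggBase;
    }

    /** Matching on the listed child classes: a new child class must be added explicitly. */
    static boolean matchesBySubClasses(Object node) {
        return node instanceof WindowAgg
                || node instanceof WindowTableAgg
                || node instanceof PythonWindowAgg;
    }

    public static void main(String[] args) {
        Object added = new LaterAddedWindowAgg();
        System.out.println("parent-class match accepts new child: " + matchesByBase(added));
        System.out.println("sub-class match accepts new child:    " + matchesBySubClasses(added));
    }
}
```

Which behavior is preferable depends on whether a newly added node should inherit the rule by default, which is the question the comment leaves open.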
[jira] [Updated] (FLINK-22686) Incompatible subtask mappings while resuming from unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-22686: - Attachment: (was: 1.png) > Incompatible subtask mappings while resuming from unaligned checkpoints > --- > > Key: FLINK-22686 > URL: https://issues.apache.org/jira/browse/FLINK-22686 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.13.0 >Reporter: Arvid Heise >Priority: Blocker > Fix For: 1.13.1 > > > A user > [reported|https://lists.apache.org/x/list.html?u...@flink.apache.org:lte=1M:Flink%201.13.0%20reactive%20mode:%20Job%20stop%20and%20cannot%20restore%20from%20checkpoint] > that he encountered an internal error while resuming during reactive mode. > There isn't an immediate connection to reactive mode, so it's safe to assume > that one rescaling case was not covered. > {noformat} > Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: > are multiple operators ingesting/producing intermediate results with varying > degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, > 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, > 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, > 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, > 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, > 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, > 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, > 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, > 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, > 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, > 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, > 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 
> 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, > 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, > 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, > 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, > 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, > 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, > 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, > 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, > 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, > 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, > 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, > 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, > 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, > 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, > 209]]}. 
> at > org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at > org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) > ~[flink-dist_2.12-1.13.0.jar:1.13.0] > at >
[jira] [Updated] (FLINK-22686) Incompatible subtask mappings while resuming from unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-22686: - Attachment: (was: 3.png)
[jira] [Updated] (FLINK-22686) Incompatible subtask mappings while resuming from unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-22686: - Attachment: (was: 2.png)
[jira] [Updated] (FLINK-22686) Incompatible subtask mappings while resuming from unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-22686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-22686: - Attachment: 3.png 2.png 1.png
[GitHub] [flink] flinkbot commented on pull request #15939: [FLINK-15390][Connectors/ORC][Connectors/Hive]List/Map/Struct types support for vectorized orc reader
flinkbot commented on pull request #15939: URL: https://github.com/apache/flink/pull/15939#issuecomment-842780777 ## CI report: * 605cdc74e9e601e47c941403361470384dafeaf0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15937: [FLINK-22653][python][table-planner-blink] Support StreamExecPythonOverAggregate json serialization/deserialization
flinkbot edited a comment on pull request #15937: URL: https://github.com/apache/flink/pull/15937#issuecomment-842291900 ## CI report: * bef596d85790f56b524ac4a8345a0312bbba1659 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18044) * fb83501c51fcedd527d011d4275285913c42e5d0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18057)
[GitHub] [flink] flinkbot edited a comment on pull request #15936: [FLINK-22683][runtime] Fix the null or incorrect value of total Flink…
flinkbot edited a comment on pull request #15936: URL: https://github.com/apache/flink/pull/15936#issuecomment-842214419 ## CI report: * a1feff3c7c34e4ef0ef939d70e1f389f030de721 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18031) * e46df619d0b485fea75e9c36514be22887514314 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input nod
flinkbot edited a comment on pull request #14830: URL: https://github.com/apache/flink/pull/14830#issuecomment-770878871 ## CI report: * bc2786c856a33b6629ee3d6dc55458c35860dded Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18056) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18026)
[jira] [Commented] (FLINK-19481) Add support for a flink native GCS FileSystem
[ https://issues.apache.org/jira/browse/FLINK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346542#comment-17346542 ]

Xintong Song commented on FLINK-19481:

{quote}The runtime complexity of having the additional Hadoop layer will likely be strictly worse. This is because each layer has its own configuration and things like thread pooling, pool sizes, buffering, and other non-trivial tuning parameters. {quote}
I'm not sure about this. Looking into o.a.f.runtime.fs.hdfs.HadoopFileSystem, the Flink filesystem is practically a layer of API mappings around the Hadoop filesystem. It might be true that the parameters to be tuned are spread across different layers, but I wonder how many extra parameters, and thus how much complexity, the additional layer really introduces. Shouldn't the total number of parameters be the same?

{quote}In my experience the more native (fewer layers of abstraction) you can achieve the better the result. {quote}
I admit that, if we are building the GCS file system from the ground up, the fewer layers the better.
# GCS SDK -> Hadoop FileSystem -> Flink FileSystem
# GCS SDK -> Flink FileSystem

However, we don't have to build everything from the ground up. In the first path above, there are already off-the-shelf solutions for both mappings (the Google connector for SDK -> Hadoop FS, and o.a.f.runtime.fs.hdfs.HadoopFileSystem for Hadoop -> Flink). It requires almost no extra effort beyond assembling existing artifacts. In the second path, on the other hand, we need to implement a brand new file system, which seems to be reinventing the wheel.

{quote}It seems from reading the comments here though that a good solution would be a hybrid of Ben's work on the native GCS Filesystem combined with Galen's work on the RecoverableWriter. {quote}
Unless there is more input on why we should have a native GCS file system, I'm leaning towards not introducing such a native implementation, based on the discussion so far.
> Add support for a flink native GCS FileSystem
>
> Key: FLINK-19481
> URL: https://issues.apache.org/jira/browse/FLINK-19481
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem, FileSystems
> Affects Versions: 1.12.0
> Reporter: Ben Augarten
> Priority: Minor
> Labels: auto-deprioritized-major
>
> Currently, GCS is supported but only by using the hadoop connector[1]
>
> The objective of this improvement is to add support for checkpointing to Google Cloud Storage with the Flink File System.
>
> This would allow the `gs://` scheme to be used for savepointing and checkpointing. Long term, it would be nice if we could use the GCS FileSystem as a source and sink in flink jobs as well.
>
> Long term, I hope that implementing a flink native GCS FileSystem will simplify usage of GCS because the hadoop FileSystem ends up bringing in many unshaded dependencies.
>
> [1] https://github.com/GoogleCloudDataproc/hadoop-connectors

-- This message was sent by Atlassian Jira (v8.3.4#803005)
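The "layer of API mappings" argument in the comment above can be pictured with a small self-contained sketch. Nothing in it is Flink or Hadoop API: `BackendFs`, `EngineFs`, `InMemoryBackendFs` and `DelegatingEngineFs` are hypothetical names standing in for a Hadoop-style file system, a Flink-style file system and the adapter between them. The adapter carries no configuration of its own and only forwards calls, which is the sense in which the extra layer might add few tuning parameters.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the lower-level (Hadoop-style) file system API. */
interface BackendFs {
    void create(String path, byte[] data);
    byte[] open(String path);
    boolean exists(String path);
}

/** Hypothetical stand-in for the engine-facing (Flink-style) file system API. */
interface EngineFs {
    void write(String uri, String content);
    String read(String uri);
    boolean contains(String uri);
}

/** Toy backend that keeps files in memory; a real one would talk to object storage. */
final class InMemoryBackendFs implements BackendFs {
    private final Map<String, byte[]> files = new HashMap<>();

    @Override
    public void create(String path, byte[] data) {
        files.put(path, data.clone());
    }

    @Override
    public byte[] open(String path) {
        byte[] data = files.get(path);
        if (data == null) {
            throw new IllegalArgumentException("No such file: " + path);
        }
        return data.clone();
    }

    @Override
    public boolean exists(String path) {
        return files.containsKey(path);
    }
}

/** Thin adapter: every method is a one-to-one mapping onto the backend. */
final class DelegatingEngineFs implements EngineFs {
    private final BackendFs backend;

    DelegatingEngineFs(BackendFs backend) {
        this.backend = backend;
    }

    @Override
    public void write(String uri, String content) {
        backend.create(uri, content.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String read(String uri) {
        return new String(backend.open(uri), StandardCharsets.UTF_8);
    }

    @Override
    public boolean contains(String uri) {
        return backend.exists(uri);
    }
}

final class LayeringDemo {
    public static void main(String[] args) {
        EngineFs fs = new DelegatingEngineFs(new InMemoryBackendFs());
        fs.write("gs://bucket/checkpoints/chk-1", "state");
        System.out.println(fs.read("gs://bucket/checkpoints/chk-1"));
    }
}
```

Whether a real adapter stays this thin depends on how closely the two APIs line up. The sketch only illustrates the shape of the argument and says nothing about dependency shading, which is the other concern raised in the issue.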
[GitHub] [flink] flinkbot commented on pull request #15939: [FLINK-15390][Connectors/ORC][Connectors/Hive]List/Map/Struct types support for vectorized orc reader
flinkbot commented on pull request #15939: URL: https://github.com/apache/flink/pull/15939#issuecomment-842774944

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit 605cdc74e9e601e47c941403361470384dafeaf0 (Tue May 18 02:23:05 UTC 2021)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] wangwei1025 opened a new pull request #15939: [FLINK-15390][Connectors/ORC][Connectors/Hive]List/Map/Struct types support for vectorized orc reader
wangwei1025 opened a new pull request #15939: URL: https://github.com/apache/flink/pull/15939

## What is the purpose of the change
*This pull request supports the vectorized orc reader with nested types (List/Map/Struct).*

## Brief change log
- *Add OrcArrayColumnVector, OrcMapColumnVector, OrcRowColumnVector to support the vectorized orc reader with nested types*
- *Add ColumnarMapData to support map type data*

## Verifying this change
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)*
This change is already covered by existing tests, such as *(please describe tests)*.
*(or)*
This change added tests and can be verified as follows:
- *Add integration tests (OrcFileSystemITCase.testNestedTypes) that validate that the orc connector supports List/Map/Struct types in SQL*

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no) no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no
- The serializers: (yes / no / don't know) no
- The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) no
- The S3 file system connector: (yes / no / don't know) no

## Documentation
- Does this pull request introduce a new feature? (yes / no) no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] flinkbot edited a comment on pull request #15937: [FLINK-22653][python][table-planner-blink] Support StreamExecPythonOverAggregate json serialization/deserialization
flinkbot edited a comment on pull request #15937: URL: https://github.com/apache/flink/pull/15937#issuecomment-842291900 ## CI report: * bef596d85790f56b524ac4a8345a0312bbba1659 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18044) * fb83501c51fcedd527d011d4275285913c42e5d0 UNKNOWN
[jira] [Commented] (FLINK-22688) Root Exception can not be shown on Web UI in Flink 1.13.0
[ https://issues.apache.org/jira/browse/FLINK-22688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346533#comment-17346533 ]

Yangze Guo commented on FLINK-22688:

cc [~mapohl] who can shed some light on this issue.

> Root Exception can not be shown on Web UI in Flink 1.13.0
>
> Key: FLINK-22688
> URL: https://issues.apache.org/jira/browse/FLINK-22688
> Project: Flink
> Issue Type: Bug
> Reporter: Gary Wu
> Priority: Major
>
> Hi,
>
> We have upgraded our Flink applications to 1.13.0 but we found that Root Exception can not be shown on Web UI with an internal server error message. After opening a browser development console and trace the message, we found that there is an exception in job manager:
>
> 2021-05-12 13:30:45,589 ERROR org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] - Unhandled exception.
> java.lang.IllegalArgumentException: The location must not be null for a non-global failure.
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
> at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
> at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632) ~[?:?]
> at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127) ~[?:?]
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502) ~[?:?]
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[?:?]
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
> at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) [?:?]
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
> at java.lang.Thread.run(Thread.java:834) [?:?]
>
> I see there are some exceptions in task managers and I remember the kind of exception can be shown in UI in version 1.12.1 :
>
> _2021-05-18
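The top of the stack trace above shows an argument check failing with "The location must not be null for a non-global failure." A minimal sketch of that kind of check follows. It is not the code of `JobExceptionsHandler`: `FailureEntry`, `RootExceptionCheckDemo` and the rule that a missing task name marks a global failure are assumptions made for illustration, and only the error message is taken from the trace.

```java
/** Hypothetical failure record; field meanings are assumptions for this sketch. */
final class FailureEntry {
    final String taskName; // assumed: null means the failure is global, not tied to a task
    final String location; // assumed: e.g. "host:port" of the TaskManager, may be missing

    FailureEntry(String taskName, String location) {
        this.taskName = taskName;
        this.location = location;
    }
}

final class RootExceptionCheckDemo {

    /** Same contract as a typical checkArgument helper: throw if the condition is false. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Renders a failure for display; local failures are required to carry a location. */
    static String describe(FailureEntry entry) {
        if (entry.taskName == null) {
            return "global failure";
        }
        checkArgument(
                entry.location != null,
                "The location must not be null for a non-global failure.");
        return entry.taskName + " @ " + entry.location;
    }

    public static void main(String[] args) {
        System.out.println(describe(new FailureEntry(null, null)));
        System.out.println(describe(new FailureEntry("Source (1/4)", "tm-host:6122")));
        try {
            describe(new FailureEntry("Source (1/4)", null));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, a task-level failure recorded without a location is enough to make the whole request fail, which would match the internal server error reported in the issue.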
[GitHub] [flink] flinkbot edited a comment on pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input nod
flinkbot edited a comment on pull request #14830: URL: https://github.com/apache/flink/pull/14830#issuecomment-770878871 ## CI report: * bc2786c856a33b6629ee3d6dc55458c35860dded Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18026) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18056)
[jira] [Commented] (FLINK-22663) Release YARN resource very slow when cancel the job after some NodeManagers shutdown
[ https://issues.apache.org/jira/browse/FLINK-22663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346530#comment-17346530 ]

Jinhong Liu commented on FLINK-22663:

[~fly_in_gis]
# I shut down one NodeManager while the job is running.
# Then I cancel the job with the Flink API, and the problem occurs. At this point, if I restart the NodeManager, the problem goes away.
# If I don't cancel the job but kill it with the YARN API, there is no problem.

So I think this is a bug in the Flink AppMaster: it tries to connect to the dead NodeManager to release the resources.

> Release YARN resource very slow when cancel the job after some NodeManagers shutdown
>
> Key: FLINK-22663
> URL: https://issues.apache.org/jira/browse/FLINK-22663
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.12.2
> Reporter: Jinhong Liu
> Priority: Major
> Labels: YARN
>
> When I test flink on YARN, there is a case that may cause some problems.
> Hadoop Version: 2.7.3
> Flink Version: 1.12.2
> I deploy a flink job on YARN. When the job is running I stop one NodeManager, and after one or two minutes the job is auto recovered. But in this situation, if I cancel the job, the containers cannot be released immediately; some containers are still running, including the app master. About 5 minutes later, these containers exit, and about 10 minutes later the app master exits.
> I checked the log of the app master; it seems to try to stop the containers on the NodeManager which I have already stopped.
> {code:java}
> 2021-05-14 06:15:17,389 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job class tv.freewheel.reporting.fastlane.Fastlane$ (da883ab39a7a82e4d45a3803bc77dd6f) switched from state CANCELLING to CANCELED.
> 2021-05-14 06:15:17,389 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job da883ab39a7a82e4d45a3803bc77dd6f.
> 2021-05-14 06:15:17,390 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
> 2021-05-14 06:15:17,408 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Job da883ab39a7a82e4d45a3803bc77dd6f reached globally terminal state CANCELED.
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Shutting down cluster with state CANCELED, jobCancelled: true, executionMode: DETACHED
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting YarnJobClusterEntrypoint down with application status CANCELED. Diagnostics null.
> 2021-05-14 06:15:17,409 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
> 2021-05-14 06:15:17,420 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job class tv.freewheel.reporting.fastlane.Fastlane$(da883ab39a7a82e4d45a3803bc77dd6f).
> 2021-05-14 06:15:17,422 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-af72a00c-0ddd-4e5e-a62c-8244d6caa552/flink-web-ui
> 2021-05-14 06:15:17,432 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://ip-10-23-19-197.ec2.internal:43811 lost leadership
> 2021-05-14 06:15:17,432 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
> 2021-05-14 06:15:17,436 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Shut down cluster because application is in CANCELED, diagnostics null.
> 2021-05-14 06:15:17,436 INFO org.apache.flink.yarn.YarnResourceManagerDriver [] - Unregister application from the YARN Resource Manager with final status KILLED.
> 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
> 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 493862ba148679a4f16f7de5ffaef665: Stopping JobMaster for job class tv.freewheel.reporting.fastlane.Fastlane$(da883ab39a7a82e4d45a3803bc77dd6f)..
> 2021-05-14 06:15:17,458 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping SlotPool.
> 2021-05-14 06:15:17,482 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl [] - Waiting for application to be successfully unregistered.
> 2021-05-14 06:15:17,566 INFO org.apache.flink.runtime.history.FsJobArchivist [] - Job da883ab39a7a82e4d45a3803bc77dd6f has been archived at
[GitHub] [flink] yittg commented on pull request #15501: [FLINK-22054][k8s] Using a shared watcher for ConfigMap watching
yittg commented on pull request #15501: URL: https://github.com/apache/flink/pull/15501#issuecomment-842765986 Hi @wangyang0918 , does this change still make sense?
[GitHub] [flink] yittg commented on pull request #15561: [FLINK-20695][ha] Clean up ha data for job if globally terminated
yittg commented on pull request #15561: URL: https://github.com/apache/flink/pull/15561#issuecomment-842765542 Hi @tillrohrmann, would you merge it? Just a reminder to make sure you didn't miss it :) Thanks again.
[jira] [Updated] (FLINK-22651) Support StreamExecPythonGroupAggregate json serialization/deserialization
[ https://issues.apache.org/jira/browse/FLINK-22651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-22651: - Fix Version/s: 1.14.0 > Support StreamExecPythonGroupAggregate json serialization/deserialization > - > > Key: FLINK-22651 > URL: https://issues.apache.org/jira/browse/FLINK-22651 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Table SQL / Planner >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Labels: pull-request-available > Fix For: 1.14.0
[jira] [Resolved] (FLINK-22651) Support StreamExecPythonGroupAggregate json serialization/deserialization
[ https://issues.apache.org/jira/browse/FLINK-22651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo resolved FLINK-22651. -- Resolution: Done Merged into master via 06dec012763c055746b635070d6bed49bb30686c > Support StreamExecPythonGroupAggregate json serialization/deserialization > - > > Key: FLINK-22651 > URL: https://issues.apache.org/jira/browse/FLINK-22651 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Table SQL / Planner >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Labels: pull-request-available
[GitHub] [flink] HuangXingBo closed pull request #15928: [FLINK-22651][python][table-planner-blink] Support StreamExecPythonGroupAggregate json serialization/deserialization
HuangXingBo closed pull request #15928: URL: https://github.com/apache/flink/pull/15928
[GitHub] [flink] godfreyhe commented on pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input node
godfreyhe commented on pull request #14830: URL: https://github.com/apache/flink/pull/14830#issuecomment-842764474 @flinkbot run azure
[GitHub] [flink] godfreyhe commented on a change in pull request #14830: [FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only inp
godfreyhe commented on a change in pull request #14830: URL: https://github.com/apache/flink/pull/14830#discussion_r633973171

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

## @@ -470,20 +470,21 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
-           _: StreamPhysicalPythonGroupTableAggregate =>
-        // Aggregate, TableAggregate and Limit requires update_before if there are updates
+           _: StreamPhysicalPythonGroupTableAggregate |

Review comment: nit: use `StreamPhysicalGroupWindowAggregateBase` instead of three sub-classes
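The review nit above (match on the shared base class rather than listing each subclass) can be sketched in isolation. The hierarchy below is hypothetical and only mirrors the shape of the comment: `WindowAggregateBase` stands in for a base class such as `StreamPhysicalGroupWindowAggregateBase`, and the three subclasses are placeholders, since the actual subclass names are not shown in the diff. Java `instanceof` checks stand in for the Scala pattern alternatives.

```java
/** Hypothetical plan-node hierarchy; names are placeholders, not Flink classes. */
abstract class PlanNode {}

abstract class WindowAggregateBase extends PlanNode {}

final class WindowAggregateA extends WindowAggregateBase {}

final class WindowAggregateB extends WindowAggregateBase {}

final class WindowAggregateC extends WindowAggregateBase {}

final class OtherNode extends PlanNode {}

final class MatchOnBaseDemo {

    /** Enumerates every subclass; has to be edited whenever a subclass is added. */
    static boolean matchesBySubclasses(PlanNode node) {
        return node instanceof WindowAggregateA
                || node instanceof WindowAggregateB
                || node instanceof WindowAggregateC;
    }

    /** Single check against the base class; new subclasses are covered automatically. */
    static boolean matchesByBase(PlanNode node) {
        return node instanceof WindowAggregateBase;
    }

    public static void main(String[] args) {
        PlanNode[] nodes = {
            new WindowAggregateA(), new WindowAggregateB(), new WindowAggregateC(), new OtherNode()
        };
        for (PlanNode node : nodes) {
            System.out.println(
                    node.getClass().getSimpleName()
                            + ": " + matchesBySubclasses(node)
                            + " / " + matchesByBase(node));
        }
    }
}
```

The two predicates agree as long as the enumerated list is complete. The base-class form keeps agreeing when a fourth subclass appears, which is presumably the motivation for the nit.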
[GitHub] [flink] yittg commented on pull request #15768: [FLINK-22451][table] Support (*) as parameter of UDFs in Table API
yittg commented on pull request #15768: URL: https://github.com/apache/flink/pull/15768#issuecomment-842762480 @wuchong Hi, how's everything going?
[jira] [Commented] (FLINK-22688) Root Exception can not be shown on Web UI in Flink 1.13.0
[ https://issues.apache.org/jira/browse/FLINK-22688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346523#comment-17346523 ]

Gary Wu commented on FLINK-22688:

Hi, [~karmagyz] and [~maguowei], I have created a bug for the issue discussed in the mailing list. Thank you in advance!
[jira] [Created] (FLINK-22688) Root Exception can not be shown on Web UI in Flink 1.13.0
Gary Wu created FLINK-22688:
---
Summary: Root Exception can not be shown on Web UI in Flink 1.13.0
Key: FLINK-22688
URL: https://issues.apache.org/jira/browse/FLINK-22688
Project: Flink
Issue Type: Bug
Reporter: Gary Wu

Hi, We have upgraded our Flink applications to 1.13.0, but we found that the Root Exception cannot be shown on the Web UI; an internal server error message appears instead. After opening the browser development console and tracing the message, we found that there is an exception in the job manager:

2021-05-12 13:30:45,589 ERROR org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] - Unhandled exception.
java.lang.IllegalArgumentException: The location must not be null for a non-global failure.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
    at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
    at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632) ~[?:?]
    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127) ~[?:?]
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
    at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) [?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]

I see there are some exceptions in the task managers, and I remember that this kind of exception could be shown in the UI in version 1.12.1:

2021-05-18 00:50:30,261 WARN org.apache.flink.runtime.taskmanager.Task [] - xxx (23/90)#13 (c345fb009b5d93628b5a6d890c8f4226) switched from RUNNING to FAILED with failure cause: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '10.194.65.3/10.194.65.3:44273'. This might indicate that the remote task manager was lost.
    at
[GitHub] [flink] flinkbot edited a comment on pull request #15812: remove slotpoolImpl
flinkbot edited a comment on pull request #15812: URL: https://github.com/apache/flink/pull/15812#issuecomment-829441551 ## CI report: * fb9f54a13d3f63e93c1088c72acfd6de86eff36c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18051) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14841: [FLINK-21232][YARN][Common] Introduce pluggable Hadoop delegation token providers
flinkbot edited a comment on pull request #14841: URL: https://github.com/apache/flink/pull/14841#issuecomment-772182984 ## CI report: * 60e427103cca73ac1fc29fb18893e3996edb1c7a UNKNOWN * b88e8b4068b23de76bfcba85fb1373b478a5 UNKNOWN * 5a2467236a4ef8c211a9d4a9d9092d91cff1b23d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18050)
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346460#comment-17346460 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Thanks [~trohrmann] , I'll give this a try. > Recover checkpoints when JobMaster gains leadership > --- > > Key: FLINK-22483 > URL: https://issues.apache.org/jira/browse/FLINK-22483 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.13.0 >Reporter: Robert Metzger >Priority: Critical > Fix For: 1.14.0 > > > Recovering checkpoints (from the CompletedCheckpointStore) is a potentially > long-lasting/blocking operation, for example if the file system > implementation is retrying to connect to an unavailable storage backend. > Currently, we are calling the CompletedCheckpointStore.recover() method from > the main thread of the JobManager, making it unresponsive to any RPC call > while the recover method is blocked: > {code} > 2021-04-02 20:33:31,384 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX > switched from state RUNNING to RESTARTING. > com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to > minio.minio.svc:9000 [minio.minio.svc/] failed: Connection refused > (Connection refused) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) > ~[?:?] 
> at > com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?] > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?] > at > com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) > ~[?:?] > at > com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) > ~[?:?] > at > com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905) > ~[?:?] > at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819) > ~[?:?] > at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818) > ~[?:?] > at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) > ~[?:1.8.0_282] > at XXX.recover(KubernetesHaCheckpointStore.java:69) > ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?] 
> at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) > ~[?:1.8.0_282] > at > java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) > ~[?:1.8.0_282] > at >
[GitHub] [flink] flinkbot edited a comment on pull request #15771: [FLINK-18934] Idle stream does not advance watermark in connected stream
flinkbot edited a comment on pull request #15771: URL: https://github.com/apache/flink/pull/15771#issuecomment-826939649 ## CI report: * e4b89d72d1b75ae4229b0c88476cf17a2ff56551 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18049)
[GitHub] [flink] flinkbot edited a comment on pull request #15904: [FLINK-22646] Unregister DeclarativeSlotManager metrics, before suspending
flinkbot edited a comment on pull request #15904: URL: https://github.com/apache/flink/pull/15904#issuecomment-839686217 ## CI report: * fb7357fd2c051f77190f860384e4ef3a51df0716 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18048)
[GitHub] [flink] flinkbot edited a comment on pull request #15858: [FLINK-22067][tests] Wait for vertices to start using API
flinkbot edited a comment on pull request #15858: URL: https://github.com/apache/flink/pull/15858#issuecomment-834799316 ## CI report: * 536a3f0e34b5840b019cf571cf85d4459b52c3a7 UNKNOWN * 15328613e51f7830fc15ff7ba9ebdb3ebe7b12eb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18046)
[GitHub] [flink] flinkbot edited a comment on pull request #15937: [FLINK-22653][python][table-planner-blink] Support StreamExecPythonOverAggregate json serialization/deserialization
flinkbot edited a comment on pull request #15937: URL: https://github.com/apache/flink/pull/15937#issuecomment-842291900 ## CI report: * bef596d85790f56b524ac4a8345a0312bbba1659 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18044)
[GitHub] [flink] flinkbot edited a comment on pull request #15928: [FLINK-22651][python][table-planner-blink] Support StreamExecPythonGroupAggregate json serialization/deserialization
flinkbot edited a comment on pull request #15928: URL: https://github.com/apache/flink/pull/15928#issuecomment-842073143 ## CI report: * b1ef7090e913827bbbe1442cc05ade7cec149558 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18043)
[GitHub] [flink] flinkbot edited a comment on pull request #15812: remove slotpoolImpl
flinkbot edited a comment on pull request #15812: URL: https://github.com/apache/flink/pull/15812#issuecomment-829441551 ## CI report: * 03f0eb7244fcd3c670bf55fd3f692e370af6c864 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17963) * fb9f54a13d3f63e93c1088c72acfd6de86eff36c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=18051)
[GitHub] [flink] flinkbot edited a comment on pull request #15812: remove slotpoolImpl
flinkbot edited a comment on pull request #15812: URL: https://github.com/apache/flink/pull/15812#issuecomment-829441551 ## CI report: * 03f0eb7244fcd3c670bf55fd3f692e370af6c864 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17963) * fb9f54a13d3f63e93c1088c72acfd6de86eff36c UNKNOWN
[GitHub] [flink] HuangXingBo closed pull request #15934: [FLINK-22652][python][table-planner-blink] Support StreamExecPythonGroupWindowAggregate json serialization/deserialization
HuangXingBo closed pull request #15934: URL: https://github.com/apache/flink/pull/15934
[GitHub] [flink] flinkbot edited a comment on pull request #15929: [FLINK-15390][Connectors/ORC]List/Map/Struct types support for vectorized orc reader
flinkbot edited a comment on pull request #15929: URL: https://github.com/apache/flink/pull/15929#issuecomment-842090480
[GitHub] [flink] XComp commented on pull request #15832: [FLINK-22494][ha] Introduce PossibleInconsistentStateException
XComp commented on pull request #15832: URL: https://github.com/apache/flink/pull/15832#issuecomment-841813005 The [reported build failure](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17928=logs=b2f046ab-ae17-5406-acdc-240be7e870e4=93e5ae06-d194-513d-ba8d-150ef6da1d7c=9225) happened due to the unrelated issue [FLINK-22595](https://issues.apache.org/jira/browse/FLINK-22595?focusedCommentId=17345694=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17345694)
[GitHub] [flink] flinkbot edited a comment on pull request #15936: [FLINK-22683][runtime] Fix the null or incorrect value of total Flink…
flinkbot edited a comment on pull request #15936: URL: https://github.com/apache/flink/pull/15936#issuecomment-842214419
[GitHub] [flink] AHeise commented on a change in pull request #15905: [FLINK-22623][hbase] Drop BatchTableSource HBaseTableSource/Sink and related classes
AHeise commented on a change in pull request #15905: URL: https://github.com/apache/flink/pull/15905#discussion_r633698769 ## File path: flink-python/pyflink/table/descriptors.py ## @@ -33,7 +33,6 @@ 'FileSystem', 'Kafka', 'Elasticsearch', -'HBase', Review comment: Does this mean, you cannot use HBase from Python anymore? Why is it different than the other connectors? ## File path: flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java ## @@ -131,7 +120,7 @@ private static void createHBaseTable1() throws IOException { Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:00:00"), -new BigDecimal(12345678.0001))); +new BigDecimal("12345678.0001"))); Review comment: These changes should probably go to a different commit.
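For context on the `BigDecimal` line in the HBaseTestBase diff above, here is a minimal sketch, assuming only the JDK, of why the two constructors are not interchangeable. The class and method names (`BigDecimalConstructorDemo`, `fromDouble`, `fromString`) are made up for illustration and are not part of the PR.

```java
import java.math.BigDecimal;

/** Hypothetical demo class; not taken from the Flink code base. */
public class BigDecimalConstructorDemo {

    /** Builds the value from a double literal, as on the removed line of the diff. */
    static BigDecimal fromDouble() {
        // The double is first rounded to the nearest binary floating-point value,
        // and BigDecimal(double) then converts that binary value exactly.
        return new BigDecimal(12345678.0001);
    }

    /** Builds the value from a decimal string, as on the added line of the diff. */
    static BigDecimal fromString() {
        // The string is parsed digit by digit, so the decimal value is preserved.
        return new BigDecimal("12345678.0001");
    }

    public static void main(String[] args) {
        BigDecimal d = fromDouble();
        BigDecimal s = fromString();
        System.out.println("from double: " + d.toPlainString());
        System.out.println("from string: " + s.toPlainString());
        System.out.println("scale (double): " + d.scale());
        System.out.println("scale (string): " + s.scale());
        System.out.println("numerically equal: " + (d.compareTo(s) == 0));
    }
}
```

The string-based value has scale 4 and is exactly 12345678.0001, while the double-based value carries a long fractional tail and does not compare equal to it. That may be the motivation for the change in the test fixture, although the PR discussion quoted here does not say so.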
[GitHub] [flink] flinkbot edited a comment on pull request #15937: [FLINK-22653][python][table-planner-blink] Support StreamExecPythonOverAggregate json serialization/deserialization
flinkbot edited a comment on pull request #15937: URL: https://github.com/apache/flink/pull/15937#issuecomment-842291900
[GitHub] [flink] SteNicholas commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
SteNicholas commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r633194823 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. */ Review comment: The comment of `HybridSourceReader` is confused because there is no concept of current source reader. 
## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based 
on configurable source chain. */ +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { Review comment: IMO, `SourceChain` shouldn't be generated from user side, which could be constructed in `HybridSource`. The constructor of `HybridSource` could be initial source or source list. ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *
[GitHub] [flink] flinkbot edited a comment on pull request #15938: [FLINK-11103][runtime] Set a configurable default uncaught exception handler for all entrypoints
flinkbot edited a comment on pull request #15938: URL: https://github.com/apache/flink/pull/15938#issuecomment-842314070
[GitHub] [flink] dawidwys commented on pull request #15771: [FLINK-18934] Idle stream does not advance watermark in connected stream
dawidwys commented on pull request #15771: URL: https://github.com/apache/flink/pull/15771#issuecomment-842324286 I tried addressing your comments. I am still not 100% sure about the short ACTIVE/IDLE cycle, or whether we should rather let records be generated but halt watermark forwarding. Do you mind taking another look, @AHeise?
[GitHub] [flink] godfreyhe commented on a change in pull request #15913: [FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization
godfreyhe commented on a change in pull request #15913: URL: https://github.com/apache/flink/pull/15913#discussion_r633181871 ## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json serialization/deserialization for calc. 
*/ +public class PythonCalcJsonPlanTest extends TableTestBase { + +private StreamTableTestUtil util; +private TableEnvironment tEnv; + +@Before +public void setup() { +util = streamTestUtil(TableConfig.getDefault()); +tEnv = util.getTableEnv(); + +String srcTableDdl = +"CREATE TABLE MyTable (\n" ++ " a bigint,\n" ++ " b int not null,\n" ++ " c varchar,\n" ++ " d timestamp(3)\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'bounded' = 'false')"; +tEnv.executeSql(srcTableDdl); +} + +@Test +public void testPythonCalc() { +tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc")); +String sinkTableDdl = +"CREATE TABLE MySink (\n" ++ " a bigint,\n" ++ " b int\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'table-sink-class' = 'DEFAULT')"; +tEnv.executeSql(sinkTableDdl); +util.verifyJsonPlan("insert into MySink select a, pyFunc(b, b) from MyTable"); +} Review comment: it's better add a test with filter ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala ## @@ -49,8 +51,10 @@ class BatchPhysicalPythonCalc( } override def translateToExecNode(): ExecNode[_] = { +val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef) Review comment: nit: it's better to check the condition in `calcProgram` is empty ? ## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json
[GitHub] [flink] loniecc closed pull request #15930: [FLINK-22679] code optimization:Transformation
loniecc closed pull request #15930: URL: https://github.com/apache/flink/pull/15930 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #15924: [FLINK-22670][FLIP-150] Hybrid source baseline
flinkbot commented on pull request #15924: URL: https://github.com/apache/flink/pull/15924#issuecomment-841938388
[GitHub] [flink] zuston commented on a change in pull request #14841: [FLINK-21232][YARN][Common] Introduce pluggable Hadoop delegation token providers
zuston commented on a change in pull request #14841: URL: https://github.com/apache/flink/pull/14841#discussion_r633250018 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopDelegationTokenManager.java ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.security.delegationtokens.HadoopDelegationTokenConfiguration; +import org.apache.flink.runtime.security.delegationtokens.HadoopDelegationTokenProvider; + +import org.apache.hadoop.security.Credentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; + +/** + * HadoopDelegationTokenManager is responsible for managing delegation tokens. It can be used to + * obtain delegation tokens by calling `obtainDelegationTokens` method. 
+ */ +public class HadoopDelegationTokenManager { +private static final Logger LOG = LoggerFactory.getLogger(HadoopDelegationTokenManager.class); + +private final HadoopDelegationTokenConfiguration hadoopDelegationTokenConf; +private final List delegationTokenProviders; + +public HadoopDelegationTokenManager( +HadoopDelegationTokenConfiguration hadoopDelegationTokenConf) { +this.hadoopDelegationTokenConf = hadoopDelegationTokenConf; +delegationTokenProviders = loadProviders(); +} + +/** + * Obtain delegation tokens using HadoopDelegationProviders, and store them in the give + * credentials. + * + * @param credentials Credentials object where to store the delegation tokens. + */ +public void obtainDelegationTokens(Credentials credentials) { +delegationTokenProviders.forEach( +provider -> { +if (provider.delegationTokensRequired()) { +provider.obtainDelegationTokens(credentials); +} else { +LOG.info( +"Service {} does not need to require a token,", +provider.serviceName()); +} +}); +} + +private List loadProviders() { +ServiceLoader serviceLoader = +ServiceLoader.load(HadoopDelegationTokenProvider.class); + +List providers = new ArrayList<>(); + +Iterator providerIterator = serviceLoader.iterator(); +providerIterator.forEachRemaining( +provider -> { +try { +provider.init(hadoopDelegationTokenConf); Review comment: Add logs that indicates which is hadoop delegation token provider loaded? 
## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -197,13 +196,19 @@ private static LocalResource registerLocalResource( } public static void setTokensFor( -ContainerLaunchContext amContainer, List paths, Configuration conf) +org.apache.flink.configuration.Configuration flinkConf, +ContainerLaunchContext amContainer, +Configuration hadoopConf) throws IOException { Credentials credentials = new Credentials(); -// for HDFS -TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf); -// for HBase -obtainTokenForHBase(credentials, conf); + +// obtain tokens from HadoopDelegationTokenProviders Review comment: Need to be compatible with https://issues.apache.org/jira/browse/FLINK-21700# ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProvider.java ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file
[GitHub] [flink] dmvk commented on pull request #15904: [FLINK-22646] Unregister DeclarativeSlotManager metrics, before suspending
dmvk commented on pull request #15904: URL: https://github.com/apache/flink/pull/15904#issuecomment-842354471

Hi Chesnay, thanks for the review! Nice catch with the `suspend`, I'll move the metrics de-registration there.

I'll try to make the test more robust / understandable tonight and cover `FineGrainedSlotManager` as well. It would be great if we could get this into the 1.13.1 release, as this is a regression from the 1.12.x branch.

> I would suggest to change the test such that it has a separate thread querying the metric. You'd need to intercept the initial registration, and cancel the loop once the metric is being unregistered.

It's a little bit unclear to me how a separate thread helps to test this. The only code path that it was supposed to stress is accessing metrics in `MetricRegistry#unregister` (this is the actual issue we ran into in the Beam runner).

To make the test more robust:
- I'll assert that register / un-register for a particular metric has actually been called (e.g. for `MetricNames#TASK_SLOTS_AVAILABLE`). This should also solve the possible problem with new metrics in the future.
- I can assert the metric value during un-register, to make sure we still have access to a correct value.

> it exhibits behavior from the metric registry that in this form doesn't exist; it never throws exceptions nor will it ever call back into the metric when it is being unregistered.

I'm not sure what exactly you mean by this comment. This behavior is exactly what [Beam FileReporter](https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java#L72) does in the portable runner. If you think there is a misuse of the API on Beam's side, please let me know.

WDYT?
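The two assertions proposed in the comment above (the metric was registered, and its value is still readable while it is being un-registered) could look roughly like the sketch below. This is not Flink code: `RecordingRegistry` and `SketchSlotManager` are hypothetical stand-ins for the metric registry and the slot manager, and the metric name string is a placeholder rather than the value of `MetricNames#TASK_SLOTS_AVAILABLE`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical registry that, like a reporter, reads the gauge while it is being unregistered. */
final class RecordingRegistry {
    final List<String> registered = new ArrayList<>();
    final Map<String, Integer> valuesSeenOnUnregister = new HashMap<>();
    private final Map<String, Supplier<Integer>> gauges = new HashMap<>();

    void register(String name, Supplier<Integer> gauge) {
        registered.add(name);
        gauges.put(name, gauge);
    }

    void unregister(String name) {
        Supplier<Integer> gauge = gauges.remove(name);
        if (gauge != null) {
            // Calls back into the metric one last time, as a reporter might do.
            valuesSeenOnUnregister.put(name, gauge.get());
        }
    }
}

/** Hypothetical slot manager; only the ordering inside suspend() matters here. */
final class SketchSlotManager {
    // Placeholder metric name, assumed for illustration only.
    static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";

    private final RecordingRegistry registry;
    private int[] freeSlots; // null while the manager is suspended

    SketchSlotManager(RecordingRegistry registry) {
        this.registry = registry;
    }

    void start(int numberOfFreeSlots) {
        freeSlots = new int[] {numberOfFreeSlots};
        // The gauge reads internal state lazily, at the time it is queried.
        registry.register(TASK_SLOTS_AVAILABLE, () -> freeSlots[0]);
    }

    void suspend() {
        // Unregister first, while the state backing the gauge is still valid ...
        registry.unregister(TASK_SLOTS_AVAILABLE);
        // ... and only then drop the state. Swapping these two statements makes
        // the callback in unregister() fail with a NullPointerException.
        freeSlots = null;
    }
}
```

A test built on this would start the manager, suspend it, and then check both `registered` and `valuesSeenOnUnregister` for the metric name; it needs no extra thread because the callback happens synchronously inside `unregister`.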
[GitHub] [flink] wuchong merged pull request #15914: [FLINK-22658][table-common] Remove Deprecated util class TableConnectorUtil
wuchong merged pull request #15914: URL: https://github.com/apache/flink/pull/15914
[GitHub] [flink] flinkbot edited a comment on pull request #15799: [FLINK-22434] Store suspended execution graphs on termination to keep…
flinkbot edited a comment on pull request #15799: URL: https://github.com/apache/flink/pull/15799#issuecomment-828982589
[GitHub] [flink] flinkbot commented on pull request #15929: [FLINK-15390][Connectors/ORC]List/Map/Struct types support for vectorized orc reader
flinkbot commented on pull request #15929: URL: https://github.com/apache/flink/pull/15929#issuecomment-842078203
[GitHub] [flink] flinkbot commented on pull request #15934: [FLINK-22652][python][table-planner-blink] Support StreamExecPythonGroupWindowAggregate json serialization/deserialization
flinkbot commented on pull request #15934: URL: https://github.com/apache/flink/pull/15934#issuecomment-842141649
[GitHub] [flink] JackWangCS commented on pull request #14841: [FLINK-21232][YARN][Common] Introduce pluggable Hadoop delegation token providers
JackWangCS commented on pull request #14841: URL: https://github.com/apache/flink/pull/14841#issuecomment-842484467 Hi @zuston, thanks for your review! I have made some changes based on your comments, could you please take a look?
[GitHub] [flink] flinkbot edited a comment on pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
flinkbot edited a comment on pull request #15924: URL: https://github.com/apache/flink/pull/15924#issuecomment-841943851
[GitHub] [flink] pnowojski commented on a change in pull request #15885: [FLINK-22376][runtime] RecoveredChannelStateHandler recycles the buffer if it was created inside and doesn't recycle if it was
pnowojski commented on a change in pull request #15885: URL: https://github.com/apache/flink/pull/15885#discussion_r633394247

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java ##

@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
     }
     public void onRecoveredStateBuffer(Buffer buffer) {
-        boolean recycleBuffer = true;
         NetworkActionsLogger.traceRecover(
                 "InputChannelRecoveredStateHandler#recover",
                 buffer,
                 inputGate.getOwningTaskName(),
                 channelInfo);
-        try {
-            final boolean wasEmpty;
-            synchronized (receivedBuffers) {
-                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
-                // after releaseAllResources() released all buffers from receivedBuffers.
-                if (isReleased) {
-                    wasEmpty = false;
-                } else {
-                    wasEmpty = receivedBuffers.isEmpty();
-                    receivedBuffers.add(buffer);
-                    recycleBuffer = false;
-                }
-            }
-            if (wasEmpty) {
-                notifyChannelNonEmpty();
-            }
-        } finally {
-            if (recycleBuffer) {
-                buffer.recycleBuffer();
+        final boolean wasEmpty;
+        synchronized (receivedBuffers) {
+            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+            // after releaseAllResources() released all buffers from receivedBuffers.
+            if (isReleased) {
+                wasEmpty = false;
+            } else {
+                wasEmpty = receivedBuffers.isEmpty();
+                receivedBuffers.add(buffer.retainBuffer());

Review comment: This makes this method inconsistent with `RemoteInputChannel#onBuffer`, which makes things a bit more confusing. Also, the old way seems more natural/explicit to me: the `onBuffer()` call transfers the ownership of the buffer to the `Remote/RecoveredInputChannel`, and if a caller wants to re-use this buffer elsewhere, it should be the one doing the retaining. Either way, I think the Javadoc of this method should document the contract: whether ownership of the passed `buffer` argument is taken by this instance or not.
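The ownership contract described in the review comment above can be illustrated with a small reference-counting sketch. It is not the Flink implementation: `CountedBuffer` and `OwningChannel` are hypothetical, simplified stand-ins for `Buffer` and the input channel, and thread safety is left out on purpose.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical reference-counted buffer; starts with one reference held by its creator. */
final class CountedBuffer {
    private int refCount = 1;

    CountedBuffer retain() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount++;
        return this;
    }

    void recycle() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount--;
    }

    boolean isRecycled() {
        return refCount == 0;
    }
}

/** Hypothetical channel following the "callee takes ownership" contract. */
final class OwningChannel {
    private final Queue<CountedBuffer> received = new ArrayDeque<>();
    private boolean released;

    /**
     * Takes ownership of the passed reference: the buffer is either queued or recycled here.
     * A caller that wants to keep using the buffer must call retain() before handing it over.
     */
    void onBuffer(CountedBuffer buffer) {
        boolean recycle = true;
        try {
            if (!released) {
                received.add(buffer);
                recycle = false;
            }
        } finally {
            if (recycle) {
                buffer.recycle();
            }
        }
    }

    /** Releases every queued buffer and rejects later ones. */
    void releaseAll() {
        released = true;
        CountedBuffer buffer;
        while ((buffer = received.poll()) != null) {
            buffer.recycle();
        }
    }
}
```

With this contract a caller that still needs the buffer writes `channel.onBuffer(buffer.retain())` and later recycles its own reference, while a caller that is done with it passes the buffer directly and never touches it again.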
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java ##

@@ -153,7 +153,12 @@ public BufferRecycler getRecycler() {
     }
     public void recycle() {
-        recycler.recycle(memorySegment);
+        // If at least one consumer was created then they responsible for the memory recycling
+        // because BufferBuilder doesn't contain a references counter so it will be impossible to
+        // correctly recycle memory here.
+        if (!bufferConsumerCreated) {
+            recycler.recycle(memorySegment);
+        }

Review comment: Frankly, this (still?) seems like a partial solution/hack, because what's the contract for when this method should be called? Before this `recycle()` method was introduced, it was at least clear that `BufferBuilder` never recycles the segment, and that this is always done by closing `BufferConsumer`s. Now it seems like we are going deeper into the murky waters where `recycle()` should "sometimes" be called?

I think I would like the "hack" of relying on the `bufferConsumerCreated` flag to avoid retaining the buffer in the `BufferBuilder`, but I think it's still confusing:
1. It would probably be better to make `BufferBuilder` implement `Closeable` and rename `recycle()` to `close()`. This would probably cause quite a few changes, especially in tests.
2. It still doesn't solve the problem of writing to an already released `memorySegment`: what if the `BufferConsumer` was created and has already been closed while someone is still writing data to the `BufferBuilder`? There was a bug like that, which I fixed on a different layer, but maybe we should fix it here as well after all? I mean especially by measuring on the benchmarking machine whether there is any overhead of retaining and recycling `NetworkBuffer` one extra time.
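Point 1 of the review comment above (a `Closeable` builder whose `close()` recycles only if no consumer was created) might be sketched as follows. All types here are hypothetical simplifications rather than the Flink classes: `SegmentRecycler` stands in for `BufferRecycler`, only a single consumer is supported, and the sketch does not address point 2 (writes after the consumer has been closed).

```java
import java.io.Closeable;

/** Hypothetical callback that returns a memory segment to its pool. */
interface SegmentRecycler {
    void recycle();
}

/** Hypothetical consumer; closing it recycles the segment exactly once. */
final class SketchBufferConsumer implements Closeable {
    private final SegmentRecycler recycler;
    private boolean closed;

    SketchBufferConsumer(SegmentRecycler recycler) {
        this.recycler = recycler;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            recycler.recycle();
        }
    }
}

/** Hypothetical builder: close() recycles only when no consumer took over that duty. */
final class SketchBufferBuilder implements Closeable {
    private final SegmentRecycler recycler;
    private boolean consumerCreated;
    private boolean closed;

    SketchBufferBuilder(SegmentRecycler recycler) {
        this.recycler = recycler;
    }

    SketchBufferConsumer createConsumer() {
        if (consumerCreated) {
            // Several consumers would need a reference counter, which this sketch omits.
            throw new IllegalStateException("this sketch supports a single consumer");
        }
        consumerCreated = true;
        return new SketchBufferConsumer(recycler);
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (!consumerCreated) {
            // Nobody else will ever recycle the segment, so the builder has to.
            recycler.recycle();
        }
    }
}
```

Making `close()` idempotent lets the builder be used in try-with-resources regardless of whether a consumer was handed out, which gives callers one rule ("always close the builder") instead of a conditional one.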
[GitHub] [flink] xintongsong closed pull request #15915: [FLINK-22592][runtime] numBuffersInLocal is always zero when using unaligned checkpoints
xintongsong closed pull request #15915: URL: https://github.com/apache/flink/pull/15915
[GitHub] [flink] flinkbot edited a comment on pull request #15913: [FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization
flinkbot edited a comment on pull request #15913: URL: https://github.com/apache/flink/pull/15913#issuecomment-840516258
[GitHub] [flink] flinkbot edited a comment on pull request #15928: [FLINK-22651][python][table-planner-blink] Support StreamExecPythonGroupAggregate json serialization/deserialization
flinkbot edited a comment on pull request #15928: URL: https://github.com/apache/flink/pull/15928#issuecomment-842073143
[GitHub] [flink] rkhachatryan merged pull request #15932: [BP-1.13][FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery
rkhachatryan merged pull request #15932: URL: https://github.com/apache/flink/pull/15932
[GitHub] [flink] flinkbot edited a comment on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
flinkbot edited a comment on pull request #15599: URL: https://github.com/apache/flink/pull/15599#issuecomment-818947931