[GitHub] [flink-connector-opensearch] reswqa merged pull request #21: [hotfix] Workaround new violations message
reswqa merged PR #21: URL: https://github.com/apache/flink-connector-opensearch/pull/21 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reswqa commented on pull request #21: [hotfix] Workaround new violations message
reswqa commented on PR #21: URL: https://github.com/apache/flink-connector-opensearch/pull/21#issuecomment-1546537951

Thanks for the review!
[GitHub] [flink] Mrart commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.
Mrart commented on code in PR #21527: URL: https://github.com/apache/flink/pull/21527#discussion_r1192918839

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:

@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }

+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment: I have tested it; it takes at most 1 second.
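Editor's note: the mocked watch URL above embeds a URL-encoded label selector (`label1%3Dvalue1%2Clabel2%3Dvalue2`). A minimal, self-contained sketch of how such a selector string is built and encoded — the class and method names are mine, not part of the Flink or fabric8 code base:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class LabelSelector {
    /** Joins labels as comma-separated key=value pairs, the form Kubernetes label selectors use. */
    static String toSelector(Map<String, String> labels) {
        return labels.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    /** URL-encodes the selector for use as a query parameter ('=' becomes %3D, ',' becomes %2C). */
    static String encode(String selector) {
        try {
            return URLEncoder.encode(selector, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new AssertionError("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("label1", "value1");
        labels.put("label2", "value2");
        // Prints: label1%3Dvalue1%2Clabel2%3Dvalue2
        System.out.println(encode(toSelector(labels)));
    }
}
```

This reproduces the `labelSelector` fragment seen in the mocked path above.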
[jira] [Updated] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices
[ https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zakelly Lan updated FLINK-32072:
Fix Version/s: 1.18.0

> Create and wire FileMergingSnapshotManager with TaskManagerServices
>
> Key: FLINK-32072
> URL: https://issues.apache.org/jira/browse/FLINK-32072
> Project: Flink
> Issue Type: Sub-task
> Reporter: Zakelly Lan
> Assignee: Yanfei Lei
> Priority: Major
> Fix For: 1.18.0

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32092) Integrate snapshot file-merging with existing IT cases
Zakelly Lan created FLINK-32092:

Summary: Integrate snapshot file-merging with existing IT cases
Key: FLINK-32092
URL: https://issues.apache.org/jira/browse/FLINK-32092
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Rui Xia
[jira] [Created] (FLINK-32091) Add necessary metrics for file-merging
Zakelly Lan created FLINK-32091:

Summary: Add necessary metrics for file-merging
Key: FLINK-32091
URL: https://issues.apache.org/jira/browse/FLINK-32091
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu
[jira] [Created] (FLINK-32089) Do fast copy in best-effort during first checkpoint after restoration
Zakelly Lan created FLINK-32089:

Summary: Do fast copy in best-effort during first checkpoint after restoration
Key: FLINK-32089
URL: https://issues.apache.org/jira/browse/FLINK-32089
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
[jira] [Created] (FLINK-32090) Python API for enabling and configuring file merging snapshot
Zakelly Lan created FLINK-32090:

Summary: Python API for enabling and configuring file merging snapshot
Key: FLINK-32090
URL: https://issues.apache.org/jira/browse/FLINK-32090
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
[jira] [Created] (FLINK-32088) Re-uploading in state file-merging for space amplification control
Zakelly Lan created FLINK-32088:

Summary: Re-uploading in state file-merging for space amplification control
Key: FLINK-32088
URL: https://issues.apache.org/jira/browse/FLINK-32088
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin
[jira] [Created] (FLINK-32087) Space amplification statistics of file merging
Zakelly Lan created FLINK-32087:

Summary: Space amplification statistics of file merging
Key: FLINK-32087
URL: https://issues.apache.org/jira/browse/FLINK-32087
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Rui Xia
[jira] [Created] (FLINK-32084) Migrate current file merging of channel state into the file merging framework
Zakelly Lan created FLINK-32084:

Summary: Migrate current file merging of channel state into the file merging framework
Key: FLINK-32084
URL: https://issues.apache.org/jira/browse/FLINK-32084
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
[jira] [Created] (FLINK-32086) Cleanup non-reported managed directory on exit of TM
Zakelly Lan created FLINK-32086:

Summary: Cleanup non-reported managed directory on exit of TM
Key: FLINK-32086
URL: https://issues.apache.org/jira/browse/FLINK-32086
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
[jira] [Created] (FLINK-32085) Implement and migrate batch uploading in changelog files into the file merging framework
Zakelly Lan created FLINK-32085:

Summary: Implement and migrate batch uploading in changelog files into the file merging framework
Key: FLINK-32085
URL: https://issues.apache.org/jira/browse/FLINK-32085
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu
[jira] [Created] (FLINK-32083) Chinese translation of documentation of checkpoint file-merging
Zakelly Lan created FLINK-32083:

Summary: Chinese translation of documentation of checkpoint file-merging
Key: FLINK-32083
URL: https://issues.apache.org/jira/browse/FLINK-32083
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu
Fix For: 1.18.0
[jira] [Created] (FLINK-32082) Documentation of checkpoint file-merging
Zakelly Lan created FLINK-32082:

Summary: Documentation of checkpoint file-merging
Key: FLINK-32082
URL: https://issues.apache.org/jira/browse/FLINK-32082
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
Fix For: 1.18.0
[jira] [Created] (FLINK-32081) Compatibility between file-merging on and off across job runs
Zakelly Lan created FLINK-32081:

Summary: Compatibility between file-merging on and off across job runs
Key: FLINK-32081
URL: https://issues.apache.org/jira/browse/FLINK-32081
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin
Fix For: 1.18.0
[jira] [Created] (FLINK-32079) Read/write checkpoint metadata of merged files
Zakelly Lan created FLINK-32079:

Summary: Read/write checkpoint metadata of merged files
Key: FLINK-32079
URL: https://issues.apache.org/jira/browse/FLINK-32079
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu
Fix For: 1.18.0
[jira] [Created] (FLINK-32080) Restoration of FileMergingSnapshotManager
Zakelly Lan created FLINK-32080:

Summary: Restoration of FileMergingSnapshotManager
Key: FLINK-32080
URL: https://issues.apache.org/jira/browse/FLINK-32080
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
Fix For: 1.18.0
[jira] [Created] (FLINK-32076) Add file pool for concurrent file reusing
Zakelly Lan created FLINK-32076:

Summary: Add file pool for concurrent file reusing
Key: FLINK-32076
URL: https://issues.apache.org/jira/browse/FLINK-32076
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Rui Xia
Fix For: 1.18.0
[jira] [Created] (FLINK-32077) Implement shared state file merging
Zakelly Lan created FLINK-32077:

Summary: Implement shared state file merging
Key: FLINK-32077
URL: https://issues.apache.org/jira/browse/FLINK-32077
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
Fix For: 1.18.0
[jira] [Created] (FLINK-32078) Implement private state file merging
Zakelly Lan created FLINK-32078:

Summary: Implement private state file merging
Key: FLINK-32078
URL: https://issues.apache.org/jira/browse/FLINK-32078
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
Fix For: 1.18.0
[jira] [Created] (FLINK-32075) Delete merged files on checkpoint abort
Zakelly Lan created FLINK-32075:

Summary: Delete merged files on checkpoint abort
Key: FLINK-32075
URL: https://issues.apache.org/jira/browse/FLINK-32075
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
Fix For: 1.18.0
[jira] [Created] (FLINK-32074) Support file merging across checkpoints
Zakelly Lan created FLINK-32074:

Summary: Support file merging across checkpoints
Key: FLINK-32074
URL: https://issues.apache.org/jira/browse/FLINK-32074
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin
Fix For: 1.18.0
[jira] [Created] (FLINK-32073) Implement file merging in snapshot
Zakelly Lan created FLINK-32073:

Summary: Implement file merging in snapshot
Key: FLINK-32073
URL: https://issues.apache.org/jira/browse/FLINK-32073
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin
[jira] [Created] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices
Zakelly Lan created FLINK-32072:

Summary: Create and wire FileMergingSnapshotManager with TaskManagerServices
Key: FLINK-32072
URL: https://issues.apache.org/jira/browse/FLINK-32072
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
[jira] [Created] (FLINK-32071) Implement the snapshot manager for merged checkpoint files in TM
Zakelly Lan created FLINK-32071:

Summary: Implement the snapshot manager for merged checkpoint files in TM
Key: FLINK-32071
URL: https://issues.apache.org/jira/browse/FLINK-32071
Project: Flink
Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
Fix For: 1.18.0
[jira] [Created] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints
Zakelly Lan created FLINK-32070:

Summary: FLIP-306 Unified File Merging Mechanism for Checkpoints
Key: FLINK-32070
URL: https://issues.apache.org/jira/browse/FLINK-32070
Project: Flink
Issue Type: New Feature
Reporter: Zakelly Lan
Assignee: Zakelly Lan
Fix For: 1.18.0

The FLIP: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]

The creation of multiple checkpoint files can lead to a "file flood" problem, in which a large number of files are written to the checkpoint storage in a short amount of time. This can cause issues in large clusters with high workloads: the creation and deletion of many files increases the amount of file metadata modification on DFS, leading to single-machine hotspot issues for metadata maintainers (e.g. the NameNode in HDFS). Additionally, the performance of object storage (e.g. Amazon S3 and Alibaba OSS) can decrease significantly when listing objects, which is necessary for object-name de-duplication before creating an object, further affecting the performance of directory manipulation from the file system's perspective (see the [hadoop-aws module documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients], section "Warning #2: Directories are mimicked").

While many solutions have been proposed for individual types of state files (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel state), the file flood problems from each type of checkpoint file are similar and lack a systematic view and solution. Therefore, the goal of this FLIP is to establish a unified file merging mechanism to address the file flood problem during checkpoint creation for all types of state files, including keyed, non-keyed, channel, and changelog state. This will significantly improve the system stability and availability of fault tolerance in Flink.
[GitHub] [flink] xuzhiwen1255 commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…
xuzhiwen1255 commented on code in PR #21971: URL: https://github.com/apache/flink/pull/21971#discussion_r1192908722

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:

@@ -111,11 +120,20 @@ public boolean hasNext() {
         return !this.valuesToEmit.isEmpty();
     }

-    private static int safeDivide(long left, long right) {
-        Preconditions.checkArgument(right > 0);
-        Preconditions.checkArgument(left >= 0);
-        Preconditions.checkArgument(left <= Integer.MAX_VALUE * right);
-        return (int) (left / right);
+    private static long safeDivide(long totalRows, long stepSize) {
+        Preconditions.checkArgument(stepSize > 0, "cannot be equal to 0");
+        Preconditions.checkArgument(totalRows >= 0, "Cannot be less than 0");
+        return totalRows / stepSize;
+    }
+
+    @VisibleForTesting
+    public long getStart() {
+        return start;
+    }
+
+    @VisibleForTesting
+    public long getEnd() {
+        return end;

Review Comment: Sorry, I may not quite understand. Does adding the numberOfElementsLimit parameter have any meaning for testing the actual sequence generation?
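Editor's note: the diff above widens `safeDivide` from `int` to `long` so large sequence ranges no longer need the `Integer.MAX_VALUE` overflow guard. A standalone sketch of the revised method — argument messages are reworded for clarity, and the class name is mine:

```java
public class SafeDivide {
    /** Integer division guarded against non-positive step sizes and negative row counts. */
    static long safeDivide(long totalRows, long stepSize) {
        if (stepSize <= 0) {
            throw new IllegalArgumentException("stepSize must be greater than 0");
        }
        if (totalRows < 0) {
            throw new IllegalArgumentException("totalRows cannot be less than 0");
        }
        return totalRows / stepSize;
    }

    public static void main(String[] args) {
        // Prints: 3
        System.out.println(safeDivide(10, 3));
    }
}
```

Unlike the old `int` version, no cast is needed and the result is exact for any non-negative `totalRows`.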
[GitHub] [flink] xuzhiwen1255 commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
xuzhiwen1255 commented on code in PR #22010: URL: https://github.com/apache/flink/pull/22010#discussion_r1192906502

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:

@@ -65,33 +72,46 @@ public void open(
         this.checkpointedState == null,
         "The " + getClass().getSimpleName() + " has already been initialized.");

-    this.checkpointedState =
-            context.getOperatorStateStore()
-                    .getListState(
-                            new ListStateDescriptor<>(
-                                    name + "-sequence-state", LongSerializer.INSTANCE));
-    this.valuesToEmit = new ArrayDeque<>();
-    if (context.isRestored()) {
-        // upon restoring
+    ListStateDescriptor<InternalState> stateDescriptor =
+            new ListStateDescriptor<>(
+                    name + "-sequence-state", TypeInformation.of(InternalState.class));
+    this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+    this.internalStates = Lists.newArrayList();

-        for (Long v : this.checkpointedState.get()) {
-            this.valuesToEmit.add(v);
-        }
+    totalNoOfElements = Math.abs(end - start + 1);
+    if (context.isRestored()) {
+        checkpointedState.get().forEach(state -> internalStates.add(state));
     } else {
         // the first time the job is executed
-        final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
         final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-        final long congruence = start + taskIdx;
+        final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+        internalStates.add(new InternalState(0, taskIdx, stepSize));
+    }
+}

-        long totalNoOfElements = Math.abs(end - start + 1);
-        final int baseSize = safeDivide(totalNoOfElements, stepSize);
-        final int toCollect =
-                (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+private long toCollect(long baseSize, long stepSize, int taskIdx) {
+    return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+}

-        for (long collected = 0; collected < toCollect; collected++) {
-            this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+    Iterator<InternalState> iterator = internalStates.iterator();
+    if (iterator.hasNext()) {
+        InternalState state = iterator.next();
+        long nextSequence = state.collected * state.stepSize + (start + state.taskId);
+        state.collected++;

Review Comment: Let me rethink; perhaps it is more reasonable to change it this way.
[GitHub] [flink] xuzhiwen1255 commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
xuzhiwen1255 commented on code in PR #22010: URL: https://github.com/apache/flink/pull/22010#discussion_r1192906438

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:

@@ -65,33 +72,46 @@ public void open(
         this.checkpointedState == null,
         "The " + getClass().getSimpleName() + " has already been initialized.");

-    this.checkpointedState =
-            context.getOperatorStateStore()
-                    .getListState(
-                            new ListStateDescriptor<>(
-                                    name + "-sequence-state", LongSerializer.INSTANCE));
-    this.valuesToEmit = new ArrayDeque<>();
-    if (context.isRestored()) {
-        // upon restoring
+    ListStateDescriptor<InternalState> stateDescriptor =
+            new ListStateDescriptor<>(
+                    name + "-sequence-state", TypeInformation.of(InternalState.class));
+    this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+    this.internalStates = Lists.newArrayList();

-        for (Long v : this.checkpointedState.get()) {
-            this.valuesToEmit.add(v);
-        }
+    totalNoOfElements = Math.abs(end - start + 1);
+    if (context.isRestored()) {
+        checkpointedState.get().forEach(state -> internalStates.add(state));
     } else {
         // the first time the job is executed
-        final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
         final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-        final long congruence = start + taskIdx;
+        final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+        internalStates.add(new InternalState(0, taskIdx, stepSize));
+    }
+}

-        long totalNoOfElements = Math.abs(end - start + 1);
-        final int baseSize = safeDivide(totalNoOfElements, stepSize);
-        final int toCollect =
-                (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+private long toCollect(long baseSize, long stepSize, int taskIdx) {
+    return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+}

-        for (long collected = 0; collected < toCollect; collected++) {
-            this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+    Iterator<InternalState> iterator = internalStates.iterator();
+    if (iterator.hasNext()) {
+        InternalState state = iterator.next();
+        long nextSequence = state.collected * state.stepSize + (start + state.taskId);
+        state.collected++;
+        // All sequence values are cleared from the stateList after they have been sent
+        if (state.collected
+                >= toCollect(
+                        safeDivide(totalNoOfElements, state.stepSize),
+                        state.stepSize,
+                        state.taskId)) {
+            iterator.remove();
+        }
+        return nextSequence;
+    }
+
+    // Before calling this method, you should call hasNext to check
+    throw new IllegalStateException();

Review Comment: You're quite right. I'll change it.
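Editor's note: the diff above partitions the sequence `[start, end]` across parallel subtasks — subtask `taskIdx` of `stepSize` subtasks emits `collected * stepSize + start + taskIdx`, and `toCollect` gives each low-indexed subtask one extra element when the total does not divide evenly. A self-contained sketch of that arithmetic (class and method names are mine, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;

public class SequencePartition {
    /** Number of elements subtask taskIdx (out of stepSize subtasks) should emit. */
    static long toCollect(long total, long stepSize, int taskIdx) {
        long baseSize = total / stepSize;
        return (total % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    }

    /** Values emitted by one subtask: collected * stepSize + start + taskIdx. */
    static List<Long> valuesFor(long start, long end, long stepSize, int taskIdx) {
        long total = end - start + 1;
        List<Long> values = new ArrayList<>();
        for (long collected = 0; collected < toCollect(total, stepSize, taskIdx); collected++) {
            values.add(collected * stepSize + start + taskIdx);
        }
        return values;
    }

    public static void main(String[] args) {
        // Three subtasks cover 0..9 without gaps or overlaps:
        // 0 -> [0, 3, 6, 9]
        // 1 -> [1, 4, 7]
        // 2 -> [2, 5, 8]
        for (int i = 0; i < 3; i++) {
            System.out.println(i + " -> " + valuesFor(0, 9, 3, i));
        }
    }
}
```

The subtask sequences are disjoint residue classes modulo the parallelism, which is why the generator only needs to checkpoint `collected` per subtask rather than the full queue of pending values.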
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
dannycranmer commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192703805

## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java:

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/** An implementation of the {@link KinesisShardAssigner} that assigns splits uniformly. */
+@Internal
+public class UniformShardAssigner implements KinesisShardAssigner {
+    @Override
+    public int assign(KinesisShardSplit split, Context context) {
+        int selectedSubtask = -1;
+        int curMinAssignment = Integer.MAX_VALUE;
+        Map<Integer, Set<KinesisShardSplit>> splitAssignment = context.getCurrentSplitAssignment();
+
+        for (int subtaskId : context.getRegisteredReaders().keySet()) {
+            int subtaskAssignmentSize =
+                    splitAssignment.getOrDefault(subtaskId, Collections.emptySet()).size();
+            if (subtaskAssignmentSize < curMinAssignment) {
+                curMinAssignment = subtaskAssignmentSize;
+                selectedSubtask = subtaskId;
+            }
+        }
+
+        Preconditions.checkArgument(
+                selectedSubtask != -1,
+                "Expected at least one registered reader. Unable to assign split.");
+        return selectedSubtask;

Review Comment: I ran this locally with parallelism 2 and 2 shards. Both shards were assigned to subtask 0 because `getCurrentSplitAssignment` was not updated in time (a race condition).
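Editor's note: the assigner above picks the registered reader with the fewest splits currently assigned; the reviewer's point is that this only balances load if the assignment map is updated before the next call. The selection logic in isolation can be sketched as follows (generic types and names are mine; the real class operates on Flink's `Context`):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MinLoadAssigner {
    /** Returns the reader id with the fewest currently assigned splits, or -1 if none registered. */
    static int assign(Set<Integer> registeredReaders, Map<Integer, Set<String>> currentAssignment) {
        int selected = -1;
        int minAssigned = Integer.MAX_VALUE;
        for (int readerId : registeredReaders) {
            int assigned = currentAssignment.getOrDefault(readerId, Collections.emptySet()).size();
            if (assigned < minAssigned) {
                minAssigned = assigned;
                selected = readerId;
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> assignment = new HashMap<>();
        assignment.put(0, new HashSet<>(Arrays.asList("shard-0")));
        // Reader 0 already holds one split, reader 1 holds none -> prints: 1
        System.out.println(assign(new TreeSet<>(Arrays.asList(0, 1)), assignment));
    }
}
```

The reviewer's observed failure follows directly: if two `assign` calls both see the same stale (empty) `currentAssignment`, both return the same reader.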
[GitHub] [flink-web] youngminz commented on a diff in pull request #615: Fix typo
youngminz commented on code in PR #615: URL: https://github.com/apache/flink-web/pull/615#discussion_r1192674124

## docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md:

@@ -84,7 +84,7 @@ We rely on users to share feedback so we can improve and make this a very robust
 Until now the operator only integrated with the Flink Kubernetes HA mechanism for last-state and other types of application upgrades. 1.4.0 adds support for the Zookeeper HA storage as well.
-While Zookeper is a slightly older solution, many users are still using it for HA metadata even in the Kubernetes world.
+While Zookeeper is a slightly older solution, many users are still using it for HA metadata even in the Kubernetes world.

Review Comment: I found all occurrences of ZooKeeper in this document and replaced them!
[jira] [Commented] (FLINK-29533) Add proper table style to Flink website
[ https://issues.apache.org/jira/browse/FLINK-29533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722260#comment-17722260 ]

Sakshi Sharma commented on FLINK-29533:
Can you please guide me on where the issue is? The styling looks as expected to me. Is there something I am missing?

!image-2023-05-12-14-14-58-020.png|width=736,height=262!

> Add proper table style to Flink website
>
> Key: FLINK-29533
> URL: https://issues.apache.org/jira/browse/FLINK-29533
> Project: Flink
> Issue Type: Bug
> Components: Project Website
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Matthias Pohl
> Assignee: Sakshi Sharma
> Priority: Major
> Labels: starter
> Attachments: Screenshot from 2022-10-07 08-23-01.png, image-2023-05-12-14-14-58-020.png
>
> Tables can be created using simple markdown syntax. But the corresponding rendered table lacks proper styling: !Screenshot from 2022-10-07 08-23-01.png!
> Several blog posts work around that by adding a custom style:
> * [Apache Flink Kubernetes Operator 1.0.0 Release Announcement|https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html]
> * [Improving speed and stability of checkpointing with generic log-based incremental checkpoints|https://flink.apache.org/2022/05/30/changelog-state-backend.html]
> What about coming up with a common style that doesn't require people to come up with their own custom style per post?
[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722259#comment-17722259 ]

Martijn Visser commented on FLINK-32069:
[~mapohl] Any thoughts?

> jobClient.getJobStatus() can return status RUNNING for finished insert operation
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.16.1, 1.15.4
> Reporter: Aleksandr Iushmanov
> Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition issue leading to failed expectations for SQL insert operations.
>
> Below is an example of Zeppelin code that fails because jobClient.getJobStatus() returns RUNNING even after the job has finished. I have verified that the same failure can happen if I use jobClient.getJobExecutionResult().get() (the job execution result is "Program execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>     tableResult.await();
>     JobClient jobClient = tableResult.getJobClient().get();
>     if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>     } else {
>         throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
>     }
> } catch (InterruptedException e) {
>     throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>     throw new IOException("Flink job is failed", e);
> } {code}
> Zeppelin code: [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on the runningJobsRegistry, and since 1.15 this registry is not updated with the FINISHED status prior to job result future completion; see this change in {{JobMasterServiceLeadershipRunner.java}}: [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>
> This looks like a race condition that is hard to reproduce in a lightweight setup. I reproduced it by running a Zeppelin notebook against a remote Flink cluster and triggering a SQL insert operation. If I find a smaller setup that reproduces it on a small local cluster with a lightweight client, I will update this ticket. I am open to suggestions on how to fix this.
>
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be fixed; but this issue, if I understand it correctly, should be common to all versions starting with 1.15, so it makes sense to address it starting with 1.16: https://issues.apache.org/jira/browse/ZEPPELIN-5909
>
> [~mapohl], thank you for your assistance in Slack. I have created this ticket to back our conversation; could you please add your thoughts on this failure mode?
>
> One possible solution would be an additional check for the presence of a JobResult in the result store before returning the job status (if there is a result, the job shouldn't be reported as running, based on this documentation: [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
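Editor's note: the workaround discussed above amounts to not trusting a single status read immediately after job completion. A framework-free sketch of polling a status source until it reports a terminal state — the `Status` enum and supplier here are stand-ins for illustration, not Flink's `JobStatus`/`JobClient` API:

```java
import java.util.function.Supplier;

public class AwaitTerminal {
    enum Status {
        RUNNING, FINISHED, FAILED, CANCELED;

        boolean isTerminal() {
            return this != RUNNING;
        }
    }

    /** Polls the status supplier until it reports a terminal state or the deadline passes. */
    static Status awaitTerminal(Supplier<Status> statusSupplier, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        Status status = statusSupplier.get();
        while (!status.isTerminal() && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(50); // back off between status reads
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop waiting if interrupted
            }
            status = statusSupplier.get();
        }
        return status;
    }

    public static void main(String[] args) {
        // Simulate a status source that reports RUNNING twice before FINISHED.
        int[] calls = {0};
        Status s = awaitTerminal(() -> ++calls[0] < 3 ? Status.RUNNING : Status.FINISHED, 5000);
        System.out.println(s); // FINISHED
    }
}
```

This papers over a registry-update race on the client side only; the ticket's proposed server-side fix (consulting the result store before reporting RUNNING) addresses the root cause.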
[jira] [Updated] (FLINK-29533) Add proper table style to Flink website
[ https://issues.apache.org/jira/browse/FLINK-29533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sakshi Sharma updated FLINK-29533:
Attachment: image-2023-05-12-14-14-58-020.png

> Add proper table style to Flink website
>
> Key: FLINK-29533
> URL: https://issues.apache.org/jira/browse/FLINK-29533
> Project: Flink
> Issue Type: Bug
> Components: Project Website
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Matthias Pohl
> Assignee: Sakshi Sharma
> Priority: Major
> Labels: starter
> Attachments: Screenshot from 2022-10-07 08-23-01.png, image-2023-05-12-14-14-58-020.png
>
> Tables can be created using simple markdown syntax. But the corresponding rendered table lacks proper styling: !Screenshot from 2022-10-07 08-23-01.png!
> Several blog posts work around that by adding a custom style:
> * [Apache Flink Kubernetes Operator 1.0.0 Release Announcement|https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html]
> * [Improving speed and stability of checkpointing with generic log-based incremental checkpoints|https://flink.apache.org/2022/05/30/changelog-state-backend.html]
> What about coming up with a common style that doesn't require people to come up with their own custom style per post?
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
dannycranmer commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192670238 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigUtil.java: ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.config; + +import org.apache.flink.annotation.Internal; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; + +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_TIMESTAMP; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; + +/** Utility functions to use with {@link SourceConfigConstants}. */ +@Internal +public class SourceConfigUtil { Review Comment: nit: The name is pretty generic, can we rename to `KinesisStreamsSourceConfigUtil`. 
I realise this is implicit in the package, but my preference is to make the class name reflect its responsibility. Same for Constants. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
dannycranmer commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192668358 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer; +import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy; +import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter; +import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader; +import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import software.amazon.awssdk.http.SdkHttpClient; +import 
software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.utils.AttributeMap; + +import java.util.Properties; +import java.util.function.Supplier; + +/** + * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data source that + * subscribes to a single AWS Kinesis data stream. It is able to handle resharding of streams, and + * stores its current progress in Flink checkpoints. The source will read in data from the Kinesis + * Data stream, deserialize it using the provided {@link DeserializationSchema}, and emit the record + * into the Flink job graph. + * + * Exactly-once semantics. To leverage Flink's checkpointing mechanics for exactly-once stream + * processing, the Kinesis Source is implemented with the AWS Java SDK, instead of the officially + * recommended AWS Kinesis Client Library. The source will store its current progress in Flink + * checkpoint/savepoint, and will pick up from where it left off upon restore from the + * checkpoint/savepoint. + * + * Initial starting points. The Kinesis Streams Source supports reads starting from TRIM_HORIZON, + * LATEST, and AT_TIMESTAMP. + * + * @param the data type emitted by the source + */ +@Experimental +public class KinesisStreamsSource Review Comment: Let's add a builder like the sink has -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
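The builder suggested in the review comment above ("add a builder like the sink has") could look roughly like the following. This is a hedged sketch under assumed names — `KinesisStreamsSourceSketch`, `setStreamArn`, `setConsumerProperty`, and the property key are all illustrative, not the actual connector API:

```java
import java.util.Properties;

// Hypothetical builder sketch for a Kinesis Streams source, mirroring the
// builder style of the sink. Names and configuration keys are illustrative.
public class KinesisStreamsSourceSketch<T> {
    private final String streamArn;
    private final Properties consumerConfig;

    private KinesisStreamsSourceSketch(String streamArn, Properties consumerConfig) {
        this.streamArn = streamArn;
        this.consumerConfig = consumerConfig;
    }

    public static <T> Builder<T> builder() {
        return new Builder<>();
    }

    public String getStreamArn() {
        return streamArn;
    }

    public Properties getConsumerConfig() {
        return consumerConfig;
    }

    public static final class Builder<T> {
        private String streamArn;
        private final Properties consumerConfig = new Properties();

        public Builder<T> setStreamArn(String streamArn) {
            this.streamArn = streamArn;
            return this;
        }

        public Builder<T> setConsumerProperty(String key, String value) {
            consumerConfig.setProperty(key, value);
            return this;
        }

        // Validation lives in one place instead of scattered over constructors.
        public KinesisStreamsSourceSketch<T> build() {
            if (streamArn == null || streamArn.isEmpty()) {
                throw new IllegalStateException("streamArn must be set before build()");
            }
            return new KinesisStreamsSourceSketch<>(streamArn, consumerConfig);
        }
    }

    public static void main(String[] args) {
        KinesisStreamsSourceSketch<String> source =
                KinesisStreamsSourceSketch.<String>builder()
                        .setStreamArn("arn:aws:kinesis:us-east-1:000000000000:stream/example")
                        .setConsumerProperty("stream.initial.position", "TRIM_HORIZON")
                        .build();
        System.out.println(source.getStreamArn());
    }
}
```

A builder keeps the public constructor surface small while the source is still `@Experimental`, and lets `build()` validate required settings up front.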
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
dannycranmer commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1175101246 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java: ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader; + +import java.time.Duration; + +@PublicEvolving +public class SourceConfigConstants extends AWSConfigConstants { Review Comment: Why do you need to extend `AWSConfigConstants` here? We did this before to retain backwards compatibility, but since this is a new Source we might not need to do this ## flink-connector-aws-kinesis-streams/pom.xml: ## @@ -80,6 +80,12 @@ under the License. test-jar test Review Comment: Can we rename this module, it says Sink v2 currently. 
See the `` above ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java: ## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition; +import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; +import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; +import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent; +import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; 
+ +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.Shard; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition; + +/** + * Th
[GitHub] [flink-connector-aws] reswqa commented on pull request #73: [FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml
reswqa commented on PR #73: URL: https://github.com/apache/flink-connector-aws/pull/73#issuecomment-1546060123 Thanks for the review! I will merge this after `FLINK-32024` is merged.
[jira] [Commented] (FLINK-31810) RocksDBException: Bad table magic number on checkpoint rescale
[ https://issues.apache.org/jira/browse/FLINK-31810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722245#comment-17722245 ] David Artiga commented on FLINK-31810: -- Something quite similar just happened to us (using {{1.15.4}}): {code} 20230512T182327.440+0200 ERROR Caught unexpected exception. java.io.IOException: Error while opening RocksDB instance. at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92) at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:124) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:240) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:219) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:186) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:166) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:325) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:494) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:101) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.rocksdb.RocksDBException: Bad table magic number: expected 9863518390377041911, found 370301222607884801 in /mnt/1/hadoop/yarn/nm-local-dir/usercache/splat/appcache/application_1677581391115_0019/tm_container_e02_1677581391115_0019_01_06/tmp/job_06041d3a256afa3e088b474eb626985e_op_RBEAStreamOperator_707eed42a9b74f065cc8bb6798b04782__64_180__uuid_dfc34320-a336-42dc-b4f4-d48ccdfe2c09/db/045922.sst at org.rocksdb.RocksDB.open(Native Method) at org.rocksdb.RocksDB.open(RocksDB.java:306) at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80) ... 
25 common frames omitted [org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build() @ 415] {code} > RocksDBException: Bad table magic number on checkpoint rescale > -- > > Key: FLINK-31810 > URL: https://issues.apache.org/jira/browse/FLINK-31810 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.15.2 >Reporter: Robert Metzger >Priority: Major > > While rescaling a job from checkpoint, I ran into this exception: > {code:java} > SinkMaterializer[7] -> rob-result[7]: Writer -> rob-result[7]: Committer > (4/4)#3 (c1b
[jira] [Resolved] (FLINK-32066) Flink CI service on Azure stops responding to pull requests
[ https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge resolved FLINK-32066. - Fix Version/s: 1.18.0 Resolution: Fixed > Flink CI service on Azure stops responding to pull requests > --- > > Key: FLINK-32066 > URL: https://issues.apache.org/jira/browse/FLINK-32066 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.18.0 >Reporter: Wencong Liu >Assignee: Jing Ge >Priority: Blocker > Fix For: 1.18.0 > > Attachments: 20230512152023.jpg > > > As of the time when this issue was created, Flink's CI service on Azure could > no longer be triggered by new pull requests. > !20230512152023.jpg!
[GitHub] [flink] JTaky commented on pull request #22394: [FLINK-31780][component=Runtime/Coordination] Allow users to disable 'Ensemble tracking' feature for ZooKeeper Curator framework
JTaky commented on PR #22394: URL: https://github.com/apache/flink/pull/22394#issuecomment-1546005491 Thanks a lot for the review! Sorry but I think I cannot merge this PR. https://github.com/apache/flink/assets/1190166/b189cc24-b9ea-4b35-be12-c2b4a815d045
[jira] [Commented] (FLINK-32066) Flink CI service on Azure stops responding to pull requests
[ https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1776#comment-1776 ] Jing Ge commented on FLINK-32066: - Should work now; please check and share feedback to confirm. > Flink CI service on Azure stops responding to pull requests > --- > > Key: FLINK-32066 > URL: https://issues.apache.org/jira/browse/FLINK-32066 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.18.0 >Reporter: Wencong Liu >Assignee: Jing Ge >Priority: Blocker > Attachments: 20230512152023.jpg > > > As of the time when this issue was created, Flink's CI service on Azure could > no longer be triggered by new pull requests. > !20230512152023.jpg!
[GitHub] [flink-connector-aws] usamj commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
usamj commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192556373 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer; +import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy; +import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter; +import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader; +import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import software.amazon.awssdk.http.SdkHttpClient; +import 
software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.utils.AttributeMap; + +import java.util.Properties; +import java.util.function.Supplier; + +/** + * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data source that + * subscribes to a single AWS Kinesis data stream. It is able to handle resharding of streams, and + * stores its current progress in Flink checkpoints. The source will read in data from the Kinesis + * Data stream, deserialize it using the provided {@link DeserializationSchema}, and emit the record + * into the Flink job graph. + * + * Exactly-once semantics. To leverage Flink's checkpointing mechanics for exactly-once stream + * processing, the Kinesis Source is implemented with the AWS Java SDK, instead of the officially + * recommended AWS Kinesis Client Library. The source will store its current progress in Flink + * checkpoint/savepoint, and will pick up from where it left off upon restore from the + * checkpoint/savepoint. + * + * Initial starting points. The Kinesis Streams Source supports reads starting from TRIM_HORIZON, + * LATEST, and AT_TIMESTAMP. Review Comment: Nit: "LATEST, **or** AT_TIMESTAMP" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22575: Updating docs on how to upgrade kafka connectors
flinkbot commented on PR #22575: URL: https://github.com/apache/flink/pull/22575#issuecomment-1545956373 ## CI report: * d0b8d571c4900d50f52744e3d622c476428bb5bf UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
flinkbot commented on PR #22574: URL: https://github.com/apache/flink/pull/22574#issuecomment-1545956251 ## CI report: * c7e02124129c26fbabb41f24e6f89d2dc848b510 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #22573: [FLINK-32058][tests] Migrate the subclasses of BatchAbstractTestBase in runtime.batch.sql to JUnit5
flinkbot commented on PR #22573: URL: https://github.com/apache/flink/pull/22573#issuecomment-1545956110 ## CI report: * dd980a1bfd99b12404368abd94d93f299ee72d74 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #22572: [FLINK-29618] Remove Timeout annotation in YARNSessionFIFOSecuredITCase
flinkbot commented on PR #22572: URL: https://github.com/apache/flink/pull/22572#issuecomment-1545955981 ## CI report: * 3e82df924333d6f676f20a50c068bb35fa7c3076 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-32066) Flink CI service on Azure stops responding to pull requests
[ https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722212#comment-17722212 ] Jing Ge commented on FLINK-32066: - working on it > Flink CI service on Azure stops responding to pull requests > --- > > Key: FLINK-32066 > URL: https://issues.apache.org/jira/browse/FLINK-32066 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.18.0 >Reporter: Wencong Liu >Priority: Blocker > Attachments: 20230512152023.jpg > > > As of the time when this issue was created, Flink's CI service on Azure could > no longer be triggered by new pull requests. > !20230512152023.jpg!
[jira] [Assigned] (FLINK-32066) Flink CI service on Azure stops responding to pull requests
[ https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-32066: --- Assignee: Jing Ge > Flink CI service on Azure stops responding to pull requests > --- > > Key: FLINK-32066 > URL: https://issues.apache.org/jira/browse/FLINK-32066 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.18.0 >Reporter: Wencong Liu >Assignee: Jing Ge >Priority: Blocker > Attachments: 20230512152023.jpg > > > As of the time when this issue was created, Flink's CI service on Azure could > no longer be triggered by new pull requests. > !20230512152023.jpg!
[GitHub] [flink-web] RyanSkraba commented on a diff in pull request #615: Fix typo
RyanSkraba commented on code in PR #615: URL: https://github.com/apache/flink-web/pull/615#discussion_r1192516634 ## docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md: ## @@ -84,7 +84,7 @@ We rely on users to share feedback so we can improve and make this a very robust Until now the operator only integrated with the Flink Kubernetes HA mechanism for last-state and other types of application upgrades. 1.4.0 adds support for the Zookeeper HA storage as well. -While Zookeper is a slightly older solution, many users are still using it for HA metadata even in the Kubernetes world. +While Zookeeper is a slightly older solution, many users are still using it for HA metadata even in the Kubernetes world. Review Comment: ```suggestion While ZooKeeper is a slightly older solution, many users are still using it for HA metadata even in the Kubernetes world. ``` This looks like an obvious typo, but the article could be improved by using the correct title case for ZooKeeper everywhere (there is an internal capital K). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RyanSkraba commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
RyanSkraba commented on code in PR #22010: URL: https://github.com/apache/flink/pull/22010#discussion_r1192511163 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ## @@ -65,33 +72,46 @@ public void open( this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); -this.checkpointedState = -context.getOperatorStateStore() -.getListState( -new ListStateDescriptor<>( -name + "-sequence-state", LongSerializer.INSTANCE)); -this.valuesToEmit = new ArrayDeque<>(); -if (context.isRestored()) { -// upon restoring +ListStateDescriptor stateDescriptor = +new ListStateDescriptor<>( +name + "-sequence-state", TypeInformation.of(InternalState.class)); +this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor); +this.internalStates = Lists.newArrayList(); -for (Long v : this.checkpointedState.get()) { -this.valuesToEmit.add(v); -} +totalNoOfElements = Math.abs(end - start + 1); +if (context.isRestored()) { +checkpointedState.get().forEach(state -> internalStates.add(state)); } else { // the first time the job is executed -final int stepSize = runtimeContext.getNumberOfParallelSubtasks(); final int taskIdx = runtimeContext.getIndexOfThisSubtask(); -final long congruence = start + taskIdx; +final long stepSize = runtimeContext.getNumberOfParallelSubtasks(); +internalStates.add(new InternalState(0, taskIdx, stepSize)); +} +} -long totalNoOfElements = Math.abs(end - start + 1); -final int baseSize = safeDivide(totalNoOfElements, stepSize); -final int toCollect = -(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize; +private long toCollect(long baseSize, long stepSize, int taskIdx) { +return (totalNoOfElements % stepSize > taskIdx) ? 
baseSize + 1 : baseSize; +} -for (long collected = 0; collected < toCollect; collected++) { -this.valuesToEmit.add(collected * stepSize + congruence); +public Long nextValue() { +Iterator iterator = internalStates.iterator(); +if (iterator.hasNext()) { +InternalState state = iterator.next(); +long nextSequence = state.collected * state.stepSize + (start + state.taskId); +state.collected++; +// All sequence values are cleared from the stateList after they have been sent +if (state.collected +>= toCollect( +safeDivide(totalNoOfElements, state.stepSize), +state.stepSize, +state.taskId)) { +iterator.remove(); } +return nextSequence; } + +// Before calling this method, you should call hasNext to check +throw new IllegalStateException(); Review Comment: ```suggestion throw new IllegalStateException("SequenceGenerator.nextValue() was called with no remaining values."); ``` This code should be unreachable, so it's not a big deal -- but just in case, a helpful message is a good practice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
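The partitioning scheme in the diff above — each of `stepSize` parallel subtasks emits `collected * stepSize + (start + taskIdx)` until its share of the range is exhausted — can be sketched in isolation. This is a simplified, hypothetical model (the class and method names are illustrative, not Flink's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the SequenceGenerator partitioning logic from the diff
// above. Names are illustrative; this is not Flink's actual API.
public class SequenceSketch {

    /** Values emitted by one subtask: collected * stepSize + (start + taskIdx). */
    public static List<Long> valuesFor(long start, long end, int taskIdx, int stepSize) {
        long total = Math.abs(end - start + 1);
        long baseSize = total / stepSize;
        // Subtasks with index smaller than the remainder take one extra element.
        long toCollect = (total % stepSize > taskIdx) ? baseSize + 1 : baseSize;
        List<Long> out = new ArrayList<>();
        for (long collected = 0; collected < toCollect; collected++) {
            out.add(collected * stepSize + (start + taskIdx));
        }
        return out;
    }

    public static void main(String[] args) {
        // Two subtasks splitting the range [0, 4]:
        System.out.println(valuesFor(0, 4, 0, 2)); // [0, 2, 4]
        System.out.println(valuesFor(0, 4, 1, 2)); // [1, 3]
    }
}
```

With `start=0`, `end=4` and two subtasks, subtask 0 emits [0, 2, 4] and subtask 1 emits [1, 3], so the subtasks together cover the range exactly once — which is the invariant the restructured `nextValue()` in the diff has to preserve across checkpoints.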
[jira] [Updated] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksandr Iushmanov updated FLINK-32069: Description: Using Zeppelin with a remote cluster, I came across a race condition leading to failed expectations for SQL insert operations. Below is an example of Zeppelin code that fails because jobClient.getJobStatus() returns RUNNING even after the job has finished. I have verified that the same failure can happen if I use jobClient.getJobExecutionResult().get() (the job execution result is "Program execution finished", but the job status is not consistently FINISHED):
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
    tableResult.await();
    JobClient jobClient = tableResult.getJobClient().get();
    if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
        context.out.write("Insertion successfully.\n");
    } else {
        throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
    }
} catch (InterruptedException e) {
    throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
    throw new IOException("Flink job is failed", e);
}
{code}
Related PR: https://github.com/apache/flink/pull/18189
Zeppelin code: [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
I suspect the job status is returned based on the runningJobsRegistry, and since 1.15 this registry is no longer updated with the FINISHED status prior to the completion of the job result future; see this change in {{JobMasterServiceLeadershipRunner.java}}: [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
It looks like a race condition that is hard to reproduce on a lightweight setup. I was reproducing it by running a Zeppelin notebook against a remote Flink cluster and triggering a SQL insert operation. If I find a smaller setup that reproduces it on a small local cluster with a lightweight client, I will update this ticket. I am open to suggestions on how to fix this. For Zeppelin I have a separate ticket, because Flink 1.15 is no longer receiving fixes; if I understand correctly, this issue should be common to all versions starting with 1.15, so it makes sense to address it starting with 1.16: https://issues.apache.org/jira/browse/ZEPPELIN-5909
[~mapohl], thank you for the assistance in Slack; I created this ticket to back our conversation. Could you please add your thoughts on this failure mode? One possible solution would be an additional check for the presence of a JobResult in the result store before returning the job status (if there is a result, the job shouldn't be reported as running, per this documentation: [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
[jira] [Created] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation
Aleksandr Iushmanov created FLINK-32069: --- Summary: jobClient.getJobStatus() can return status RUNNING for finished insert operation Key: FLINK-32069 URL: https://issues.apache.org/jira/browse/FLINK-32069 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.15.4, 1.16.1 Reporter: Aleksandr Iushmanov
Using Zeppelin with a remote cluster, I came across a race condition leading to failed expectations for SQL insert operations. Below is an example of Zeppelin code that fails because jobClient.getJobStatus() returns RUNNING even after the job has finished. I have verified that the same failure can happen if I use jobClient.getJobExecutionResult().get() (the job execution result is "Program execution finished", but the job status is not consistently FINISHED):
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
    tableResult.await();
    JobClient jobClient = tableResult.getJobClient().get();
    if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
        context.out.write("Insertion successfully.\n");
    } else {
        throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
    }
} catch (InterruptedException e) {
    throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
    throw new IOException("Flink job is failed", e);
}
{code}
Related PR: https://github.com/apache/flink/pull/18189
Zeppelin code: [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
I suspect the job status is returned based on the runningJobsRegistry, and since 1.15 this registry is no longer updated with the FINISHED status prior to the completion of the job result future; see this change in {{JobMasterServiceLeadershipRunner.java}}: [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
It looks like a race condition that is hard to reproduce on a lightweight setup. I was reproducing it by running a Zeppelin notebook against a remote Flink cluster and triggering a SQL insert operation. If I find a smaller setup that reproduces it on a small local cluster with a lightweight client, I will update this ticket. I am open to suggestions on how to fix this. For Zeppelin I have a separate ticket, because Flink 1.15 is no longer receiving fixes; if I understand correctly, this issue should be common to all versions starting with 1.15, so it makes sense to address it starting with 1.16: https://issues.apache.org/jira/browse/ZEPPELIN-5909
[~mapohl], thank you for the assistance in Slack; I created this ticket to back our conversation. Could you please add your thoughts on this failure mode?
One possible solution would be an additional check for the presence of a JobResult in the result store before returning the job status (if there is a result, the job shouldn't be reported as running, per this documentation: https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--) -- This message was sent by Atlassian Jira (v8.20.10#820010)
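The mitigation proposed above — consulting the result store before trusting the (possibly stale) running-jobs registry — can be sketched as follows. All types here are simplified stand-ins (the real JobResultStore and dispatcher wiring are more involved); this illustrates only the ordering of the checks, not Flink's actual implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the proposed fix: a stored JobResult wins over the
// running-jobs registry, so a finished job is never reported as RUNNING.
// All types are simplified stand-ins, not Flink's API.
public class StatusSketch {
    public enum JobStatus { RUNNING, FINISHED }

    /** Stand-in for Flink's JobResultStore (jobId -> serialized result). */
    public static final Map<String, String> RESULT_STORE = new ConcurrentHashMap<>();

    /** Stand-in for the registry, which may lag behind the result store. */
    static JobStatus registryStatus(String jobId) {
        return JobStatus.RUNNING; // simulates the stale RUNNING status
    }

    public static JobStatus getJobStatus(String jobId) {
        // Proposed ordering: check the result store first.
        if (RESULT_STORE.containsKey(jobId)) {
            return JobStatus.FINISHED;
        }
        return registryStatus(jobId);
    }

    public static void main(String[] args) {
        System.out.println(getJobStatus("job-1")); // RUNNING (no result yet)
        RESULT_STORE.put("job-1", "Program execution finished");
        System.out.println(getJobStatus("job-1")); // FINISHED
    }
}
```

The point of the sketch is that once the result future completes and a result is persisted, the status query becomes consistent with it, closing the window in which `getJobExecutionResult()` reports completion while `getJobStatus()` still reports RUNNING.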
[GitHub] [flink-web] RyanSkraba opened a new pull request, #648: [hotfix] Minor grammar fixes
RyanSkraba opened a new pull request, #648: URL: https://github.com/apache/flink-web/pull/648 I've batched together some observations that I've noted on the website, mostly grammar fixes and polish.
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #595: [backport][FLINK-32002] Autoscaler default config improvements
gyfora merged PR #595: URL: https://github.com/apache/flink-kubernetes-operator/pull/595
[jira] [Commented] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0
[ https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722198#comment-17722198 ] Usamah Jassat commented on FLINK-32047: --- Hey Gil, have you got an example FlinkDeployment to recreate the issue? Which version of Flink do you see the issue with? > Fix args in JobSpec not being passed through to Flink in Standalone mode - > 1.4.0 > > > Key: FLINK-32047 > URL: https://issues.apache.org/jira/browse/FLINK-32047 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Gil Shmaya >Priority: Major > Attachments: image-2023-04-30-18-54-22-291.png, > image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png > > > This issue is related to a previously fixed bug in version 1.2.0 - > FLINK-29388 > I have noticed that while the args are successfully being passed when using > version 1.2.0, this is not the case with version 1.4.0. > {+}Scenario{+}: > I added a log that prints the argument array length at the beginning of the > main function of the flink job: > !image-2023-04-30-18-54-22-291.png! > The result when running with 1.2.0: > !image-2023-04-30-19-56-30-150.png! > The result when running with 1.4.0: > !image-2023-04-30-19-56-57-680.png! > h4. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-opensearch] reswqa opened a new pull request, #21: [hotfix] Workaround new violations message
reswqa opened a new pull request, #21: URL: https://github.com/apache/flink-connector-opensearch/pull/21 apache/flink `1.18` changes the `archunit-violations` rule about `MiniCluster` use in `ITCase`. Since our workflow tests both `1.17` and `1.18`, this duplicates a bit of the exclusion list to suppress the failure.
[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #595: [backport][FLINK-32002] Autoscaler default config improvements
gyfora opened a new pull request, #595: URL: https://github.com/apache/flink-kubernetes-operator/pull/595 (no comment)
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #594: [FLINK-32002] Autoscaler default config improvements
gyfora merged PR #594: URL: https://github.com/apache/flink-kubernetes-operator/pull/594
[jira] [Closed] (FLINK-28203) Mark all bundled dependencies as optional
[ https://issues.apache.org/jira/browse/FLINK-28203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-28203. Resolution: Fixed master: edf5666224f12e4b345ecafc6ae2fe5e3ecea393 49dcdf0fc2a5f113c0c1558b65df4ffa587ff72a 4fa80bf27b5501e87c9abcb2004fac8083c6d660 > Mark all bundled dependencies as optional > - > > Key: FLINK-28203 > URL: https://issues.apache.org/jira/browse/FLINK-28203 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0
[jira] [Closed] (FLINK-28016) Support Maven 3.3+
[ https://issues.apache.org/jira/browse/FLINK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-28016. Resolution: Fixed master: f0d01903aaa517af9c1be26b1244a778189dce65 > Support Maven 3.3+ > -- > > Key: FLINK-28016 > URL: https://issues.apache.org/jira/browse/FLINK-28016 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.18.0 > > > We are currently de-facto limited to Maven 3.2.5 because our packaging relies > on the shade-plugin modifying the dependency tree at runtime when bundling > dependencies, which is no longer possible on Maven 3.3+. > Being locked in to such an old Maven version isn't a good state to be in, and > the contributor experience suffers as well. > I've been looking into removing this limitation by explicitly marking every > dependency that we bundle as {{optional}} in the poms, which really means > {{non-transitive}}. This ensures that everything bundled by one > module is not visible to other modules. Some tooling to catch developer > mistakes was also written. > Overall this is actually quite a nice change, as it makes things more > explicit and reduces inconsistencies (e.g., the dependency-plugin results are > questionable if the shade-plugin didn't run!); and it has already highlighted > several problems in Flink. > This change will have no effect on users or the released poms, because the > dependency-reduced poms will be generated as before and will remove all modified > dependencies.
[GitHub] [flink] zentol merged pull request #21349: [FLINK-28203] Support Maven 3.3+
zentol merged PR #21349: URL: https://github.com/apache/flink/pull/21349
[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #594: [FLINK-32002] Autoscaler default config improvements
gyfora opened a new pull request, #594: URL: https://github.com/apache/flink-kubernetes-operator/pull/594 Some further minor improvements to the autoscaler default config: - Turn off ineffective scaling detection by default, as it proved to be a little unstable - Set a large (but not too large) number for the max scale-up factor to improve the docs
[jira] [Commented] (FLINK-31706) The default source parallelism should be the same as execution's default parallelism under adaptive batch scheduler
[ https://issues.apache.org/jira/browse/FLINK-31706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722171#comment-17722171 ] clownxc commented on FLINK-31706: - (y)(y):D Thank you for your reply, I see [~wanglijie] > The default source parallelism should be the same as execution's default > parallelism under adaptive batch scheduler > --- > > Key: FLINK-31706 > URL: https://issues.apache.org/jira/browse/FLINK-31706 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Currently, the sources need to set > {{execution.batch.adaptive.auto-parallelism.default-source-parallelism }} in > the adaptive batch scheduler mode, otherwise, the source parallelism is only > 1 by default. A better solution might be set as the default execution > parallelism if no user configured. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32067) When no pod template configured, an invalid null pod template is configured
[ https://issues.apache.org/jira/browse/FLINK-32067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-32067. -- Fix Version/s: kubernetes-operator-1.6.0 Resolution: Fixed main: af8aabf48928804619cfdb6874700b2bf2250a87 release-1.5: f5be73b9e8861fe7134c224d68eabcd10a9ebfd3 > When no pod template configured, an invalid null pod template is configured > > > Key: FLINK-32067 > URL: https://issues.apache.org/jira/browse/FLINK-32067 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Blocker > Labels: pull-request-available > Fix For: kubernetes-operator-1.5.0, kubernetes-operator-1.6.0 > > > https://issues.apache.org/jira/browse/FLINK-30609 introduced a bug in the > podtemplate logic that breaks deployments when no podtemplates are configured. > The basic example doesnt work anymore for example. The reason is that an > invalid null object is set as podtemplate when nothing is configured. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-31963: --- Affects Version/s: 1.15.4 1.16.1 1.18.0 > java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned > checkpoints > - > > Key: FLINK-31963 > URL: https://issues.apache.org/jira/browse/FLINK-31963 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.17.0, 1.16.1, 1.15.4, 1.18.0 > Environment: Flink: 1.17.0 > FKO: 1.4.0 > StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint > enabled) >Reporter: Tan Kim >Assignee: Stefan Richter >Priority: Critical > Labels: stability > Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, > taskmanager_error.txt > > > I'm testing Autoscaler through Kubernetes Operator and I'm facing the > following issue. > As you know, when a job is scaled down through the autoscaler, the job > manager and task manager go down and then back up again. > When this happens, an index out of bounds exception is thrown and the state > is not restored from a checkpoint. > [~gyfora] told me via the Flink Slack troubleshooting channel that this is > likely an issue with Unaligned Checkpoint and not an issue with the > autoscaler, but I'm opening a ticket with Gyula for more clarification. > Please see the attached JM and TM error logs. > Thank you. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32068) flink-connector-jdbc support clickhouse
[ https://issues.apache.org/jira/browse/FLINK-32068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-32068: -- Assignee: leishuiyu > flink-connector-jdbc support clickhouse > > > Key: FLINK-32068 > URL: https://issues.apache.org/jira/browse/FLINK-32068 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Reporter: leishuiyu >Assignee: leishuiyu >Priority: Minor > > Flink SQL support for ClickHouse: > * in batch scenarios, ClickHouse can act as both source and sink > * in streaming scenarios, ClickHouse can act as a sink
[jira] [Commented] (FLINK-32068) flink-connector-jdbc support clickhouse
[ https://issues.apache.org/jira/browse/FLINK-32068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722142#comment-17722142 ] Martijn Visser commented on FLINK-32068: [~leishuiyu] Sure. I've assigned the ticket to you. > flink-connector-jdbc support clickhouse > > > Key: FLINK-32068 > URL: https://issues.apache.org/jira/browse/FLINK-32068 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Reporter: leishuiyu >Priority: Minor > > Flink SQL support for ClickHouse: > * in batch scenarios, ClickHouse can act as both source and sink > * in streaming scenarios, ClickHouse can act as a sink
[jira] [Closed] (FLINK-31717) Unit tests running with local kube config
[ https://issues.apache.org/jira/browse/FLINK-31717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-31717. -- Resolution: Fixed merged to main e399c9bbd37d2dca352d973de01be0cb403fec55 > Unit tests running with local kube config > - > > Key: FLINK-31717 > URL: https://issues.apache.org/jira/browse/FLINK-31717 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.4.0 >Reporter: Matyas Orhidi >Assignee: Mate Czagany >Priority: Critical > Labels: pull-request-available > Fix For: kubernetes-operator-1.6.0 > > > Some unit tests are using local kube environment. This can be dangerous when > pointing to sensitive clusters e.g. in prod. > {quote}2023-04-03 12:32:53,956 i.f.k.c.Config [DEBUG] Found > for Kubernetes config at: [/Users//.kube/config]. > {quote} > A misconfigured kube config environment revealed the issue: > {quote}[ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time > elapsed: 0.012 s <<< FAILURE! - in > org.apache.flink.kubernetes.operator.FlinkOperatorTest > [ERROR] > org.apache.flink.kubernetes.operator.FlinkOperatorTest.testConfigurationPassedToJOSDK > Time elapsed: 0.008 s <<< ERROR! > java.lang.NullPointerException > at > org.apache.flink.kubernetes.operator.FlinkOperatorTest.testConfigurationPassedToJOSDK(FlinkOperatorTest.java:63) > [ERROR] > org.apache.flink.kubernetes.operator.FlinkOperatorTest.testLeaderElectionConfig > Time elapsed: 0.004 s <<< ERROR! > java.lang.NullPointerException > at > org.apache.flink.kubernetes.operator.FlinkOperatorTest.testLeaderElectionConfig(FlinkOperatorTest.java:108){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #587: [FLINK-31717] Fix unit tests using local kube config
gyfora merged PR #587: URL: https://github.com/apache/flink-kubernetes-operator/pull/587
[jira] [Comment Edited] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722121#comment-17722121 ] Spongebob edited comment on FLINK-32065 at 5/12/23 11:15 AM: - Hi [~Thesharing] Does it likelihood due to the incorrect starting of the standalone cluster? Such as secondary starting the cluster before it is stopped. was (Author: spongebobz): Hi [~Thesharing] Does it likelihood due to the incorrect starting of the standalone cluster? Such as secondary starting the cluster before stopping it. > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it was failed to create the tmp channel file but > I am confused about the reason relative to this case. > I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so > is this diretory only working for that step of the application ? > BTW, this issue happen coincidently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722121#comment-17722121 ] Spongebob commented on FLINK-32065: --- Hi [~Thesharing] Could it be due to the standalone cluster being started incorrectly? For example, starting the cluster a second time before stopping it.
[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722120#comment-17722120 ] Stefan Richter commented on FLINK-31963: This seems similar to the problem described in FLINK-27031. > java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned > checkpoints > - > > Key: FLINK-31963 > URL: https://issues.apache.org/jira/browse/FLINK-31963 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.17.0 > Environment: Flink: 1.17.0 > FKO: 1.4.0 > StateBackend: RocksDB (Generic Incremental Checkpoint & Unaligned Checkpoint > enabled) >Reporter: Tan Kim >Assignee: Stefan Richter >Priority: Critical > Labels: stability > Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, > taskmanager_error.txt > > > I'm testing Autoscaler through Kubernetes Operator and I'm facing the > following issue. > As you know, when a job is scaled down through the autoscaler, the job > manager and task manager go down and then come back up again. > When this happens, an index out of bounds exception is thrown and the state > is not restored from a checkpoint. > [~gyfora] told me via the Flink Slack troubleshooting channel that this is > likely an issue with Unaligned Checkpoint and not an issue with the > autoscaler, but I'm opening a ticket with Gyula for more clarification. > Please see the attached JM and TM error logs. > Thank you.
[GitHub] [flink-web] echauchot commented on pull request #643: Add blog article: Howto test a batch source with the new Source framework
echauchot commented on PR #643: URL: https://github.com/apache/flink-web/pull/643#issuecomment-1545554335 @zentol Thanks for your review (once again). I addressed all your comments, PTAL
[GitHub] [flink-web] echauchot commented on pull request #643: Add blog article: Howto test a batch source with the new Source framework
echauchot commented on PR #643: URL: https://github.com/apache/flink-web/pull/643#issuecomment-1545544022 > > I took a quick look over it. I feel especially for the IT case it would be nice to first provide an overview over the different parts that make up a test "There's a test framework; We need a backend the source can read from, a test context for Flink's testing framework to interact with the backend, ..." and then drill down into the details as you already did. > > Right now it's very "I have to do this and this and this" but you don't really know where you're heading. > > Totally agree, I'll do something similar to what I did on the source creation blog: an architecture paragraph done
[jira] [Commented] (FLINK-32068) flink-connector-jdbc support clickhouse
[ https://issues.apache.org/jira/browse/FLINK-32068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722107#comment-17722107 ] leishuiyu commented on FLINK-32068: --- Could I implement this feature and submit a PR? > flink-connector-jdbc support clickhouse > > > Key: FLINK-32068 > URL: https://issues.apache.org/jira/browse/FLINK-32068 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Reporter: leishuiyu >Priority: Minor > > Flink SQL support for ClickHouse: > * in batch scenarios, ClickHouse can act as both source and sink > * in streaming scenarios, ClickHouse can act as a sink
[jira] [Created] (FLINK-32068) flink-connector-jdbc support clickhouse
leishuiyu created FLINK-32068: - Summary: flink-connector-jdbc support clickhouse Key: FLINK-32068 URL: https://issues.apache.org/jira/browse/FLINK-32068 Project: Flink Issue Type: New Feature Components: Connectors / JDBC Reporter: leishuiyu Flink SQL support for ClickHouse: * in batch scenarios, ClickHouse can act as both source and sink * in streaming scenarios, ClickHouse can act as a sink
[jira] [Commented] (FLINK-31966) Flink Kubernetes operator lacks TLS support
[ https://issues.apache.org/jira/browse/FLINK-31966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722104#comment-17722104 ] Gyula Fora commented on FLINK-31966: [~adrianalexvasiliu] new configuration changes do not usually require a CRD change. It could be just an additional value set in the flinkConfiguration. If CRD changes are required (such as adding new fields) we can always make those as long as they are backward compatible. There is no schedule for that :) > Flink Kubernetes operator lacks TLS support > > > Key: FLINK-31966 > URL: https://issues.apache.org/jira/browse/FLINK-31966 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.4.0 >Reporter: Adrian Vasiliu >Priority: Major > > *Summary* > The Flink Kubernetes operator lacks support inside the FlinkDeployment > operand for configuring Flink with TLS (both one-way and mutual) for the > internal communication between jobmanagers and taskmanagers, and for the > external REST endpoint. Although a workaround exists to configure the job and > task managers, this breaks the operator and renders it unable to reconcile. > *Additional information* > * The Apache Flink operator supports passing through custom flink > configuration to be applied to job and task managers. > * If you supply SSL-based properties, the operator can no longer speak to > the deployed job manager. The operator is reading the flink conf and using it > to create a connection to the job manager REST endpoint, but it uses the > truststore file paths within flink-conf.yaml, which are unresolvable from the > operator. This leaves the operator hanging in a pending state as it cannot > complete a reconcile. > *Proposal* > Our proposal is to make changes to the operator code. A simple change exists > that would be enough to enable anonymous SSL at the REST endpoint, but more > invasive changes would be required to enable full mTLS throughout. 
> The simple change to enable anonymous SSL would be for the operator to parse > flink-conf and podTemplate to identify the Kubernetes resource that contains > the certificate from the job manager keystore and use it inside the > operator’s trust store. > In the case of mutual TLS, further changes are required: the operator would > need to generate a certificate signed by the same issuing authority as the > job manager’s certificates and then use it in a keystore when challenged by > that job manager. We propose that the operator becomes responsible for making > CertificateSigningRequests to generate certificates for job manager, task > manager and operator. The operator can then coordinate deploying the job and > task managers with the correct flink-conf and volume mounts. This would also > work for anonymous SSL.
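For reference, the workaround the ticket alludes to is plain Flink SSL configuration passed through flinkConfiguration. A minimal sketch of the REST-endpoint part follows; all file paths and passwords are placeholders, and these are paths inside the JobManager/TaskManager pods, which is exactly why the operator cannot resolve them and fails to reconcile:

```yaml
# Sketch: enable TLS on the JobManager REST endpoint via flinkConfiguration.
# Paths/passwords are placeholders for illustration only.
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /opt/flink/certs/rest.keystore.jks
security.ssl.rest.keystore-password: changeit
security.ssl.rest.key-password: changeit
security.ssl.rest.truststore: /opt/flink/certs/rest.truststore.jks
security.ssl.rest.truststore-password: changeit
```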
[GitHub] [flink] godfreyhe commented on a diff in pull request #22492: [FLINK-31950][table-planner] Introduce StateMetadata and StateMetadataJson SerDe
godfreyhe commented on code in PR #22492: URL: https://github.com/apache/flink/pull/22492#discussion_r1192099873 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java: ## @@ -55,7 +55,7 @@ private ExecNodeConfig( this.isCompiled = isCompiled; } -static ExecNodeConfig of( +public static ExecNodeConfig of( Review Comment: Is the reason for changing the access level to public that this method is used in tests? If so, please annotate the method with `@VisibleForTesting`. ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataJsonSerializer.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; + +import static org.apache.flink.table.planner.plan.nodes.exec.StateMetadata.FIELD_NAME_STATE_INDEX; +import static org.apache.flink.table.planner.plan.nodes.exec.StateMetadata.FIELD_NAME_STATE_NAME; +import static org.apache.flink.table.planner.plan.nodes.exec.StateMetadata.FIELD_NAME_STATE_TTL; + +/** + * JSON serializer for {@link StateMetadata}. + * + * @see StateMetadataJsonDeserializer for the reverse operation. + */ +@Internal +public class StateMetadataJsonSerializer extends StdSerializer { + +private static final long serialVersionUID = 1L; + +StateMetadataJsonSerializer() { +super(StateMetadata.class); +} + +@Override +public void serialize( +StateMetadata stateMetadata, +JsonGenerator jsonGenerator, +SerializerProvider serializerProvider) +throws IOException { +jsonGenerator.writeStartObject(); +jsonGenerator.writeNumberField(FIELD_NAME_STATE_INDEX, stateMetadata.getStateIndex()); +jsonGenerator.writeStringField(FIELD_NAME_STATE_TTL, stateMetadata.getStateTtl() + " ms"); Review Comment: How about we use `Duration` type to avoid StateMetadataJsonSerializer/StateMetadataJsonDeserializer ? ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java
[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722100#comment-17722100 ] Zhilong Hong commented on FLINK-32065: -- It's hard to investigate the root cause with the limited context; full logs would be helpful. {{NoSuchFileException}} is thrown during the creation of {{FileChannelBoundedData}}. This is a bit weird. Maybe the path is deleted during the initialization of {{ResultPartitions}}. You could try moving the temp files to another location: change the value of the configuration option {{io.tmp.dirs}} to another valid path and see if the issue is solved.
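The suggestion above amounts to a one-line change in flink-conf.yaml; a minimal sketch, where the directory is a placeholder for any writable path on the TaskManager hosts:

```yaml
# Sketch: relocate Flink's temporary files (including netty-shuffle channel files).
# /data/flink-tmp is a placeholder path chosen for illustration.
io.tmp.dirs: /data/flink-tmp
```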
[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework
echauchot commented on code in PR #643: URL: https://github.com/apache/flink-web/pull/643#discussion_r1192171185 ## docs/content/posts/howto-test-batch-source.md: ## @@ -0,0 +1,202 @@ +--- +title: "Howto test a batch source with the new Source framework" +date: "2023-04-14T08:00:00.000Z" +authors: +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +recently designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface). +This article is the +continuation of +the [howto create a batch source with the new Source framework article](https://flink.apache.org/2023/04/14/howto-create-batch-source/). +Now it is +time to test the created source! Like the previous article, this one was written while implementing the +[Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). + +## Unit testing the source + +### Testing the serializers + +[example Cassandra SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java) +and [SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java) + +In the previous article, we +created [serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers) +for Split and SplitEnumeratorState. We should now test them in unit tests.
As usual, to test serde +we just create an object, serialize it using the serializer, then deserialize it using the same +serializer, and finally assert on the equality of the two objects. Thus, hashCode() and equals() need +to be implemented for the serialized objects. + +### Other unit tests + +Of course, we also need to unit test low-level processing such as query building, or any +processing that does not require a running backend. + +## Integration testing the source + +[example Cassandra SourceITCase +](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java) + +For tests that require a running backend, Flink provides a JUnit5 source test framework. To use it +we create an *ITCase named class that +extends [SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html). +This test suite already provides all +the necessary tests (single split, multiple splits, idle reader, etc.). It is targeted at +batch and streaming sources, so for our batch source case here, the tests below need to be disabled +as they are targeted at streaming sources. They can be disabled by overriding them in the ITCase +and annotating them with @Disabled: + +* testSourceMetrics +* testSavepoint +* testScaleUp +* testScaleDown +* testTaskManagerFailure + +Of course we can add our own integration test cases, for example tests on limits, tests on low-level +splitting, or any test that requires a running backend.
But for most cases we only need to provide +Flink test environment classes to configure the ITCase: + +### Flink runtime environment + +We add this annotated field to our ITCase and we're done + +`@TestEnv +MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); +` + +### Backend runtime environment + +[example Cassandra TestEnvironment](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java) + +We add this annotated field to our ITCase + +`@TestExternalSystem +BackendTestEnvironment backendTestEnvironment = new BackendTestEnvironment(); +` + +BackendTestEnvironment +implements [TestResource](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html) +. This environment is scoped to the test suite so it is where we setup the backend runtime and +shared resources (session, tablespace, etc...) by +implementing [startup()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#startUp--) +and [tearDown()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#tearDown--
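The serde round trip the blog post describes (create an object, serialize it, deserialize it with the same serializer, assert equality, which is why equals()/hashCode() matter) can be sketched without any Flink dependencies. `RingRangeSplit` and its byte layout below are hypothetical stand-ins for illustration, not the actual CassandraSplit classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical split holding a token range, standing in for a real Split class.
final class RingRangeSplit {
    final long start;
    final long end;

    RingRangeSplit(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // equals()/hashCode() must be implemented, otherwise the
    // round-trip equality assertion is meaningless.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RingRangeSplit)) {
            return false;
        }
        RingRangeSplit other = (RingRangeSplit) o;
        return start == other.start && end == other.end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }
}

public class SplitSerializerRoundTrip {

    // Serialize the split to a byte array (two big-endian longs).
    static byte[] serialize(RingRangeSplit split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(split.start);
            out.writeLong(split.end);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserialize with the mirror-image read order.
    static RingRangeSplit deserialize(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new RingRangeSplit(in.readLong(), in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        RingRangeSplit original = new RingRangeSplit(-42L, 4242L);
        RingRangeSplit copy = deserialize(serialize(original));
        if (!original.equals(copy)) {
            throw new AssertionError("round trip lost information");
        }
        System.out.println("round trip ok");
    }
}
```

In a real connector test the same pattern runs against the connector's actual SplitSerializer; only the object construction and the assertion framework change.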
[GitHub] [flink] gaborgsomogyi commented on pull request #22550: [FLINK-31609][yarn][test] Extend log whitelist for expected AMRM heartbeat interrupt
gaborgsomogyi commented on PR #22550: URL: https://github.com/apache/flink/pull/22550#issuecomment-1545492583 The change itself looks good, but could you please show which test failed and how? (some log)
[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework
echauchot commented on code in PR #643: URL: https://github.com/apache/flink-web/pull/643#discussion_r1192167315 ## docs/content/posts/howto-test-batch-source.md
[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework
echauchot commented on code in PR #643: URL: https://github.com/apache/flink-web/pull/643#discussion_r1192162301 ## docs/content/posts/howto-test-batch-source.md: ### Backend runtime environment Review Comment: done
[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework
echauchot commented on code in PR #643: URL: https://github.com/apache/flink-web/pull/643#discussion_r1192157171 ## docs/content/posts/howto-test-batch-source.md: ## @@ -0,0 +1,202 @@ +--- +title: "Howto test a batch source with the new Source framework" +date: "2023-04-14T08:00:00.000Z" +authors: +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. This article is the +continuation of +the [howto create a batch source with the new Source framework article](https://flink.apache.org/2023/04/14/howto-create-batch-source/) +. Now it is +time to test the created source ! As the previous article, this one was built while implementing the +[Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). + +## Unit testing the source + +### Testing the serializers + +[example Cassandra SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java) +and [SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java) + +In the previous article, we +created [serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers) +for Split and SplitEnumeratorState. We should now test them in unit tests. 
As usual, to test serde +we just create an object, serialize it using the serializer and then deserialize it using the same +serializer and finally assert on the equality of the two objects. Thus, hashCode() and equals() need +to be implemented for the serialized objects. + +### Other unit tests + +Of course, we also need to unit test low level processing such as query building for example or any +processing that does not require a running backend. + +## Integration testing the source + +[example Cassandra SourceITCase +](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java) + +For tests that require a running backend, Flink provides a JUnit5 source test framework. To use it +we create an *ITCase named class that +extends [SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html) +. This test suite provides all +the necessary tests already (single split, multiple splits, idle reader, etc...). It is targeted for +batch and streaming sources, so for our batch source case here, the tests below need to be disabled +as they are targeted for streaming sources. They can be disabled by overriding them in the ITCase +and annotating them with @Disabled: + +* testSourceMetrics +* testSavepoint +* testScaleUp +* testScaleDown +* testTaskManagerFailure + +Of course we can add our own integration test cases, for example tests on limits, tests on low level +splitting or any test that requires a running backend.
But for most cases we only need to provide +Flink test environment classes to configure the ITCase: + +### Flink runtime environment + +We add this annotated field to our ITCase and we're done + +`@TestEnv +MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); +` + +### Backend runtime environment Review Comment: ok fair enough, in that case I'll also remove "runtime" from the "Flink runtime environment" title for coherence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
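The serde round-trip described in the excerpt (create an object, serialize it, deserialize it with the same serializer, assert equality) can be sketched with plain-JDK I/O. Everything below is a hypothetical stand-in: `MySplit` and `MySplitSerializer` are illustrative names, not the Cassandra connector's actual classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical split type; equals() and hashCode() are required so the
// round-trip assertion below is meaningful.
final class MySplit {
    final long start;
    final long end;
    MySplit(long start, long end) { this.start = start; this.end = end; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof MySplit)) { return false; }
        MySplit s = (MySplit) o;
        return start == s.start && end == s.end;
    }
    @Override public int hashCode() { return Objects.hash(start, end); }
}

// Hypothetical serializer mirroring the shape of a Flink split serializer:
// object -> byte[] and back.
final class MySplitSerializer {
    byte[] serialize(MySplit split) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeLong(split.start);
            out.writeLong(split.end);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
    MySplit deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new MySplit(in.readLong(), in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

public class SerdeRoundTrip {
    public static void main(String[] args) {
        MySplitSerializer serializer = new MySplitSerializer();
        MySplit original = new MySplit(0L, 1024L);
        // Serialize, deserialize with the same serializer, then assert equality.
        MySplit copy = serializer.deserialize(serializer.serialize(original));
        if (!original.equals(copy)) {
            throw new AssertionError("serde round trip lost data");
        }
    }
}
```

The same pattern applies to the SplitEnumeratorState serializer; only the fields written and read change.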
[GitHub] [flink] zhuzhurk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce SchedulerBase per-task failure enrichment/labeling
zhuzhurk commented on code in PR #22506: URL: https://github.com/apache/flink/pull/22506#discussion_r1192155460 ## flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java: ## @@ -1289,4 +1293,59 @@ public static <T> CompletableFuture<T> switchExecutor( }, executor); } + +/** + * A serializable implementation of CompletableFuture. + * + * This class extends CompletableFuture and implements the Serializable interface to allow it + * to be serialized and deserialized. The result of the CompletableFuture is extracted and + * serialized when the object is written to a stream, and the result is set using the complete() + * method when the object is read from a stream. + * + * @param <T> the type of the result of the CompletableFuture + */ +public static class SerializableCompletableFuture<T> extends CompletableFuture<T> +implements Serializable { +private static final long serialVersionUID = 1L; +private transient T result; + +public SerializableCompletableFuture(T value) { +this.result = value; +this.complete(value); +} + +/** + * Writes this object to the given OutputStream. The result of the CompletableFuture is + * extracted and serialized along with the object. + * + * @param out the ObjectOutputStream to write to + * @throws IOException if an I/O error occurs + */ +private void writeObject(ObjectOutputStream out) +throws IOException, ExecutionException, InterruptedException { +out.defaultWriteObject(); +if (result == null) { +result = this.get(); Review Comment: This `get()` may block the main thread. Maybe we do not need to introduce a `SerializableCompletableFuture`, but instead modify the ErrorInfo to achieve the goal. e.g. 
* introduce two fields to ErrorInfo: `transient CompletableFuture<Map<String, String>> labelsFuture` as well as a `Map<String, String> labels`
* the `labels` will be set as soon as `labelsFuture` is completed
* `ErrorInfo#getLabels()` returns a `Map<String, String>`, which can be empty if `labels` is null and `labelsFuture` is not completed

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
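For illustration, the reviewer's alternative can be sketched in plain Java. `ErrorInfoSketch` below is a hypothetical stand-in, not Flink's actual `ErrorInfo`; it only demonstrates the non-blocking shape being proposed: the future stays transient, its result is copied into a serializable field on completion, and the getter never calls `get()`.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of the suggested shape: the future is transient (never serialized),
// and its result is copied into a plain field as soon as it completes, so
// neither serialization nor getLabels() ever blocks.
class ErrorInfoSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final transient CompletableFuture<Map<String, String>> labelsFuture;
    private volatile Map<String, String> labels; // set once labelsFuture completes

    ErrorInfoSketch(CompletableFuture<Map<String, String>> labelsFuture) {
        this.labelsFuture = labelsFuture;
        // Copy the result into the serializable field upon completion.
        labelsFuture.thenAccept(result -> this.labels = result);
    }

    /** Returns an empty map if the labels are not available yet; never blocks. */
    Map<String, String> getLabels() {
        Map<String, String> current = labels;
        return current == null ? Collections.emptyMap() : current;
    }
}
```

Callers that read the labels before the enrichment completes simply observe an empty map instead of blocking the main thread.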
[jira] [Updated] (FLINK-32067) When no pod template configured, an invalid null pod template is configured
[ https://issues.apache.org/jira/browse/FLINK-32067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32067: --- Labels: pull-request-available (was: ) > When no pod template configured, an invalid null pod template is configured > > > Key: FLINK-32067 > URL: https://issues.apache.org/jira/browse/FLINK-32067 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Blocker > Labels: pull-request-available > Fix For: kubernetes-operator-1.5.0 > > > https://issues.apache.org/jira/browse/FLINK-30609 introduced a bug in the > podtemplate logic that breaks deployments when no podtemplates are configured. > The basic example doesn't work anymore, for example. The reason is that an > invalid null object is set as podtemplate when nothing is configured. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #592: [FLINK-32067] Do not configure invalid null podTemplate
gyfora opened a new pull request, #592: URL: https://github.com/apache/flink-kubernetes-operator/pull/592 ## What is the purpose of the change Recent changes broke the podTemplate configuration logic when no pod template or ephemeral storage is configured resulting in an invalid null object serialized and set. ## Brief change log - Fix invalid logic - Add unit test ## Verifying this change New unit test case has been added. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722094#comment-17722094 ] Spongebob commented on FLINK-32065: --- Hi, [~Thesharing], maybe I could provide these logs for you. !image-2023-05-12-17-37-09-002.png! > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it failed to create the tmp channel file, but > I am confused about the cause in this case. > I found that this sub-directory `flink-netty-shuffle-xxx` did not exist, so > is this directory only used during that step of the application? > BTW, this issue happens intermittently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob updated FLINK-32065: -- Attachment: image-2023-05-12-17-37-09-002.png > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it failed to create the tmp channel file, but > I am confused about the cause in this case. > I found that this sub-directory `flink-netty-shuffle-xxx` did not exist, so > is this directory only used during that step of the application? > BTW, this issue happens intermittently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32067) When no pod template configured, an invalid null pod template is configured
Gyula Fora created FLINK-32067: -- Summary: When no pod template configured, an invalid null pod template is configured Key: FLINK-32067 URL: https://issues.apache.org/jira/browse/FLINK-32067 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.5.0 Reporter: Gyula Fora Assignee: Gyula Fora Fix For: kubernetes-operator-1.5.0 https://issues.apache.org/jira/browse/FLINK-30609 introduced a bug in the podtemplate logic that breaks deployments when no podtemplates are configured. The basic example doesn't work anymore, for example. The reason is that an invalid null object is set as podtemplate when nothing is configured. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-elasticsearch] complone commented on pull request #62: [FLINK-25568] Support Elasticsearch Source Connector
complone commented on PR #62: URL: https://github.com/apache/flink-connector-elasticsearch/pull/62#issuecomment-1545440640 > Support for FilterPushDown, ProjectPushDown, and LimitPushDown to optimize query performance. Have these functions been implemented? I don't seem to have seen this part of the code Hello, I am very happy to read your comments. This PR is currently in draft status. Regarding the three push-downs you mentioned: as shown in FLIP-127, I am going to implement FilterPushDown, ProjectPushDown, and LimitPushDown and optimize the ES query expressions. Currently only the ProjectPushDown interface is implemented; please refer to Elasticsearch7DynamicSource#applyProjection -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32063) AWS CI mvn compile fails to cast objects to parent type.
[ https://issues.apache.org/jira/browse/FLINK-32063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722072#comment-17722072 ] Ahmed Hamdy commented on FLINK-32063: - So apparently the CI patches the changes onto main before running tests. The issue was due to an un-rebased change from main. Even though this is an unconventional way of running CI, this is not an issue. Please close as "not an issue". > AWS CI mvn compile fails to cast objects to parent type. > > > Key: FLINK-32063 > URL: https://issues.apache.org/jira/browse/FLINK-32063 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS, Tests >Reporter: Ahmed Hamdy >Priority: Minor > Labels: test-stability > > h2. Description > AWS Connectors CI fails to cast {{TestSinkInitContext}} into base type > {{InitContext}}, > - Failure > https://github.com/apache/flink-connector-aws/actions/runs/4924790308/jobs/8841458606?pr=70 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32060) Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
[ https://issues.apache.org/jira/browse/FLINK-32060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32060: --- Labels: pull-request-available (was: ) > Migrate subclasses of BatchAbstractTestBase in table and other modules to > JUnit5 > > > Key: FLINK-32060 > URL: https://issues.apache.org/jira/browse/FLINK-32060 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.18.0 > Environment: Migrate subclasses of BatchAbstractTestBase in table and > other modules to JUnit5. >Reporter: Yuxin Tan >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework
echauchot commented on code in PR #643: URL: https://github.com/apache/flink-web/pull/643#discussion_r1192083594 ## docs/content/posts/howto-test-batch-source.md: ## @@ -0,0 +1,202 @@ +--- +title: "Howto test a batch source with the new Source framework" +date: "2023-04-14T08:00:00.000Z" +authors: +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. This article is the +continuation of +the [howto create a batch source with the new Source framework article](https://flink.apache.org/2023/04/14/howto-create-batch-source/) +. Now it is +time to test the created source ! As the previous article, this one was built while implementing the +[Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). + +## Unit testing the source + +### Testing the serializers + +[example Cassandra SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java) +and [SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java) + +In the previous article, we +created [serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers) +for Split and SplitEnumeratorState. We should now test them in unit tests. 
As usual, to test serde +we just create an object, serialize it using the serializer and then deserialize it using the same +serializer and finally assert on the equality of the two objects. Thus, hashCode() and equals() need +to be implemented for the serialized objects. + +### Other unit tests + +Of course, we also need to unit test low level processing such as query building for example or any +processing that does not require a running backend. + +## Integration testing the source + +[example Cassandra SourceITCase +](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java) + +For tests that require a running backend, Flink provides a JUnit5 source test framework. To use it +we create an *ITCase named class that +extends [SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html) +. This test suite provides all +the necessary tests already (single split, multiple splits, idle reader, etc...). It is targeted for +batch and streaming sources, so for our batch source case here, the tests below need to be disabled +as they are targeted for streaming sources. They can be disabled by overriding them in the ITCase +and annotating them with @Disabled: + +* testSourceMetrics +* testSavepoint +* testScaleUp +* testScaleDown +* testTaskManagerFailure + +Of course we can add our own integration test cases, for example tests on limits, tests on low level +splitting or any test that requires a running backend.
But for most cases we only need to provide +Flink test environment classes to configure the ITCase: + +### Flink runtime environment + +We add this annotated field to our ITCase and we're done + +`@TestEnv +MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); +` + +### Backend runtime environment Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
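Assembled, a batch-source ITCase following the excerpt might be shaped as below. This is an illustrative skeleton only: `MySourceITCase` is a hypothetical name, and the parameters of the overridden test methods are elided because they must exactly match the parent declarations in `SourceTestSuiteBase` for the Flink version in use.

```java
// Illustrative skeleton (not the Cassandra connector's actual test class):
// extend SourceTestSuiteBase, provide the Flink environment via @TestEnv,
// and disable the streaming-only tests by overriding them with @Disabled.
class MySourceITCase extends SourceTestSuiteBase<String> {

    // Flink test environment, as shown in the blog excerpt above.
    @TestEnv
    MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

    // Override each streaming-only test inherited from SourceTestSuiteBase
    // with the same signature as the parent method, e.g.:
    //
    // @Override
    // @Disabled("Not applicable to a batch source")
    // public void testSavepoint(/* parent's parameters */) { }
    //
    // Repeat for testSourceMetrics, testScaleUp, testScaleDown,
    // and testTaskManagerFailure.
}
```

Custom integration tests (limits, low-level splitting, etc.) can then be added as ordinary JUnit5 test methods alongside the inherited suite.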
[GitHub] [flink] reswqa opened a new pull request, #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5
reswqa opened a new pull request, #22574: URL: https://github.com/apache/flink/pull/22574 ## What is the purpose of the change *Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5* ## Brief change log - *Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.
[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722070#comment-17722070 ] Zhilong Hong commented on FLINK-32065: -- Hi, [~SpongebobZ] Would you please upload the full log of the TaskExecutor? It seems this issue happens during the initialization of BoundedBlockingPartitions. This folder is used by blocking shuffle. > Got NoSuchFileException when initialize source function. > > > Key: FLINK-32065 > URL: https://issues.apache.org/jira/browse/FLINK-32065 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.4 >Reporter: Spongebob >Priority: Major > Attachments: image-2023-05-12-14-07-45-771.png, > image-2023-05-12-14-26-46-268.png > > > When I submit an application to flink standalone cluster, I got a > NoSuchFileException. I think it failed to create the tmp channel file, but > I am confused about the cause in this case. > I found that this sub-directory `flink-netty-shuffle-xxx` did not exist, so > is this directory only used during that step of the application? > BTW, this issue happens intermittently. > !image-2023-05-12-14-07-45-771.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)