[GitHub] [flink-connector-opensearch] reswqa merged pull request #21: [hotfix] Workaround new violations message

2023-05-12 Thread via GitHub


reswqa merged PR #21:
URL: https://github.com/apache/flink-connector-opensearch/pull/21


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reswqa commented on pull request #21: [hotfix] Workaround new violations message

2023-05-12 Thread via GitHub


reswqa commented on PR #21:
URL: https://github.com/apache/flink-connector-opensearch/pull/21#issuecomment-1546537951

   Thanks for the review!





[GitHub] [flink] Mrart commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

2023-05-12 Thread via GitHub


Mrart commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1192918839


##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
 server.expect().get().withPath(path).andReturn(500, "Expected error").always();
 }
 
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace("test")
+.withName("tm_pod1")
+.withLabels(labels)
+.withResourceVersion("5668")
+.endMetadata()
+.build();
+// mock four kinds of events.
+server.expect()
+.withPath(
+"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+.andUpgradeToWebSocket()
+.open()
+.waitFor(1000)

Review Comment:
   I have tested it; it takes at most 1 second.






[jira] [Updated] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-05-12 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-32072:

Fix Version/s: 1.18.0

> Create and wire FileMergingSnapshotManager with TaskManagerServices
> ---
>
> Key: FLINK-32072
> URL: https://issues.apache.org/jira/browse/FLINK-32072
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32092) Integrate snapshot file-merging with existing IT cases

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32092:
---

 Summary: Integrate snapshot file-merging with existing IT cases
 Key: FLINK-32092
 URL: https://issues.apache.org/jira/browse/FLINK-32092
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Rui Xia








[jira] [Created] (FLINK-32091) Add necessary metrics for file-merging

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32091:
---

 Summary: Add necessary metrics for file-merging
 Key: FLINK-32091
 URL: https://issues.apache.org/jira/browse/FLINK-32091
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu








[jira] [Created] (FLINK-32089) Do fast copy in best-effort during first checkpoint after restoration

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32089:
---

 Summary: Do fast copy in best-effort during first checkpoint after 
restoration
 Key: FLINK-32089
 URL: https://issues.apache.org/jira/browse/FLINK-32089
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei








[jira] [Created] (FLINK-32090) Python API for enabling and configuring file merging snapshot

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32090:
---

 Summary: Python API for enabling and configuring file merging 
snapshot
 Key: FLINK-32090
 URL: https://issues.apache.org/jira/browse/FLINK-32090
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei








[jira] [Created] (FLINK-32088) Re-uploading in state file-merging for space amplification control

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32088:
---

 Summary: Re-uploading in state file-merging for space 
amplification control
 Key: FLINK-32088
 URL: https://issues.apache.org/jira/browse/FLINK-32088
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin








[jira] [Created] (FLINK-32087) Space amplification statistics of file merging

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32087:
---

 Summary: Space amplification statistics of file merging
 Key: FLINK-32087
 URL: https://issues.apache.org/jira/browse/FLINK-32087
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Rui Xia








[jira] [Created] (FLINK-32084) Migrate current file merging of channel state into the file merging framework

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32084:
---

 Summary: Migrate current file merging of channel state into the 
file merging framework
 Key: FLINK-32084
 URL: https://issues.apache.org/jira/browse/FLINK-32084
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei








[jira] [Created] (FLINK-32086) Cleanup non-reported managed directory on exit of TM

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32086:
---

 Summary: Cleanup non-reported managed directory on exit of TM
 Key: FLINK-32086
 URL: https://issues.apache.org/jira/browse/FLINK-32086
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan








[jira] [Created] (FLINK-32085) Implement and migrate batch uploading in changelog files into the file merging framework

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32085:
---

 Summary: Implement and migrate batch uploading in changelog files 
into the file merging framework
 Key: FLINK-32085
 URL: https://issues.apache.org/jira/browse/FLINK-32085
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu








[jira] [Created] (FLINK-32083) Chinese translation of documentation of checkpoint file-merging

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32083:
---

 Summary: Chinese translation of documentation of checkpoint 
file-merging
 Key: FLINK-32083
 URL: https://issues.apache.org/jira/browse/FLINK-32083
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu
 Fix For: 1.18.0








[jira] [Created] (FLINK-32082) Documentation of checkpoint file-merging

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32082:
---

 Summary: Documentation of checkpoint file-merging
 Key: FLINK-32082
 URL: https://issues.apache.org/jira/browse/FLINK-32082
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
 Fix For: 1.18.0








[jira] [Created] (FLINK-32081) Compatibility between file-merging on and off across job runs

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32081:
---

 Summary: Compatibility between file-merging on and off across job 
runs
 Key: FLINK-32081
 URL: https://issues.apache.org/jira/browse/FLINK-32081
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin
 Fix For: 1.18.0








[jira] [Created] (FLINK-32079) Read/write checkpoint metadata of merged files

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32079:
---

 Summary: Read/write checkpoint metadata of merged files
 Key: FLINK-32079
 URL: https://issues.apache.org/jira/browse/FLINK-32079
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Hangxiang Yu
 Fix For: 1.18.0








[jira] [Created] (FLINK-32080) Restoration of FileMergingSnapshotManager

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32080:
---

 Summary: Restoration of FileMergingSnapshotManager
 Key: FLINK-32080
 URL: https://issues.apache.org/jira/browse/FLINK-32080
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.18.0








[jira] [Created] (FLINK-32076) Add file pool for concurrent file reusing

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32076:
---

 Summary: Add file pool for concurrent file reusing
 Key: FLINK-32076
 URL: https://issues.apache.org/jira/browse/FLINK-32076
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Rui Xia
 Fix For: 1.18.0








[jira] [Created] (FLINK-32077) Implement shared state file merging

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32077:
---

 Summary: Implement shared state file merging
 Key: FLINK-32077
 URL: https://issues.apache.org/jira/browse/FLINK-32077
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.18.0








[jira] [Created] (FLINK-32078) Implement private state file merging

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32078:
---

 Summary: Implement private state file merging
 Key: FLINK-32078
 URL: https://issues.apache.org/jira/browse/FLINK-32078
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei
 Fix For: 1.18.0








[jira] [Created] (FLINK-32075) Delete merged files on checkpoint abort

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32075:
---

 Summary: Delete merged files on checkpoint abort
 Key: FLINK-32075
 URL: https://issues.apache.org/jira/browse/FLINK-32075
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.18.0








[jira] [Created] (FLINK-32074) Support file merging across checkpoints

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32074:
---

 Summary: Support file merging across checkpoints
 Key: FLINK-32074
 URL: https://issues.apache.org/jira/browse/FLINK-32074
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin
 Fix For: 1.18.0








[jira] [Created] (FLINK-32073) Implement file merging in snapshot

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32073:
---

 Summary: Implement file merging in snapshot
 Key: FLINK-32073
 URL: https://issues.apache.org/jira/browse/FLINK-32073
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Han Yin








[jira] [Created] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32072:
---

 Summary: Create and wire FileMergingSnapshotManager with 
TaskManagerServices
 Key: FLINK-32072
 URL: https://issues.apache.org/jira/browse/FLINK-32072
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Yanfei Lei








[jira] [Created] (FLINK-32071) Implement the snapshot manager for merged checkpoint files in TM

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32071:
---

 Summary: Implement the snapshot manager for merged checkpoint 
files in TM
 Key: FLINK-32071
 URL: https://issues.apache.org/jira/browse/FLINK-32071
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.18.0








[jira] [Created] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2023-05-12 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-32070:
---

 Summary: FLIP-306 Unified File Merging Mechanism for Checkpoints
 Key: FLINK-32070
 URL: https://issues.apache.org/jira/browse/FLINK-32070
 Project: Flink
  Issue Type: New Feature
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.18.0


The FLIP: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]

 

The creation of multiple checkpoint files can lead to a 'file flood' problem, 
in which a large number of files are written to the checkpoint storage in a 
short amount of time. This can cause issues in large clusters with high 
workloads, such as the creation and deletion of many files increasing the 
amount of file meta modification on DFS, leading to single-machine hotspot 
issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
significantly decrease when listing objects, which is necessary for object name 
de-duplication before creating an object, further affecting the performance of 
directory manipulation from the file system's point of view (see 
[hadoop-aws module 
documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
 section 'Warning #2: Directories are mimicked').

While many solutions have been proposed for individual types of state files 
(e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel state), 
the file flood problems from each type of checkpoint file are similar and lack a 
systematic view and solution. Therefore, the goal of this FLIP is to establish 
a unified file merging mechanism to address the file flood problem during 
checkpoint creation for all types of state files, including keyed, non-keyed, 
channel, and changelog state. This will significantly improve the system 
stability and availability of fault tolerance in Flink.
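
To make the idea concrete, here is a minimal Java sketch of file merging at its simplest: several logical state files are appended to one physical file, and each is later located by an (offset, length) segment. All names (`FileMergingSketch`, `Segment`, `append`, `read`) are hypothetical, and an in-memory buffer stands in for checkpoint storage. This is an illustration under those assumptions, not the FLIP-306 design or any Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Hypothetical illustration only; not part of Flink or FLIP-306. */
class FileMergingSketch {

    /** Where one logical state file lives inside the shared physical file. */
    static final class Segment {
        final int offset;
        final int length;

        Segment(int offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Stand-in for a single physical file on checkpoint storage.
    private final ByteArrayOutputStream physicalFile = new ByteArrayOutputStream();
    private int logicalFileCount = 0;

    /** Appends one logical file to the physical file and returns its location. */
    Segment append(byte[] logicalFile) {
        Segment segment = new Segment(physicalFile.size(), logicalFile.length);
        physicalFile.write(logicalFile, 0, logicalFile.length);
        logicalFileCount++;
        return segment;
    }

    /** Reads one logical file back using only its segment. */
    byte[] read(Segment segment) {
        byte[] all = physicalFile.toByteArray();
        return Arrays.copyOfRange(all, segment.offset, segment.offset + segment.length);
    }

    /** Number of logical files that share the one physical file. */
    int logicalFileCount() {
        return logicalFileCount;
    }
}
```

In this sketch, writing a keyed-state blob and a channel-state blob creates one physical file instead of two, which is the reduction in file count that the description above is after.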





[GitHub] [flink] xuzhiwen1255 commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-05-12 Thread via GitHub


xuzhiwen1255 commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1192908722


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -111,11 +120,20 @@ public boolean hasNext() {
 return !this.valuesToEmit.isEmpty();
 }
 
-private static int safeDivide(long left, long right) {
-Preconditions.checkArgument(right > 0);
-Preconditions.checkArgument(left >= 0);
-Preconditions.checkArgument(left <= Integer.MAX_VALUE * right);
-return (int) (left / right);
+private static long safeDivide(long totalRows, long stepSize) {
+Preconditions.checkArgument(stepSize > 0, "cannot be equal to 0");
+Preconditions.checkArgument(totalRows >= 0, "Cannot be less than 0");
+return totalRows / stepSize;
+}
+
+@VisibleForTesting
+public long getStart() {
+return start;
+}
+
+@VisibleForTesting
+public long getEnd() {
+return end;

Review Comment:
   Sorry, I don't fully understand. Does adding the numberOfElementsLimit 
parameter help us test the actual sequence generation?






[GitHub] [flink] xuzhiwen1255 commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-05-12 Thread via GitHub


xuzhiwen1255 commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1192906502


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -65,33 +72,46 @@ public void open(
 this.checkpointedState == null,
 "The " + getClass().getSimpleName() + " has already been initialized.");
 
-this.checkpointedState =
-context.getOperatorStateStore()
-.getListState(
-new ListStateDescriptor<>(
-name + "-sequence-state", LongSerializer.INSTANCE));
-this.valuesToEmit = new ArrayDeque<>();
-if (context.isRestored()) {
-// upon restoring
+ListStateDescriptor stateDescriptor =
+new ListStateDescriptor<>(
+name + "-sequence-state", TypeInformation.of(InternalState.class));
+this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+this.internalStates = Lists.newArrayList();
 
-for (Long v : this.checkpointedState.get()) {
-this.valuesToEmit.add(v);
-}
+totalNoOfElements = Math.abs(end - start + 1);
+if (context.isRestored()) {
+checkpointedState.get().forEach(state -> internalStates.add(state));
 } else {
 // the first time the job is executed
-final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-final long congruence = start + taskIdx;
+final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+internalStates.add(new InternalState(0, taskIdx, stepSize));
+}
+}
 
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+private long toCollect(long baseSize, long stepSize, int taskIdx) {
+return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+}
 
-for (long collected = 0; collected < toCollect; collected++) {
-this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+Iterator iterator = internalStates.iterator();
+if (iterator.hasNext()) {
+InternalState state = iterator.next();
+long nextSequence = state.collected * state.stepSize + (start + state.taskId);
+state.collected++;

Review Comment:
   Let me rethink this; perhaps it is more reasonable to change it this way.
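
For readers following the arithmetic in `nextValue()`, the sketch below works through how the range is split across subtasks: subtask `taskIdx` starts at `start + taskIdx` and steps by the parallelism, and the first `total % parallelism` subtasks emit one extra value. It is a stand-alone illustration with hypothetical names (`SequencePartitionSketch`, `valueAt`, `valuesFor`), not the code in this PR, and it collects the values into a list only to make them easy to inspect.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch; not the Flink SequenceGenerator itself. */
class SequencePartitionSketch {

    /** Values a subtask emits: the base share, plus one if it takes a remainder slot. */
    static long toCollect(long totalNoOfElements, long stepSize, int taskIdx) {
        long baseSize = totalNoOfElements / stepSize;
        return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    }

    /** The value emitted by subtask taskIdx after it has already emitted `collected` values. */
    static long valueAt(long start, long stepSize, int taskIdx, long collected) {
        return collected * stepSize + (start + taskIdx);
    }

    /** All values for one subtask, computed one at a time from the counter alone. */
    static List<Long> valuesFor(long start, long end, int parallelism, int taskIdx) {
        long total = Math.abs(end - start + 1);
        long count = toCollect(total, parallelism, taskIdx);
        List<Long> values = new ArrayList<>();
        for (long collected = 0; collected < count; collected++) {
            values.add(valueAt(start, parallelism, taskIdx, collected));
        }
        return values;
    }
}
```

With start = 1, end = 10 and a parallelism of 3, subtask 0 yields 1, 4, 7, 10, while subtasks 1 and 2 yield three values each. Because every value depends only on the counter, the per-subtask state can be a single number rather than a pre-filled queue.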






[GitHub] [flink] xuzhiwen1255 commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-05-12 Thread via GitHub


xuzhiwen1255 commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1192906438


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -65,33 +72,46 @@ public void open(
 this.checkpointedState == null,
 "The " + getClass().getSimpleName() + " has already been initialized.");
 
-this.checkpointedState =
-context.getOperatorStateStore()
-.getListState(
-new ListStateDescriptor<>(
-name + "-sequence-state", LongSerializer.INSTANCE));
-this.valuesToEmit = new ArrayDeque<>();
-if (context.isRestored()) {
-// upon restoring
+ListStateDescriptor stateDescriptor =
+new ListStateDescriptor<>(
+name + "-sequence-state", TypeInformation.of(InternalState.class));
+this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor);
+this.internalStates = Lists.newArrayList();
 
-for (Long v : this.checkpointedState.get()) {
-this.valuesToEmit.add(v);
-}
+totalNoOfElements = Math.abs(end - start + 1);
+if (context.isRestored()) {
+checkpointedState.get().forEach(state -> internalStates.add(state));
 } else {
 // the first time the job is executed
-final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-final long congruence = start + taskIdx;
+final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+internalStates.add(new InternalState(0, taskIdx, stepSize));
+}
+}
 
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+private long toCollect(long baseSize, long stepSize, int taskIdx) {
+return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+}
 
-for (long collected = 0; collected < toCollect; collected++) {
-this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+Iterator iterator = internalStates.iterator();
+if (iterator.hasNext()) {
+InternalState state = iterator.next();
+long nextSequence = state.collected * state.stepSize + (start + state.taskId);
+state.collected++;
+// All sequence values are cleared from the stateList after they have been sent
+if (state.collected
+>= toCollect(
+safeDivide(totalNoOfElements, state.stepSize),
+state.stepSize,
+state.taskId)) {
+iterator.remove();
 }
+return nextSequence;
 }
+
+// Before calling this method, you should call hasNext to check
+throw new IllegalStateException();

Review Comment:
   You're quite right. I'll change it.






[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-12 Thread via GitHub


dannycranmer commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192703805


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/** An implementation of the {@link KinesisShardAssigner} that assigns splits uniformly. */
+@Internal
+public class UniformShardAssigner implements KinesisShardAssigner {
+@Override
+public int assign(KinesisShardSplit split, Context context) {
+int selectedSubtask = -1;
+int curMinAssignment = Integer.MAX_VALUE;
+Map<Integer, Set<KinesisShardSplit>> splitAssignment = context.getCurrentSplitAssignment();
+
+for (int subtaskId : context.getRegisteredReaders().keySet()) {
+int subtaskAssignmentSize =
+splitAssignment.getOrDefault(subtaskId, Collections.emptySet()).size();
+if (subtaskAssignmentSize < curMinAssignment) {
+curMinAssignment = subtaskAssignmentSize;
+selectedSubtask = subtaskId;
+}
+}
+
+Preconditions.checkArgument(
+selectedSubtask != -1,
+"Expected at least one registered reader. Unable to assign split.");
+return selectedSubtask;

Review Comment:
   I ran this locally with a parallelism of 2 and 2 shards. Both shards were 
assigned to subtask 0 because `getCurrentSplitAssignment` was not updated in 
time: a race condition.
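
The effect can be reproduced without Flink. The sketch below mirrors the least-loaded selection loop from the diff and compares two callers: one that keeps consulting an assignment view that is never refreshed, and one that records each decision before making the next. All names (`UniformAssignmentSketch`, `assignWithStaleView`, `assignWithFreshView`) are hypothetical, shards are plain strings, and the "fresh view" variant is only one possible way to avoid the skew, not necessarily what this PR should do.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-alone sketch of the stale-assignment problem. */
class UniformAssignmentSketch {

    /** Same idea as the loop in the diff: pick the reader with the fewest assigned splits. */
    static int pickLeastLoaded(Set<Integer> readers, Map<Integer, Set<String>> assignment) {
        int selected = -1;
        int min = Integer.MAX_VALUE;
        for (int subtaskId : readers) {
            int size = assignment.getOrDefault(subtaskId, Collections.emptySet()).size();
            if (size < min) {
                min = size;
                selected = subtaskId;
            }
        }
        if (selected == -1) {
            throw new IllegalArgumentException("Expected at least one registered reader.");
        }
        return selected;
    }

    /** Every decision sees the same empty view, as if the assignment was not updated in time. */
    static Map<String, Integer> assignWithStaleView(Set<Integer> readers, String... splits) {
        Map<Integer, Set<String>> staleView = new HashMap<>();
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String split : splits) {
            result.put(split, pickLeastLoaded(readers, staleView));
        }
        return result;
    }

    /** Each decision is recorded before the next one is made. */
    static Map<String, Integer> assignWithFreshView(Set<Integer> readers, String... splits) {
        Map<Integer, Set<String>> currentView = new HashMap<>();
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String split : splits) {
            int subtask = pickLeastLoaded(readers, currentView);
            currentView.computeIfAbsent(subtask, k -> new HashSet<>()).add(split);
            result.put(split, subtask);
        }
        return result;
    }
}
```

With readers 0 and 1 (iterated in ascending order) and two shards, the stale view sends both shards to subtask 0, while the refreshed view sends one shard to each.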






[GitHub] [flink-web] youngminz commented on a diff in pull request #615: Fix typo

2023-05-12 Thread via GitHub


youngminz commented on code in PR #615:
URL: https://github.com/apache/flink-web/pull/615#discussion_r1192674124


##
docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md:
##
@@ -84,7 +84,7 @@ We rely on users to share feedback so we can improve and make this a very robust
 
 Until now the operator only integrated with the Flink Kubernetes HA mechanism for last-state and other types of application upgrades. 1.4.0 adds support for the Zookeeper HA storage as well.
 
-While Zookeper is a slightly older solution, many users are still using it for HA metadata even in the Kubernetes world.
+While Zookeeper is a slightly older solution, many users are still using it for HA metadata even in the Kubernetes world.

Review Comment:
   I found all occurrences of ZooKeeper in this document and replaced them!






[jira] [Commented] (FLINK-29533) Add proper table style to Flink website

2023-05-12 Thread Sakshi Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722260#comment-17722260
 ] 

Sakshi Sharma commented on FLINK-29533:
---

Can you please guide me on where the issue is? The styling looks as expected to 
me. Is there something I am missing?
!image-2023-05-12-14-14-58-020.png|width=736,height=262!

> Add proper table style to Flink website
> ---
>
> Key: FLINK-29533
> URL: https://issues.apache.org/jira/browse/FLINK-29533
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Sakshi Sharma
>Priority: Major
>  Labels: starter
> Attachments: Screenshot from 2022-10-07 08-23-01.png, 
> image-2023-05-12-14-14-58-020.png
>
>
> Tables can be created using simple markdown syntax. But the corresponding 
> rendered table lacks proper styling:  !Screenshot from 2022-10-07 
> 08-23-01.png!
> Several blog posts work around that by adding a custom style:
>  * [Apache Flink Kubernetes Operator 1.0.0 Release 
> Announcement|https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html]
>  * [Improving speed and stability of checkpointing with generic log-based 
> incremental 
> checkpoints|https://flink.apache.org/2022/05/30/changelog-state-backend.html]
> What about coming up with a common style that doesn't require people to come 
> up with their own custom style per post?





[jira] [Commented] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-12 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722259#comment-17722259
 ] 

Martijn Visser commented on FLINK-32069:


[~mapohl] Any thoughts?

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> 
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1, 1.15.4
>Reporter: Aleksandr Iushmanov
>Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition that 
> leads to failed expectations for SQL insert operations. 
>  
> Below is an example of Zeppelin code that fails because 
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I 
> have verified that the same failure can happen if I use 
> jobClient.getJobExecutionResult().get() (the job execution result is "Program 
> execution finished", but the job status is not consistently FINISHED):
> {code:java}
> TableResult tableResult =
>     ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>     context.out.write("Insertion successfully.\n");
>   } else {
>     throw new IOException(
>         "Job is failed, " + jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> }
> {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on runningJobsRegistry and 
> that, since 1.15, this registry is not updated with the FINISHED status before 
> the job result future completes; see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like as race condition that is hard to reproduce on lightweight 
> setup. I was reproducing this running zeppelin notebook with remote flink 
> cluster and triggering SQL insert operation. If I find a smaller setup to 
> reproduce on small local cluster with lightweight client, I will update this 
> ticket when I have more input. I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have opened a separate ticket, because Flink 1.15 is not going to be 
> fixed. If I understand correctly, though, this issue is common to all 
> versions starting with 1.15, so it makes sense to address it starting with 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], thank you for your assistance on Slack. I have created this ticket to 
> follow up on our conversation; could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be an additional check for the presence of a 
> JobResult in the result store before returning the job status (if there is a result, 
> the job shouldn't be reported as running, based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
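
As a rough illustration of the race described above, a client could poll the reported status until it becomes terminal instead of trusting a single read taken right after `await()` returns. The sketch below is self-contained and does not use Flink classes: `Status`, `TerminalStatusPoller` and `awaitTerminalStatus` are hypothetical names, and the supplier stands in for a call such as `jobClient.getJobStatus()`. It is a client-side workaround under those assumptions, not the fix proposed in this ticket.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class TerminalStatusPoller {

    /** Hypothetical stand-in for a job status; only the values the sketch needs. */
    public enum Status { RUNNING, FINISHED, FAILED, CANCELED }

    /** In this sketch every status except RUNNING counts as terminal. */
    public static boolean isTerminal(Status status) {
        return status != Status.RUNNING;
    }

    /**
     * Queries the status at least once and at most maxAttempts times, pausing
     * between queries, and returns the last status that was observed.
     */
    public static Status awaitTerminalStatus(
            Supplier<CompletableFuture<Status>> statusQuery,
            int maxAttempts,
            Duration pause) {
        Status last = statusQuery.get().join();
        for (int attempt = 1; attempt < maxAttempts && !isTerminal(last); attempt++) {
            try {
                Thread.sleep(pause.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling job status", e);
            }
            last = statusQuery.get().join();
        }
        return last;
    }
}
```

A caller would then treat the job as failed only if the returned status is still not `FINISHED` after the retries are exhausted. The retry count and pause are arbitrary tuning knobs, so this only narrows the window and does not remove the underlying inconsistency.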



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29533) Add proper table style to Flink website

2023-05-12 Thread Sakshi Sharma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sakshi Sharma updated FLINK-29533:
--
Attachment: image-2023-05-12-14-14-58-020.png

> Add proper table style to Flink website
> ---
>
> Key: FLINK-29533
> URL: https://issues.apache.org/jira/browse/FLINK-29533
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Assignee: Sakshi Sharma
>Priority: Major
>  Labels: starter
> Attachments: Screenshot from 2022-10-07 08-23-01.png, 
> image-2023-05-12-14-14-58-020.png
>
>
> Tables can be created using simple markdown syntax. But the corresponding 
> rendered table lacks proper styling:  !Screenshot from 2022-10-07 
> 08-23-01.png!
> Several blog posts work around that by adding a custom style:
>  * [Apache Flink Kubernetes Operator 1.0.0 Release 
> Announcement|https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html]
>  * [Improving speed and stability of checkpointing with generic log-based 
> incremental 
> checkpoints|https://flink.apache.org/2022/05/30/changelog-state-backend.html]
> What about coming up with a common style that doesn't require people to come 
> up with their own custom style per post?





[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-12 Thread via GitHub


dannycranmer commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192670238


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigUtil.java:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+
+/** Utility functions to use with {@link SourceConfigConstants}. */
+@Internal
+public class SourceConfigUtil {

Review Comment:
   nit: The name is pretty generic; can we rename it to 
`KinesisStreamsSourceConfigUtil`? I realise this is implied by the package, 
but my preference is to make the class name reflect the responsibility. Same 
for Constants.






[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-12 Thread via GitHub


dannycranmer commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192668358


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
+import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
+import 
org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import 
org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data 
source that
+ * subscribes to a single AWS Kinesis data stream. It is able to handle 
resharding of streams, and
+ * stores its current progress in Flink checkpoints. The source will read in 
data from the Kinesis
+ * Data stream, deserialize it using the provided {@link 
DeserializationSchema}, and emit the record
+ * into the Flink job graph.
+ *
+ * Exactly-once semantics. To leverage Flink's checkpointing mechanics for 
exactly-once stream
+ * processing, the Kinesis Source is implemented with the AWS Java SDK, 
instead of the officially
+ * recommended AWS Kinesis Client Library. The source will store its current 
progress in Flink
+ * checkpoint/savepoint, and will pick up from where it left off upon restore 
from the
+ * checkpoint/savepoint.
+ *
+ * Initial starting points. The Kinesis Streams Source supports reads 
starting from TRIM_HORIZON,
+ * LATEST, and AT_TIMESTAMP.
+ *
+ * @param <T> the data type emitted by the source
+ */
+@Experimental
+public class KinesisStreamsSource

Review Comment:
   Let's add a builder like the sink has
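
   To make the suggestion concrete, here is a minimal, self-contained sketch of the builder shape being asked for. It is not the PR's code: `SketchKinesisSource`, its setters and the `Function<byte[], T>` deserializer are hypothetical stand-ins chosen so the example compiles without Flink or AWS dependencies.

```java
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;

/** Hypothetical stand-in for a source class that is constructed through a builder. */
final class SketchKinesisSource<T> {
    private final String streamArn;
    private final Properties sourceConfig;
    private final Function<byte[], T> deserializer;

    private SketchKinesisSource(
            String streamArn, Properties sourceConfig, Function<byte[], T> deserializer) {
        this.streamArn = streamArn;
        this.sourceConfig = sourceConfig;
        this.deserializer = deserializer;
    }

    public static <T> Builder<T> builder() {
        return new Builder<>();
    }

    public String getStreamArn() {
        return streamArn;
    }

    public Properties getSourceConfig() {
        return sourceConfig;
    }

    public T deserialize(byte[] payload) {
        return deserializer.apply(payload);
    }

    /** Collects optional and mandatory settings, validating them once in build(). */
    public static final class Builder<T> {
        private String streamArn;
        private Properties sourceConfig = new Properties();
        private Function<byte[], T> deserializer;

        public Builder<T> setStreamArn(String streamArn) {
            this.streamArn = streamArn;
            return this;
        }

        public Builder<T> setSourceConfig(Properties sourceConfig) {
            this.sourceConfig = sourceConfig;
            return this;
        }

        public Builder<T> setDeserializer(Function<byte[], T> deserializer) {
            this.deserializer = deserializer;
            return this;
        }

        public SketchKinesisSource<T> build() {
            // Mandatory fields are checked here so a half-configured source cannot exist.
            Objects.requireNonNull(streamArn, "streamArn must be set");
            Objects.requireNonNull(sourceConfig, "sourceConfig must not be null");
            Objects.requireNonNull(deserializer, "deserializer must be set");
            return new SketchKinesisSource<>(streamArn, sourceConfig, deserializer);
        }
    }
}
```

   The private constructor plus a validating `build()` keeps the public surface small, and new optional settings can be added later without breaking existing callers.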






[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-12 Thread via GitHub


dannycranmer commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1175101246


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java:
##
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import 
org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+
+import java.time.Duration;
+
+@PublicEvolving
+public class SourceConfigConstants extends AWSConfigConstants {

Review Comment:
   Why do you need to extend `AWSConfigConstants` here? We did this before to 
retain backwards compatibility, but since this is a new Source we might not 
need to do this



##
flink-connector-aws-kinesis-streams/pom.xml:
##
@@ -80,6 +80,12 @@ under the License.
 test-jar
 test
 

Review Comment:
   Can we rename this module? It currently says Sink v2. See the `<name>` above.



##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * Th

[GitHub] [flink-connector-aws] reswqa commented on pull request #73: [FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml

2023-05-12 Thread via GitHub


reswqa commented on PR #73:
URL: 
https://github.com/apache/flink-connector-aws/pull/73#issuecomment-1546060123

   Thanks for the review! I will merge this after `FLINK-32024` is merged.





[jira] [Commented] (FLINK-31810) RocksDBException: Bad table magic number on checkpoint rescale

2023-05-12 Thread David Artiga (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722245#comment-17722245
 ] 

David Artiga commented on FLINK-31810:
--

Something quite similar just happened to us (using {{1.15.4}}):
{code}
20230512T182327.440+0200 ERROR Caught unexpected exception. 
java.io.IOException: Error while opening RocksDB instance.
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:124)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:240)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:219)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:186)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:166)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:325)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:494)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:101)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.rocksdb.RocksDBException: Bad table magic number: expected 
9863518390377041911, found 370301222607884801 in 
/mnt/1/hadoop/yarn/nm-local-dir/usercache/splat/appcache/application_1677581391115_0019/tm_container_e02_1677581391115_0019_01_06/tmp/job_06041d3a256afa3e088b474eb626985e_op_RBEAStreamOperator_707eed42a9b74f065cc8bb6798b04782__64_180__uuid_dfc34320-a336-42dc-b4f4-d48ccdfe2c09/db/045922.sst
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:306)
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)
... 25 common frames omitted
 
[org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build()
 @ 415]
{code}

> RocksDBException: Bad table magic number on checkpoint rescale
> --
>
> Key: FLINK-31810
> URL: https://issues.apache.org/jira/browse/FLINK-31810
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.2
>Reporter: Robert Metzger
>Priority: Major
>
> While rescaling a job from checkpoint, I ran into this exception:
> {code:java}
> SinkMaterializer[7] -> rob-result[7]: Writer -> rob-result[7]: Committer 
> (4/4)#3 (c1b

[jira] [Resolved] (FLINK-32066) Flink CI service on Azure stops responding to pull requests

2023-05-12 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge resolved FLINK-32066.
-
Fix Version/s: 1.18.0
   Resolution: Fixed

> Flink CI service on Azure stops responding to pull requests
> ---
>
> Key: FLINK-32066
> URL: https://issues.apache.org/jira/browse/FLINK-32066
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.18.0
>Reporter: Wencong Liu
>Assignee: Jing Ge
>Priority: Blocker
> Fix For: 1.18.0
>
> Attachments: 20230512152023.jpg
>
>
> As of the time when this issue was created, Flink's CI service on Azure could 
> no longer be triggered by new pull requests.
> !20230512152023.jpg!





[GitHub] [flink] JTaky commented on pull request #22394: [FLINK-31780][component=Runtime/Coordination] Allow users to disable 'Ensemble tracking' feature for ZooKeeper Curator framework

2023-05-12 Thread via GitHub


JTaky commented on PR #22394:
URL: https://github.com/apache/flink/pull/22394#issuecomment-1546005491

   Thanks a lot for the review! 
   Sorry but I think I cannot merge this PR.
   
   https://github.com/apache/flink/assets/1190166/b189cc24-b9ea-4b35-be12-c2b4a815d045";>
   





[jira] [Commented] (FLINK-32066) Flink CI service on Azure stops responding to pull requests

2023-05-12 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1776#comment-1776
 ] 

Jing Ge commented on FLINK-32066:
-

It should work now. Please check and confirm.

> Flink CI service on Azure stops responding to pull requests
> ---
>
> Key: FLINK-32066
> URL: https://issues.apache.org/jira/browse/FLINK-32066
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.18.0
>Reporter: Wencong Liu
>Assignee: Jing Ge
>Priority: Blocker
> Attachments: 20230512152023.jpg
>
>
> As of the time when this issue was created, Flink's CI service on Azure could 
> no longer be triggered by new pull requests.
> !20230512152023.jpg!





[GitHub] [flink-connector-aws] usamj commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-12 Thread via GitHub


usamj commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1192556373


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
+import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
+import 
org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import 
org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data 
source that
+ * subscribes to a single AWS Kinesis data stream. It is able to handle 
resharding of streams, and
+ * stores its current progress in Flink checkpoints. The source will read in 
data from the Kinesis
+ * Data stream, deserialize it using the provided {@link 
DeserializationSchema}, and emit the record
+ * into the Flink job graph.
+ *
+ * Exactly-once semantics. To leverage Flink's checkpointing mechanics for 
exactly-once stream
+ * processing, the Kinesis Source is implemented with the AWS Java SDK, 
instead of the officially
+ * recommended AWS Kinesis Client Library. The source will store its current 
progress in Flink
+ * checkpoint/savepoint, and will pick up from where it left off upon restore 
from the
+ * checkpoint/savepoint.
+ *
+ * Initial starting points. The Kinesis Streams Source supports reads 
starting from TRIM_HORIZON,
+ * LATEST, and AT_TIMESTAMP.

Review Comment:
   Nit: "LATEST, **or** AT_TIMESTAMP"






[GitHub] [flink] flinkbot commented on pull request #22575: Updating docs on how to upgrade kafka connectors

2023-05-12 Thread via GitHub


flinkbot commented on PR #22575:
URL: https://github.com/apache/flink/pull/22575#issuecomment-1545956373

   
   ## CI report:
   
   * d0b8d571c4900d50f52744e3d622c476428bb5bf UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] flinkbot commented on pull request #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-12 Thread via GitHub


flinkbot commented on PR #22574:
URL: https://github.com/apache/flink/pull/22574#issuecomment-1545956251

   
   ## CI report:
   
   * c7e02124129c26fbabb41f24e6f89d2dc848b510 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] flinkbot commented on pull request #22573: [FLINK-32058][tests] Migrate the subclasses of BatchAbstractTestBase in runtime.batch.sql to JUnit5

2023-05-12 Thread via GitHub


flinkbot commented on PR #22573:
URL: https://github.com/apache/flink/pull/22573#issuecomment-1545956110

   
   ## CI report:
   
   * dd980a1bfd99b12404368abd94d93f299ee72d74 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] flinkbot commented on pull request #22572: [FLINK-29618] Remove Timeout annotation in YARNSessionFIFOSecuredITCase

2023-05-12 Thread via GitHub


flinkbot commented on PR #22572:
URL: https://github.com/apache/flink/pull/22572#issuecomment-1545955981

   
   ## CI report:
   
   * 3e82df924333d6f676f20a50c068bb35fa7c3076 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Commented] (FLINK-32066) Flink CI service on Azure stops responding to pull requests

2023-05-12 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722212#comment-17722212
 ] 

Jing Ge commented on FLINK-32066:
-

working on it

> Flink CI service on Azure stops responding to pull requests
> ---
>
> Key: FLINK-32066
> URL: https://issues.apache.org/jira/browse/FLINK-32066
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.18.0
>Reporter: Wencong Liu
>Priority: Blocker
> Attachments: 20230512152023.jpg
>
>
> As of the time when this issue was created, Flink's CI service on Azure could 
> no longer be triggered by new pull requests.
> !20230512152023.jpg!





[jira] [Assigned] (FLINK-32066) Flink CI service on Azure stops responding to pull requests

2023-05-12 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-32066:
---

Assignee: Jing Ge

> Flink CI service on Azure stops responding to pull requests
> ---
>
> Key: FLINK-32066
> URL: https://issues.apache.org/jira/browse/FLINK-32066
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.18.0
>Reporter: Wencong Liu
>Assignee: Jing Ge
>Priority: Blocker
> Attachments: 20230512152023.jpg
>
>
> As of the time when this issue was created, Flink's CI service on Azure could 
> no longer be triggered by new pull requests.
> !20230512152023.jpg!





[GitHub] [flink-web] RyanSkraba commented on a diff in pull request #615: Fix typo

2023-05-12 Thread via GitHub


RyanSkraba commented on code in PR #615:
URL: https://github.com/apache/flink-web/pull/615#discussion_r1192516634


##
docs/content/posts/2023-02-27-release-kubernetes-operator-1.4.0.md:
##
@@ -84,7 +84,7 @@ We rely on users to share feedback so we can improve and make 
this a very robust
 
 Until now the operator only integrated with the Flink Kubernetes HA mechanism 
for last-state and other types of application upgrades. 1.4.0 adds support for 
the Zookeeper HA storage as well.
 
-While Zookeper is a slightly older solution, many users are still using it for 
HA metadata even in the Kubernetes world.
+While Zookeeper is a slightly older solution, many users are still using it 
for HA metadata even in the Kubernetes world.

Review Comment:
   ```suggestion
   While ZooKeeper is a slightly older solution, many users are still using it 
for HA metadata even in the Kubernetes world.
   ```
   
   This looks like an obvious typo, but the article could be improved by using 
the correct title case for ZooKeeper everywhere (there is an internal capital 
K).






[GitHub] [flink] RyanSkraba commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-05-12 Thread via GitHub


RyanSkraba commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1192511163


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -65,33 +72,46 @@ public void open(
 this.checkpointedState == null,
 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-this.checkpointedState =
-context.getOperatorStateStore()
-.getListState(
-new ListStateDescriptor<>(
-name + "-sequence-state", 
LongSerializer.INSTANCE));
-this.valuesToEmit = new ArrayDeque<>();
-if (context.isRestored()) {
-// upon restoring
+ListStateDescriptor stateDescriptor =
+new ListStateDescriptor<>(
+name + "-sequence-state", 
TypeInformation.of(InternalState.class));
+this.checkpointedState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+this.internalStates = Lists.newArrayList();
 
-for (Long v : this.checkpointedState.get()) {
-this.valuesToEmit.add(v);
-}
+totalNoOfElements = Math.abs(end - start + 1);
+if (context.isRestored()) {
+checkpointedState.get().forEach(state -> 
internalStates.add(state));
 } else {
 // the first time the job is executed
-final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-final long congruence = start + taskIdx;
+final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+internalStates.add(new InternalState(0, taskIdx, stepSize));
+}
+}
 
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+private long toCollect(long baseSize, long stepSize, int taskIdx) {
+return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+}
 
-for (long collected = 0; collected < toCollect; collected++) {
-this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+Iterator iterator = internalStates.iterator();
+if (iterator.hasNext()) {
+InternalState state = iterator.next();
+long nextSequence = state.collected * state.stepSize + (start + 
state.taskId);
+state.collected++;
+// All sequence values are cleared from the stateList after they 
have been sent
+if (state.collected
+>= toCollect(
+safeDivide(totalNoOfElements, state.stepSize),
+state.stepSize,
+state.taskId)) {
+iterator.remove();
 }
+return nextSequence;
 }
+
+// Before calling this method, you should call hasNext to check
+throw new IllegalStateException();

Review Comment:
   ```suggestion
   throw new IllegalStateException("SequenceGenerator.nextValue() was 
called with no remaining values.");
   ```
   This code should be unreachable, so it's not a big deal -- but just in case, 
a helpful message is a good practice.
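
To illustrate the arithmetic in the new `nextValue()`: each subtask emits `start + taskIdx + k * stepSize` for k = 0, 1, ... until its share of the range is exhausted. The following is a minimal stand-alone sketch, not the Flink class. The class name `SequenceSplitSketch` and the helper `valuesFor` are hypothetical, and `toCollect` takes the total as a parameter here instead of reading a field; only the two formulas are taken from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch; this is not the Flink SequenceGenerator. */
public class SequenceSplitSketch {

    /** Number of values one subtask emits: the base share, plus one if it covers part of the remainder. */
    static long toCollect(long totalNoOfElements, long stepSize, int taskIdx) {
        long baseSize = totalNoOfElements / stepSize;
        return (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    }

    /** Values emitted by one subtask, derived from a running counter as in the diff. */
    static List<Long> valuesFor(long start, long end, long stepSize, int taskIdx) {
        long totalNoOfElements = Math.abs(end - start + 1);
        long count = toCollect(totalNoOfElements, stepSize, taskIdx);
        List<Long> values = new ArrayList<>();
        for (long collected = 0; collected < count; collected++) {
            // Same expression as in nextValue(): collected * stepSize + (start + taskId)
            values.add(collected * stepSize + (start + taskIdx));
        }
        return values;
    }

    public static void main(String[] args) {
        // Sequence 0..9 split across 3 parallel subtasks.
        for (int taskIdx = 0; taskIdx < 3; taskIdx++) {
            System.out.println("subtask " + taskIdx + ": " + valuesFor(0, 9, 3, taskIdx));
        }
    }
}
```

For the range 0..9 with three subtasks, subtask 0 covers the remainder and emits four values, while subtasks 1 and 2 emit three each. Only the counter needs to be checkpointed, not the full list of values.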






[jira] [Updated] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-12 Thread Aleksandr Iushmanov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Iushmanov updated FLINK-32069:

Description: 
Using Zeppelin with a remote cluster, I came across a race condition that 
leads to failed expectations for SQL insert operations. 
 
Below is an example of Zeppelin code that fails because 
jobClient.getJobStatus() returns RUNNING even after the job has finished. I have 
verified that the same failure can happen if I use 
jobClient.getJobExecutionResult().get() (the job execution result is "Program 
execution finished", but the job status is not consistently FINISHED).
{code:java}
TableResult tableResult =
    ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException(
        "Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}
 
[https://github.com/apache/flink/pull/18189|https://github.com/apache/flink/pull/18189].
ZeppelinCode: 
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]

I suspect that the job status is returned based on runningJobsRegistry, and that 
since 1.15 this registry is not updated with the FINISHED status before the job 
result future completes; see this change in {{JobMasterServiceLeadershipRunner.java}}: 
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
 
It looks like a race condition that is hard to reproduce on a lightweight setup. 
I reproduced it by running a Zeppelin notebook against a remote Flink cluster and 
triggering an SQL insert operation. If I find a smaller setup that reproduces it on 
a small local cluster with a lightweight client, I will update this ticket. 
I am open to suggestions on how to fix this. 
 
For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
fixed. If I understand it correctly, however, this issue is common to all 
versions starting with 1.15, so it makes sense to address it starting with 1.16. 
https://issues.apache.org/jira/browse/ZEPPELIN-5909
 
[~mapohl], thank you for your assistance in Slack. I have created this ticket to 
back up our conversation; could you please add your thoughts on this failure mode?
 
One possible solution would be to add a check for the presence of a 
JobResult in the result store before returning the job status (if there is a 
result, the job shouldn't be reported as running, based on this documentation: 
[https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--]).

  was:
Using zeppelin with remote cluster I came across some race condition issue 
leading to failed expectations for SQL insert operations. 
 
Below is an example of zeppelin code that is failing because 
jobClient.getJobStatus() returns running even after job has finished. I have 
verified that same failover can happen if I use 
jobClient.getJobExecutionResult().get() (Job execution result is: "Program 
execution finished" but job status is not consistently finished)
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) 
tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
context.out.write("Insertion successfully.\n");
  } else {
throw new IOException("Job is failed, " + 
jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
} {code}
 
[ 
https://github.com/apache/flink/pull/18189|https://github.com/apache/flink/pull/18189].
ZeppelinCode: 
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
TableResult tableResult = ((TableEnvironmentInternal) 
tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try \{
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
context.out.write("Insertion successfully.\n");

[jira] [Updated] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-12 Thread Aleksandr Iushmanov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Iushmanov updated FLINK-32069:

Description: 
Using Zeppelin with a remote cluster, I came across a race condition that 
leads to failed expectations for SQL insert operations. 
 
Below is an example of Zeppelin code that fails because 
jobClient.getJobStatus() returns RUNNING even after the job has finished. I have 
verified that the same failure can happen if I use 
jobClient.getJobExecutionResult().get() (the job execution result is "Program 
execution finished", but the job status is not consistently FINISHED).
{code:java}
TableResult tableResult =
    ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException(
        "Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}
ZeppelinCode: 
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]

I suspect that the job status is returned based on runningJobsRegistry, and that 
since 1.15 this registry is not updated with the FINISHED status before the job 
result future completes; see this change in {{JobMasterServiceLeadershipRunner.java}}: 
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
 
It looks like a race condition that is hard to reproduce on a lightweight setup. 
I reproduced it by running a Zeppelin notebook against a remote Flink cluster and 
triggering an SQL insert operation. If I find a smaller setup that reproduces it on 
a small local cluster with a lightweight client, I will update this ticket. 
I am open to suggestions on how to fix this. 
 
For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
fixed. If I understand it correctly, however, this issue is common to all 
versions starting with 1.15, so it makes sense to address it starting with 1.16. 
https://issues.apache.org/jira/browse/ZEPPELIN-5909
 
[~mapohl], thank you for your assistance in Slack. I have created this ticket to 
back up our conversation; could you please add your thoughts on this failure mode?
 
One possible solution would be to add a check for the presence of a 
JobResult in the result store before returning the job status (if there is a 
result, the job shouldn't be reported as running, based on this documentation: 
[https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--]).

  was:
Using zeppelin with remote cluster I came across some race condition issue 
leading to failed expectations for SQL insert operations. 
 
Below is an example of zeppelin code that is failing because 
jobClient.getJobStatus() returns running even after job has finished. I have 
verified that same failover can happen if I use 
jobClient.getJobExecutionResult().get() (Job execution result is: "Program 
execution finished" but job status is not consistently finished)
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) 
tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
context.out.write("Insertion successfully.\n");
  } else {
throw new IOException("Job is failed, " + 
jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
} {code}
 
[ 
https://github.com/apache/flink/pull/18189|https://github.com/apache/flink/pull/18189].
ZeppelinCode: 
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]


 I suspect that job status is returned based on runningJobsRegistry and since 
1.15 this registry is not updated with FINISHED status prior to job result 
future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
 
 
It looks like as race condition that is hard to reproduce on lightweight setup. 
I was reproducing thi

[jira] [Created] (FLINK-32069) jobClient.getJobStatus() can return status RUNNING for finished insert operation

2023-05-12 Thread Aleksandr Iushmanov (Jira)
Aleksandr Iushmanov created FLINK-32069:
---

 Summary: jobClient.getJobStatus() can return status RUNNING for 
finished insert operation
 Key: FLINK-32069
 URL: https://issues.apache.org/jira/browse/FLINK-32069
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.4, 1.16.1
Reporter: Aleksandr Iushmanov


Using Zeppelin with a remote cluster, I came across a race condition that 
leads to failed expectations for SQL insert operations. 
 
Below is an example of Zeppelin code that fails because 
jobClient.getJobStatus() returns RUNNING even after the job has finished. I have 
verified that the same failure can happen if I use 
jobClient.getJobExecutionResult().get() (the job execution result is "Program 
execution finished", but the job status is not consistently FINISHED).
{code:java}
TableResult tableResult =
    ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException(
        "Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}
 
[https://github.com/apache/flink/pull/18189|https://github.com/apache/flink/pull/18189].
ZeppelinCode: 
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]

I suspect that the job status is returned based on runningJobsRegistry, and that 
since 1.15 this registry is not updated with the FINISHED status before the job 
result future completes; see this change in {{JobMasterServiceLeadershipRunner.java}}: 
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
 
It looks like a race condition that is hard to reproduce on a lightweight setup. 
I reproduced it by running a Zeppelin notebook against a remote Flink cluster and 
triggering an SQL insert operation. If I find a smaller setup that reproduces it on 
a small local cluster with a lightweight client, I will update this ticket. 
I am open to suggestions on how to fix this. 
 
For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
fixed. If I understand it correctly, however, this issue is common to all 
versions starting with 1.15, so it makes sense to address it starting with 1.16. 
https://issues.apache.org/jira/browse/ZEPPELIN-5909
 
[~mapohl], thank you for your assistance in Slack. I have created this ticket to 
back up our conversation; could you please add your thoughts on this failure mode?
 
One possible solution would be to add a check for the presence of a 
JobResult in the result store before returning the job status (if there is a 
result, the job shouldn't be reported as running, based on this documentation: 
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--).
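
The check proposed above could look roughly like the following. This is only a sketch under stated assumptions: `Status`, `StoredResult`, and `resolveStatus` are hypothetical stand-ins and not Flink classes, and how the real runtime would look up the stored result is not shown.

```java
import java.util.Optional;

/** Hypothetical sketch of the proposed check; none of these types are Flink classes. */
public class JobStatusSketch {

    enum Status { RUNNING, FINISHED, FAILED }

    /** Stand-in for a stored job result; it only carries the terminal status. */
    static final class StoredResult {
        final Status terminalStatus;

        StoredResult(Status terminalStatus) {
            this.terminalStatus = terminalStatus;
        }
    }

    /**
     * Prefers the stored result, if present, over the registry status, so that a job
     * whose result is already available is never reported as RUNNING.
     */
    static Status resolveStatus(Status registryStatus, Optional<StoredResult> storedResult) {
        return storedResult.map(result -> result.terminalStatus).orElse(registryStatus);
    }

    public static void main(String[] args) {
        // The race described in this ticket: a result is stored, but the registry still says RUNNING.
        System.out.println(
                resolveStatus(Status.RUNNING, Optional.of(new StoredResult(Status.FINISHED))));
        // No result yet: the registry status is returned unchanged.
        System.out.println(resolveStatus(Status.RUNNING, Optional.empty()));
    }
}
```

With this ordering, a caller that checks the status right after `tableResult.await()` returns would see the terminal status as soon as the result exists.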





[GitHub] [flink-web] RyanSkraba opened a new pull request, #648: [hotfix] Minor grammar fixes

2023-05-12 Thread via GitHub


RyanSkraba opened a new pull request, #648:
URL: https://github.com/apache/flink-web/pull/648

   I've batched together some observations that I've noted on the website, 
mostly grammar fixes and polish.





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #595: [backport][FLINK-32002] Autoscaler default config improvements

2023-05-12 Thread via GitHub


gyfora merged PR #595:
URL: https://github.com/apache/flink-kubernetes-operator/pull/595





[jira] [Commented] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-12 Thread Usamah Jassat (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722198#comment-17722198
 ] 

Usamah Jassat commented on FLINK-32047:
---

Hey Gil, do you have an example FlinkDeployment that reproduces the issue? Which 
version of Flink do you see the issue with?

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main function of the Flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!





[GitHub] [flink-connector-opensearch] reswqa opened a new pull request, #21: [hotfix] Workaround new violations message

2023-05-12 Thread via GitHub


reswqa opened a new pull request, #21:
URL: https://github.com/apache/flink-connector-opensearch/pull/21

   apache/flink `1.18` changes the `archunit-violations` rule about 
`MiniCluster` use in `ITCase`. Because our workflow tests both `1.17` and 
`1.18`, this PR duplicates part of the exclusion to suppress the failure.





[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #595: [backport][FLINK-32002] Autoscaler default config improvements

2023-05-12 Thread via GitHub


gyfora opened a new pull request, #595:
URL: https://github.com/apache/flink-kubernetes-operator/pull/595

   (no comment)





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #594: [FLINK-32002] Autoscaler default config improvements

2023-05-12 Thread via GitHub


gyfora merged PR #594:
URL: https://github.com/apache/flink-kubernetes-operator/pull/594





[jira] [Closed] (FLINK-28203) Mark all bundled dependencies as optional

2023-05-12 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-28203.

Resolution: Fixed

master:
edf5666224f12e4b345ecafc6ae2fe5e3ecea393
49dcdf0fc2a5f113c0c1558b65df4ffa587ff72a
4fa80bf27b5501e87c9abcb2004fac8083c6d660

> Mark all bundled dependencies as optional
> -
>
> Key: FLINK-28203
> URL: https://issues.apache.org/jira/browse/FLINK-28203
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>






[jira] [Closed] (FLINK-28016) Support Maven 3.3+

2023-05-12 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-28016.

Resolution: Fixed

master: f0d01903aaa517af9c1be26b1244a778189dce65

> Support Maven 3.3+
> --
>
> Key: FLINK-28016
> URL: https://issues.apache.org/jira/browse/FLINK-28016
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.18.0
>
>
> We are currently de-facto limited to Maven 3.2.5 because our packaging relies 
> on the shade-plugin modifying the dependency tree at runtime when bundling 
> dependencies, which is no longer possible on Maven 3.3+.
> Being locked in to such an old Maven version isn't a good state to be in, and 
> the contributor experience suffers as well.
> I've been looking into removing this limitation by explicitly marking every 
> dependency that we bundle as {{optional}} in the poms, which really means 
> {{non-transitive}}. This ensures that everything being bundled by one 
> module is not visible to other modules. Some tooling to capture developer 
> mistakes was also written.
> Overall this is actually quite a nice change, as it makes things more 
> explicit and reduces inconsistencies (e.g., the dependency plugin results are 
> questionable if the shade-plugin didn't run!); and it already highlighted 
> several problems in Flink.
> This change will have no effect on users or the released poms, because the 
> dependency-reduced poms will be generated as before and remove all modified 
> dependencies.





[GitHub] [flink] zentol merged pull request #21349: [FLINK-28203] Support Maven 3.3+

2023-05-12 Thread via GitHub


zentol merged PR #21349:
URL: https://github.com/apache/flink/pull/21349





[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #594: [FLINK-32002] Autoscaler default config improvements

2023-05-12 Thread via GitHub


gyfora opened a new pull request, #594:
URL: https://github.com/apache/flink-kubernetes-operator/pull/594

   Some further minor improvements to the autoscaler default config: 
   
- Turn off ineffective scaling detection by default as it proved to be a 
little unstable
- Set a large (but not too large) number for max scale up factor to improve 
the doc





[jira] [Commented] (FLINK-31706) The default source parallelism should be the same as execution's default parallelism under adaptive batch scheduler

2023-05-12 Thread clownxc (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722171#comment-17722171
 ] 

clownxc commented on FLINK-31706:
-

(y)(y):D Thank you for your reply, I see [~wanglijie] 

> The default source parallelism should be the same as execution's default 
> parallelism under adaptive batch scheduler
> ---
>
> Key: FLINK-31706
> URL: https://issues.apache.org/jira/browse/FLINK-31706
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, the sources need to set 
> {{execution.batch.adaptive.auto-parallelism.default-source-parallelism}} in 
> the adaptive batch scheduler mode; otherwise, the source parallelism is only 
> 1 by default. A better solution might be to use the default execution 
> parallelism if the user has not configured one.





[jira] [Closed] (FLINK-32067) When no pod template configured, an invalid null pod template is configured

2023-05-12 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-32067.
--
Fix Version/s: kubernetes-operator-1.6.0
   Resolution: Fixed

main: af8aabf48928804619cfdb6874700b2bf2250a87
release-1.5: f5be73b9e8861fe7134c224d68eabcd10a9ebfd3

> When no pod template configured, an invalid null pod template is configured 
> 
>
> Key: FLINK-32067
> URL: https://issues.apache.org/jira/browse/FLINK-32067
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0, kubernetes-operator-1.6.0
>
>
> https://issues.apache.org/jira/browse/FLINK-30609 introduced a bug in the 
> podtemplate logic that breaks deployments when no podtemplates are configured.
> For example, the basic example doesn't work anymore. The reason is that an 
> invalid null object is set as podtemplate when nothing is configured.





[jira] [Updated] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-31963:
---
Affects Version/s: 1.15.4
   1.16.1
   1.18.0

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.16.1, 1.15.4, 1.18.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB (Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.





[jira] [Assigned] (FLINK-32068) flink-connector-jdbc support clickhouse

2023-05-12 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-32068:
--

Assignee: leishuiyu

>  flink-connector-jdbc support clickhouse
> 
>
> Key: FLINK-32068
> URL: https://issues.apache.org/jira/browse/FLINK-32068
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Reporter: leishuiyu
>Assignee: leishuiyu
>Priority: Minor
>
> Flink SQL support for ClickHouse:
>  * in batch scenarios, ClickHouse can be used as both source and sink
>  * in streaming scenarios, ClickHouse can be used as a sink





[jira] [Commented] (FLINK-32068) flink-connector-jdbc support clickhouse

2023-05-12 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722142#comment-17722142
 ] 

Martijn Visser commented on FLINK-32068:


[~leishuiyu] Sure. I've assigned the ticket to you. 

>  flink-connector-jdbc support clickhouse
> 
>
> Key: FLINK-32068
> URL: https://issues.apache.org/jira/browse/FLINK-32068
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Reporter: leishuiyu
>Priority: Minor
>
> Flink SQL support for ClickHouse:
>  * in batch scenarios, ClickHouse can be used as both source and sink
>  * in streaming scenarios, ClickHouse can be used as a sink





[jira] [Closed] (FLINK-31717) Unit tests running with local kube config

2023-05-12 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-31717.
--
Resolution: Fixed

merged to main e399c9bbd37d2dca352d973de01be0cb403fec55

> Unit tests running with local kube config
> -
>
> Key: FLINK-31717
> URL: https://issues.apache.org/jira/browse/FLINK-31717
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Matyas Orhidi
>Assignee: Mate Czagany
>Priority: Critical
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.6.0
>
>
> Some unit tests are using local kube environment. This can be dangerous when 
> pointing to sensitive clusters e.g. in prod.
> {quote}2023-04-03 12:32:53,956 i.f.k.c.Config [DEBUG] Found 
> for Kubernetes config at: [/Users//.kube/config].
> {quote}
> A misconfigured kube config environment revealed the issue:
> {quote}[ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time 
> elapsed: 0.012 s <<< FAILURE! - in 
> org.apache.flink.kubernetes.operator.FlinkOperatorTest
> [ERROR] 
> org.apache.flink.kubernetes.operator.FlinkOperatorTest.testConfigurationPassedToJOSDK
>   Time elapsed: 0.008 s  <<< ERROR!
> java.lang.NullPointerException
>   at 
> org.apache.flink.kubernetes.operator.FlinkOperatorTest.testConfigurationPassedToJOSDK(FlinkOperatorTest.java:63)
> [ERROR] 
> org.apache.flink.kubernetes.operator.FlinkOperatorTest.testLeaderElectionConfig
>   Time elapsed: 0.004 s  <<< ERROR!
> java.lang.NullPointerException
>   at 
> org.apache.flink.kubernetes.operator.FlinkOperatorTest.testLeaderElectionConfig(FlinkOperatorTest.java:108){quote}





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #587: [FLINK-31717] Fix unit tests using local kube config

2023-05-12 Thread via GitHub


gyfora merged PR #587:
URL: https://github.com/apache/flink-kubernetes-operator/pull/587





[jira] [Comment Edited] (FLINK-32065) Got NoSuchFileException when initialize source function.

2023-05-12 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722121#comment-17722121
 ] 

Spongebob edited comment on FLINK-32065 at 5/12/23 11:15 AM:
-

Hi [~Thesharing], is this likely caused by starting the standalone cluster 
incorrectly, such as starting the cluster a second time before it has been stopped?


was (Author: spongebobz):
Hi [~Thesharing], is this likely caused by starting the standalone cluster 
incorrectly, such as starting the cluster a second time before stopping it?

> Got NoSuchFileException when initialize source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the tmp channel file, but 
> I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> is this directory only used for that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!





[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.

2023-05-12 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722121#comment-17722121
 ] 

Spongebob commented on FLINK-32065:
---

Hi [~Thesharing], is this likely caused by starting the standalone cluster 
incorrectly, such as starting the cluster a second time before stopping it?

> Got NoSuchFileException when initialize source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the tmp channel file, but 
> I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> is this directory only used for that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!





[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-12 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722120#comment-17722120
 ] 

Stefan Richter commented on FLINK-31963:


Seems that this is similar to the problem described in FLINK-27031.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.





[GitHub] [flink-web] echauchot commented on pull request #643: Add blog article: Howto test a batch source with the new Source framework

2023-05-12 Thread via GitHub


echauchot commented on PR #643:
URL: https://github.com/apache/flink-web/pull/643#issuecomment-1545554335

   @zentol Thanks for your review (once again). I addressed all your comments 
PTAL





[GitHub] [flink-web] echauchot commented on pull request #643: Add blog article: Howto test a batch source with the new Source framework

2023-05-12 Thread via GitHub


echauchot commented on PR #643:
URL: https://github.com/apache/flink-web/pull/643#issuecomment-1545544022

   > > I took a quick look over it. I feel especially for the IT case it would 
be nice to first provide an overview over the different parts that make up a 
test "There's a test framework; We need a backend the source can read from, a 
test context for Flink's testing framework to interact with the backend, ..." 
and then drill down into the details as you already did.
   > > Right now it's very "I have to do this and this and this" but you don't 
really know where you're heading.
   > 
   > Totally agree, I'll do something similar to what I did on the source 
creation blog: an architecture paragraph
   
   done





[jira] [Commented] (FLINK-32068) flink-connector-jdbc support clickhouse

2023-05-12 Thread leishuiyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722107#comment-17722107
 ] 

leishuiyu commented on FLINK-32068:
---

Could I implement this feature and open a PR for it?

>  flink-connector-jdbc support clickhouse
> 
>
> Key: FLINK-32068
> URL: https://issues.apache.org/jira/browse/FLINK-32068
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Reporter: leishuiyu
>Priority: Minor
>
> Flink SQL support for ClickHouse:
>  * in batch scenarios, ClickHouse can be used as both source and sink
>  * in streaming scenarios, ClickHouse can be used as a sink





[jira] [Created] (FLINK-32068) flink-connector-jdbc support clickhouse

2023-05-12 Thread leishuiyu (Jira)
leishuiyu created FLINK-32068:
-

 Summary:  flink-connector-jdbc support clickhouse
 Key: FLINK-32068
 URL: https://issues.apache.org/jira/browse/FLINK-32068
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / JDBC
Reporter: leishuiyu


Flink SQL support for ClickHouse:
 * in batch scenarios, ClickHouse can be used as both source and sink
 * in streaming scenarios, ClickHouse can be used as a sink





[jira] [Commented] (FLINK-31966) Flink Kubernetes operator lacks TLS support

2023-05-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722104#comment-17722104
 ] 

Gyula Fora commented on FLINK-31966:


[~adrianalexvasiliu] new configuration changes do not usually require a CRD 
change. It could be just an additional value set in the flinkConfiguration.

If CRD changes are required (such as adding new fields) we can always make 
those as long as they are backward compatible. There is no schedule for that :) 

> Flink Kubernetes operator lacks TLS support 
> 
>
> Key: FLINK-31966
> URL: https://issues.apache.org/jira/browse/FLINK-31966
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Adrian Vasiliu
>Priority: Major
>
> *Summary*
> The Flink Kubernetes operator lacks support inside the FlinkDeployment 
> operand for configuring Flink with TLS (both one-way and mutual) for the 
> internal communication between jobmanagers and taskmanagers, and for the 
> external REST endpoint. Although a workaround exists to configure the job and 
> task managers, this breaks the operator and renders it unable to reconcile.
> *Additional information*
>  * The Apache Flink operator supports passing through custom flink 
> configuration to be applied to job and task managers.
>  * If you supply SSL-based properties, the operator can no longer speak to 
> the deployed job manager. The operator is reading the flink conf and using it 
> to create a connection to the job manager REST endpoint, but it uses the 
> truststore file paths within flink-conf.yaml, which are unresolvable from the 
> operator. This leaves the operator hanging in a pending state as it cannot 
> complete a reconcile.
> *Proposal*
> Our proposal is to make changes to the operator code. A simple change exists 
> that would be enough to enable anonymous SSL at the REST endpoint, but more 
> invasive changes would be required to enable full mTLS throughout.
> The simple change to enable anonymous SSL would be for the operator to parse 
> flink-conf and podTemplate to identify the Kubernetes resource that contains 
> the certificate from the job manager keystore and use it inside the 
> operator’s trust store.
> In the case of mutual TLS, further changes are required: the operator would 
> need to generate a certificate signed by the same issuing authority as the 
> job manager’s certificates and then use it in a keystore when challenged by 
> that job manager. We propose that the operator becomes responsible for making 
> CertificateSigningRequests to generate certificates for job manager, task 
> manager and operator. The operator can then coordinate deploying the job and 
> task managers with the correct flink-conf and volume mounts. This would also 
> work for anonymous SSL.





[GitHub] [flink] godfreyhe commented on a diff in pull request #22492: [FLINK-31950][table-planner] Introduce StateMetadata and StateMetadataJson SerDe

2023-05-12 Thread via GitHub


godfreyhe commented on code in PR #22492:
URL: https://github.com/apache/flink/pull/22492#discussion_r1192099873


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java:
##
@@ -55,7 +55,7 @@ private ExecNodeConfig(
 this.isCompiled = isCompiled;
 }
 
-static ExecNodeConfig of(
+public static ExecNodeConfig of(

Review Comment:
   Is the access level changed to public so that this method can be used in 
tests? If so, please add `@VisibleForTesting` to the method.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataJsonSerializer.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.StateMetadata.FIELD_NAME_STATE_INDEX;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.StateMetadata.FIELD_NAME_STATE_NAME;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.StateMetadata.FIELD_NAME_STATE_TTL;
+
+/**
+ * JSON serializer for {@link StateMetadata}.
+ *
+ * @see StateMetadataJsonDeserializer for the reverse operation.
+ */
+@Internal
+public class StateMetadataJsonSerializer extends StdSerializer<StateMetadata> {
+
+private static final long serialVersionUID = 1L;
+
+StateMetadataJsonSerializer() {
+super(StateMetadata.class);
+}
+
+@Override
+public void serialize(
+StateMetadata stateMetadata,
+JsonGenerator jsonGenerator,
+SerializerProvider serializerProvider)
+throws IOException {
+jsonGenerator.writeStartObject();
+jsonGenerator.writeNumberField(FIELD_NAME_STATE_INDEX, 
stateMetadata.getStateIndex());
+jsonGenerator.writeStringField(FIELD_NAME_STATE_TTL, 
stateMetadata.getStateTtl() + " ms");

Review Comment:
   How about using the `Duration` type so that we can avoid 
StateMetadataJsonSerializer/StateMetadataJsonDeserializer?
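
   To illustrate the suggestion: `java.time.Duration` already has a canonical 
text form, so a TTL held as a `Duration` would not need a hand-written " ms" 
suffix. The sketch below uses only the JDK. The class name `TtlDurationSketch` 
and the sample values are made up for illustration, and whether the planner's 
Jackson setup would serialize `Duration` in this ISO-8601 form is an assumption 
that would need checking.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Flink: shows the JDK round trip for a TTL value. */
final class TtlDurationSketch {

    /** Renders a TTL given in milliseconds in ISO-8601 form, e.g. one hour becomes "PT1H". */
    static String format(long ttlMillis) {
        return Duration.ofMillis(ttlMillis).toString();
    }

    /** Parses the ISO-8601 form back into milliseconds. */
    static long parseMillis(String text) {
        return Duration.parse(text).toMillis();
    }

    public static void main(String[] args) {
        String text = format(3_600_000L);
        System.out.println(text + " -> " + parseMillis(text) + " ms");
    }
}
```

   The point is only that formatting and parsing come for free with the JDK 
type; the field names and JSON layout of the PR are not modelled here.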



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java

[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.

2023-05-12 Thread Zhilong Hong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722100#comment-17722100
 ] 

Zhilong Hong commented on FLINK-32065:
--

It's hard to investigate the root cause with this limited context. Full logs 
would be helpful. {{NoSuchFileException}} is thrown during the creation of 
{{FileChannelBoundedData}}, which is a bit weird. Maybe the path was deleted 
during the initialization of the {{ResultPartitions}}.

You could try changing the location of the temp files: set the configuration 
option {{io.tmp.dirs}} to another valid path and see whether the issue goes away.
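
Before switching {{io.tmp.dirs}}, it may help to confirm that the candidate 
path exists and is writable by the user running the TaskManager. The following 
is a minimal JDK-only sketch. The class name {{TmpDirCheck}} and the idea of 
probing with a temporary file are illustrative assumptions, not something 
Flink does in this form.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper, not part of Flink: probes a candidate temp directory. */
final class TmpDirCheck {

    /** Returns true if dir is an existing directory in which a file can be created and deleted. */
    static boolean isUsable(Path dir) {
        if (!Files.isDirectory(dir)) {
            return false;
        }
        try {
            Path probe = Files.createTempFile(dir, "probe-", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException | SecurityException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Falls back to the JVM's default temp directory when no path is given.
        Path candidate =
                Paths.get(args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir"));
        System.out.println(candidate + " usable: " + isUsable(candidate));
    }
}
```

A check like this only rules out a missing or read-only directory. It would 
not detect a directory that is deleted later while the job is starting.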

> Got NoSuchFileException when initialize source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to flink standalone cluster, I got a 
> NoSuchFileException. I think it was failed to create the tmp channel file but 
> I am confused about the reason relative to this case.
> I found that this sub-directory `flink-netty-shuffle-xxx` was not existed, so 
> is this diretory only working for that step of the application ?
> BTW, this issue happen coincidently.
> !image-2023-05-12-14-07-45-771.png!





[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework

2023-05-12 Thread via GitHub


echauchot commented on code in PR #643:
URL: https://github.com/apache/flink-web/pull/643#discussion_r1192171185


##
docs/content/posts/howto-test-batch-source.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Howto test a batch source with the new Source framework"
+date: "2023-04-14T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+
+---
+
+## Introduction
+
+The Flink community has
+designed [a new Source 
framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based
+on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+lately. This article is the
+continuation of
+the [howto create a batch source with the new Source framework 
article](https://flink.apache.org/2023/04/14/howto-create-batch-source/)
+. Now it is
+time to test the created source ! As the previous article, this one was built 
while implementing the
+[Flink batch 
source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html).
+
+## Unit testing the source
+
+### Testing the serializers
+
+[example Cassandra 
SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java)
+and 
[SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java)
+
+In the previous article, we
+created 
[serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers)
+for Split and SplitEnumeratorState. We should now test them in unit tests. As 
usual, to test serde
+we just create an object, serialize it using the serializer and then 
deserialize it using the same
+serializer and finally assert on the equality of the two objects. Thus, 
hashCode() and equals() need
+to be implemented for the serialized objects.
+
+### Other unit tests
+
+Of course, we also need to unit test low level processing such as query 
building for example or any
+processing that does not require a running backend.
+
+## Integration testing the source
+
+[example Cassandra SourceITCase
+](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java)
+
+For tests that require a running backend, Flink provides a JUnit5 source test 
framework. To use it
+we create an *ITCase named class that
+extends 
[SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html)
+. This test suite provides all
+the necessary tests already (single split, multiple splits, idle reader, 
etc...). It is targeted for
+batch and streaming sources, so for our batch source case here, the tests 
below need to be disabled
+as they are targeted for streaming sources. They can be disabled by overriding 
them in the ITCase
+and annotating them with @Disabled:
+
+* testSourceMetrics
+* testSavepoint
+* testScaleUp
+* testScaleDown
+* testTaskManagerFailure
+
+Of course we can add our own integration tests cases for example tests on 
limits, tests on low level
+splitting or any test that requires a running backend. But for most cases we 
only need to provide
+Flink test environment classes to configure the ITCase:
+
+### Flink runtime environment
+
+We add this annotated field to our ITCase and we're done
+
+`@TestEnv
+MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+`
+
+### Backend runtime environment
+
+[example Cassandra 
TestEnvironment](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java)
+
+We add this annotated field to our ITCase
+
+`@TestExternalSystem
+BackendTestEnvironment backendTestEnvironment = new BackendTestEnvironment();
+`
+
+BackendTestEnvironment
+implements 
[TestResource](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html)
+. This environment is scoped to the test suite so it is where we setup the 
backend runtime and
+shared resources (session, tablespace, etc...) by
+implementing 
[startup()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#startUp--)
+and 
[tearDown()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#tearDown--
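
The serializer round trip described under "Testing the serializers" in the 
quoted article can be sketched as follows. This is a minimal, JDK-only 
illustration: `DemoSplit`, `DemoSplitSerializer` and their fields are 
hypothetical and are not the Cassandra connector's classes. The serializer 
only mirrors the general shape of Flink's `SimpleVersionedSerializer` 
(serialize to `byte[]`, deserialize from a version plus `byte[]`) without 
depending on it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical split holding a range [start, end). Not the connector's real class. */
final class DemoSplit {
    final long start;
    final long end;

    DemoSplit(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // equals() and hashCode() are what make the round-trip assertion meaningful.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DemoSplit)) {
            return false;
        }
        DemoSplit other = (DemoSplit) o;
        return start == other.start && end == other.end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }
}

/** Hypothetical serializer writing the two bounds as consecutive longs. */
final class DemoSplitSerializer {
    static final int VERSION = 1;

    static byte[] serialize(DemoSplit split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(split.start);
            out.writeLong(split.end);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static DemoSplit deserialize(int version, byte[] data) {
        if (version != VERSION) {
            throw new IllegalArgumentException("Unknown version " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long start = in.readLong();
            long end = in.readLong();
            return new DemoSplit(start, end);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Round-trip check: serialize, deserialize with the same serializer, compare. */
final class DemoSplitSerializerCheck {
    public static void main(String[] args) {
        DemoSplit original = new DemoSplit(-42L, 1_000L);
        byte[] bytes = DemoSplitSerializer.serialize(original);
        DemoSplit copy = DemoSplitSerializer.deserialize(DemoSplitSerializer.VERSION, bytes);
        if (!original.equals(copy)) {
            throw new AssertionError("round trip changed the split");
        }
        System.out.println("round trip ok");
    }
}
```

In a real connector the same check would normally live in a JUnit test and 
use the connector's own split and serializer classes.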

[GitHub] [flink] gaborgsomogyi commented on pull request #22550: [FLINK-31609][yarn][test] Extend log whitelist for expected AMRM heartbeat interrupt

2023-05-12 Thread via GitHub


gaborgsomogyi commented on PR #22550:
URL: https://github.com/apache/flink/pull/22550#issuecomment-1545492583

   The change itself looks good. Could you please show which test failed and how 
(some log)?





[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework

2023-05-12 Thread via GitHub


echauchot commented on code in PR #643:
URL: https://github.com/apache/flink-web/pull/643#discussion_r1192167315


##
docs/content/posts/howto-test-batch-source.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Howto test a batch source with the new Source framework"
+date: "2023-04-14T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+
+---
+
+## Introduction
+
+The Flink community has
+designed [a new Source 
framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based
+on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+lately. This article is the
+continuation of
+the [howto create a batch source with the new Source framework 
article](https://flink.apache.org/2023/04/14/howto-create-batch-source/)
+. Now it is
+time to test the created source ! As the previous article, this one was built 
while implementing the
+[Flink batch 
source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html).
+
+## Unit testing the source
+
+### Testing the serializers
+
+[example Cassandra 
SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java)
+and 
[SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java)
+
+In the previous article, we
+created 
[serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers)
+for Split and SplitEnumeratorState. We should now test them in unit tests. As 
usual, to test serde
+we just create an object, serialize it using the serializer and then 
deserialize it using the same
+serializer and finally assert on the equality of the two objects. Thus, 
hascode() and equals() need
+to be implemented for the serialized objects.
+
+### Other unit tests
+
+Of course, we also need to unit test low level processing such as query 
building for example or any
+processing that does not require a running backend.
+
+## Integration testing the source
+
+[example Cassandra SourceITCase
+](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java)
+
+For tests that require a running backend, Flink provides a JUnit5 source test 
framework. To use it
+we create an *ITCase named class that
+extends 
[SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html)
+. This test suite provides all
+the necessary tests already (single split, multiple splits, idle reader, 
etc...). It is targeted for
+batch and streaming sources, so for our batch source case here, the tests 
below need to be disabled
+as they are targeted for streaming sources. They can be disabled by overriding 
them in the ITCase
+and annotating them with @Disabled:
+
+* testSourceMetrics
+* testSavepoint
+* testScaleUp
+* testScaleDown
+* testTaskManagerFailure
+
+Of course, we can add our own integration test cases, for example tests on 
limits, tests on low-level
+splitting or any test that requires a running backend. But in most cases we 
only need to provide
+Flink test environment classes to configure the ITCase:
+
+### Flink runtime environment
+
+We add this annotated field to our ITCase and we're done
+
+`@TestEnv
+MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+`
+
+### Backend runtime environment
+
+[example Cassandra 
TestEnvironment](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java)
+
+We add this annotated field to our ITCase
+
+`@TestExternalSystem
+BackendTestEnvironment backendTestEnvironment = new BackendTestEnvironment();
+`
+
+BackendTestEnvironment
+implements 
[TestResource](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html)
+. This environment is scoped to the test suite, so it is where we set up the 
backend runtime and
+shared resources (session, tablespace, etc...) by
+implementing 
[startUp()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#startUp--)
+and 
[tearDown()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#tearDown--

[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework

2023-05-12 Thread via GitHub


echauchot commented on code in PR #643:
URL: https://github.com/apache/flink-web/pull/643#discussion_r1192162301


##
docs/content/posts/howto-test-batch-source.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Howto test a batch source with the new Source framework"
+date: "2023-04-14T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+
+---
+
+## Introduction
+
+The Flink community has
+designed [a new Source 
framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based
+on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+lately. This article is the
+continuation of
+the [howto create a batch source with the new Source framework 
article](https://flink.apache.org/2023/04/14/howto-create-batch-source/)
+. Now it is
+time to test the created source! As with the previous article, this one was built 
while implementing the
+[Flink batch 
source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html).
+
+## Unit testing the source
+
+### Testing the serializers
+
+[example Cassandra 
SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java)
+and 
[SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java)
+
+In the previous article, we
+created 
[serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers)
+for Split and SplitEnumeratorState. We should now test them in unit tests. As 
usual, to test serde
+we just create an object, serialize it using the serializer and then 
deserialize it using the same
+serializer and finally assert on the equality of the two objects. Thus, 
hashCode() and equals() need
+to be implemented for the serialized objects.
+
+### Other unit tests
+
+Of course, we also need to unit test low level processing such as query 
building for example or any
+processing that does not require a running backend.
+
+## Integration testing the source
+
+[example Cassandra SourceITCase
+](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java)
+
+For tests that require a running backend, Flink provides a JUnit5 source test 
framework. To use it
+we create an *ITCase named class that
+extends 
[SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html)
+. This test suite provides all
+the necessary tests already (single split, multiple splits, idle reader, 
etc...). It is targeted for
+batch and streaming sources, so for our batch source case here, the tests 
below need to be disabled
+as they are targeted for streaming sources. They can be disabled by overriding 
them in the ITCase
+and annotating them with @Disabled:
+
+* testSourceMetrics
+* testSavepoint
+* testScaleUp
+* testScaleDown
+* testTaskManagerFailure
+
+Of course, we can add our own integration test cases, for example tests on 
limits, tests on low-level
+splitting or any test that requires a running backend. But in most cases we 
only need to provide
+Flink test environment classes to configure the ITCase:
+
+### Flink runtime environment
+
+We add this annotated field to our ITCase and we're done
+
+`@TestEnv
+MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+`
+
+### Backend runtime environment

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework

2023-05-12 Thread via GitHub


echauchot commented on code in PR #643:
URL: https://github.com/apache/flink-web/pull/643#discussion_r1192157171


##
docs/content/posts/howto-test-batch-source.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Howto test a batch source with the new Source framework"
+date: "2023-04-14T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+
+---
+
+## Introduction
+
+The Flink community has
+designed [a new Source 
framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based
+on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+lately. This article is the
+continuation of
+the [howto create a batch source with the new Source framework 
article](https://flink.apache.org/2023/04/14/howto-create-batch-source/)
+. Now it is
+time to test the created source! As with the previous article, this one was built 
while implementing the
+[Flink batch 
source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html).
+
+## Unit testing the source
+
+### Testing the serializers
+
+[example Cassandra 
SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java)
+and 
[SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java)
+
+In the previous article, we
+created 
[serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers)
+for Split and SplitEnumeratorState. We should now test them in unit tests. As 
usual, to test serde
+we just create an object, serialize it using the serializer and then 
deserialize it using the same
+serializer and finally assert on the equality of the two objects. Thus, 
hashCode() and equals() need
+to be implemented for the serialized objects.
+
+### Other unit tests
+
+Of course, we also need to unit test low level processing such as query 
building for example or any
+processing that does not require a running backend.
+
+## Integration testing the source
+
+[example Cassandra SourceITCase
+](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java)
+
+For tests that require a running backend, Flink provides a JUnit5 source test 
framework. To use it
+we create an *ITCase named class that
+extends 
[SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html)
+. This test suite provides all
+the necessary tests already (single split, multiple splits, idle reader, 
etc...). It is targeted for
+batch and streaming sources, so for our batch source case here, the tests 
below need to be disabled
+as they are targeted for streaming sources. They can be disabled by overriding 
them in the ITCase
+and annotating them with @Disabled:
+
+* testSourceMetrics
+* testSavepoint
+* testScaleUp
+* testScaleDown
+* testTaskManagerFailure
+
+Of course, we can add our own integration test cases, for example tests on 
limits, tests on low-level
+splitting or any test that requires a running backend. But in most cases we 
only need to provide
+Flink test environment classes to configure the ITCase:
+
+### Flink runtime environment
+
+We add this annotated field to our ITCase and we're done
+
+`@TestEnv
+MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+`
+
+### Backend runtime environment

Review Comment:
   ok fair enough, in that case I'll also remove "runtime" from the "Flink 
runtime environment" title for coherence.






[GitHub] [flink] zhuzhurk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce SchedulerBase per-task failure enrichment/labeling

2023-05-12 Thread via GitHub


zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1192155460


##
flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java:
##
@@ -1289,4 +1293,59 @@ public static <T> CompletableFuture<T> switchExecutor(
 },
 executor);
 }
+
+/**
+ * A serializable implementation of CompletableFuture.
+ *
+ * This class extends CompletableFuture and implements the Serializable 
interface to allow it
+ * to be serialized and deserialized. The result of the CompletableFuture 
is extracted and
+ * serialized when the object is written to a stream, and the result is 
set using the complete()
+ * method when the object is read from a stream.
+ *
+ * @param <T> the type of the result of the CompletableFuture
+ */
+public static class SerializableCompletableFuture<T> extends 
CompletableFuture<T>
+implements Serializable {
+private static final long serialVersionUID = 1L;
+private transient T result;
+
+public SerializableCompletableFuture(T value) {
+this.result = value;
+this.complete(value);
+}
+
+/**
+ * Writes this object to the given OutputStream. The result of the 
CompletableFuture is
+ * extracted and serialized along with the object.
+ *
+ * @param out the ObjectOutputStream to write to
+ * @throws IOException if an I/O error occurs
+ */
+private void writeObject(ObjectOutputStream out)
+throws IOException, ExecutionException, InterruptedException {
+out.defaultWriteObject();
+if (result == null) {
+result = this.get();

Review Comment:
   This `get()` may block the main thread. Maybe we do not need to 
introduce a `SerializableCompletableFuture`, but can instead modify `ErrorInfo` 
to achieve the goal, e.g.
   * introduce two fields to `ErrorInfo`: `transient 
CompletableFuture<Map<String, String>> labelsFuture` as well as a `Map<String, String> labels`
   * the `labels` will be set as soon as `labelsFuture` is completed
   * `ErrorInfo#getLabels()` returns a `Map<String, String>`, which can be 
empty if `labels` is null and `labelsFuture` is not completed
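
A minimal sketch of the suggestion above, under stated assumptions: `ErrorInfoSketch` and `copyViaSerialization` are hypothetical names used only for illustration, the label type is assumed to be `Map<String, String>`, and none of this is the actual Flink `ErrorInfo` code. It shows the transient future, the labels being copied on completion, and a non-blocking `getLabels()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ErrorInfo; only the label handling is modelled. */
class ErrorInfoSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Transient: the future itself is never written to the stream.
    private final transient CompletableFuture<Map<String, String>> labelsFuture;

    // Filled in by the completion callback, which may run on another thread.
    private volatile Map<String, String> labels;

    ErrorInfoSketch(CompletableFuture<Map<String, String>> labelsFuture) {
        this.labelsFuture = labelsFuture;
        // Copy the result once the future completes normally; nothing here blocks.
        labelsFuture.thenAccept(
                result -> {
                    if (result != null) {
                        labels = new HashMap<>(result);
                    }
                });
    }

    /** Returns the labels if they are already known, otherwise an empty map. */
    Map<String, String> getLabels() {
        Map<String, String> current = labels;
        return current == null ? Collections.<String, String>emptyMap() : current;
    }

    /** Java-serializes and deserializes this object, wrapping checked exceptions. */
    ErrorInfoSketch copyViaSerialization() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(this);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ErrorInfoSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this shape, serializing before the future completes simply carries no labels instead of waiting on `get()`. The trade-off is that a copy serialized early never sees labels that arrive later.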






[jira] [Updated] (FLINK-32067) When no pod template configured, an invalid null pod template is configured

2023-05-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32067:
---
Labels: pull-request-available  (was: )

> When no pod template configured, an invalid null pod template is configured 
> 
>
> Key: FLINK-32067
> URL: https://issues.apache.org/jira/browse/FLINK-32067
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0
>
>
> https://issues.apache.org/jira/browse/FLINK-30609 introduced a bug in the 
> podtemplate logic that breaks deployments when no podtemplates are configured.
> For example, the basic example doesn't work anymore. The reason is that an 
> invalid null object is set as podtemplate when nothing is configured.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #592: [FLINK-32067] Do not configure invalid null podTemplate

2023-05-12 Thread via GitHub


gyfora opened a new pull request, #592:
URL: https://github.com/apache/flink-kubernetes-operator/pull/592

   ## What is the purpose of the change
   
   Recent changes broke the podTemplate configuration logic when no pod 
template or ephemeral storage is configured, resulting in an invalid null object 
being serialized and set.
   
   ## Brief change log
   
- Fix invalid logic
- Add unit test
   
   ## Verifying this change
   
   New unit test case has been added.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.

2023-05-12 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722094#comment-17722094
 ] 

Spongebob commented on FLINK-32065:
---

Hi [~Thesharing], maybe I can provide these logs for you.

!image-2023-05-12-17-37-09-002.png!

> Got NoSuchFileException when initialize source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the tmp channel file, but 
> I am confused about the reason in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> is this directory only used for that step of the application?
> BTW, this issue happens occasionally.
> !image-2023-05-12-14-07-45-771.png!





[jira] [Updated] (FLINK-32065) Got NoSuchFileException when initialize source function.

2023-05-12 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-32065:
--
Attachment: image-2023-05-12-17-37-09-002.png

> Got NoSuchFileException when initialize source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, 
> image-2023-05-12-14-26-46-268.png, image-2023-05-12-17-37-09-002.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the tmp channel file, but 
> I am confused about the reason in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> is this directory only used for that step of the application?
> BTW, this issue happens occasionally.
> !image-2023-05-12-14-07-45-771.png!





[jira] [Created] (FLINK-32067) When no pod template configured, an invalid null pod template is configured

2023-05-12 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-32067:
--

 Summary: When no pod template configured, an invalid null pod 
template is configured 
 Key: FLINK-32067
 URL: https://issues.apache.org/jira/browse/FLINK-32067
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Gyula Fora
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.5.0


https://issues.apache.org/jira/browse/FLINK-30609 introduced a bug in the 
podtemplate logic that breaks deployments when no podtemplates are configured.

For example, the basic example doesn't work anymore. The reason is that an 
invalid null object is set as podtemplate when nothing is configured.





[GitHub] [flink-connector-elasticsearch] complone commented on pull request #62: [FLINK-25568] Support Elasticsearch Source Connector

2023-05-12 Thread via GitHub


complone commented on PR #62:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/62#issuecomment-1545440640

   > Support for FilterPushDown, ProjectPushDown, and LimitPushDown to optimize 
query performance. Have these functions been implemented? I don't seem to have 
seen this part of the code
   
   Hello, I am very happy to read your comments. This PR is currently in 
draft status. Regarding the three push-downs you mentioned: as shown in 
FLIP-127, I am going to implement FilterPushDown, ProjectPushDown, and 
LimitPushDown and optimize the ES query expressions. Currently, only the 
ProjectPushDown interface is implemented; please refer to 
Elasticsearch7DynamicSource#applyProjection





[jira] [Commented] (FLINK-32063) AWS CI mvn compile fails to cast objects to parent type.

2023-05-12 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722072#comment-17722072
 ] 

Ahmed Hamdy commented on FLINK-32063:
-

So apparently the CI patches the changes to main before running tests.
The issue was due to an un-rebased change from main.
Even though this is an unconventional way of running CI, this is not an issue.
Please close as "not an issue".

> AWS CI mvn compile fails to cast objects to parent type.
> 
>
> Key: FLINK-32063
> URL: https://issues.apache.org/jira/browse/FLINK-32063
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Tests
>Reporter: Ahmed Hamdy
>Priority: Minor
>  Labels: test-stability
>
> h2. Description
> AWS Connectors CI fails to cast {{TestSinkInitContext}} into base type 
> {{InitContext}},
> - Failure
> https://github.com/apache/flink-connector-aws/actions/runs/4924790308/jobs/8841458606?pr=70
>  





[jira] [Updated] (FLINK-32060) Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32060:
---
Labels: pull-request-available  (was: )

> Migrate subclasses of BatchAbstractTestBase in table and other modules to 
> JUnit5
> 
>
> Key: FLINK-32060
> URL: https://issues.apache.org/jira/browse/FLINK-32060
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
> Environment: Migrate subclasses of BatchAbstractTestBase in table and 
> other modules to JUnit5.
>Reporter: Yuxin Tan
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink-web] echauchot commented on a diff in pull request #643: Add blog article: Howto test a batch source with the new Source framework

2023-05-12 Thread via GitHub


echauchot commented on code in PR #643:
URL: https://github.com/apache/flink-web/pull/643#discussion_r1192083594


##
docs/content/posts/howto-test-batch-source.md:
##
@@ -0,0 +1,202 @@
+---
+title:  "Howto test a batch source with the new Source framework"
+date: "2023-04-14T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+
+---
+
+## Introduction
+
+The Flink community has
+designed [a new Source 
framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based
+on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+lately. This article is the
+continuation of
+the [howto create a batch source with the new Source framework 
article](https://flink.apache.org/2023/04/14/howto-create-batch-source/)
+. Now it is
+time to test the created source! As with the previous article, this one was built 
while implementing the
+[Flink batch 
source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html).
+
+## Unit testing the source
+
+### Testing the serializers
+
+[example Cassandra 
SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java)
+and 
[SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java)
+
+In the previous article, we
+created 
[serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers)
+for Split and SplitEnumeratorState. We should now test them in unit tests. As 
usual, to test serde
+we just create an object, serialize it using the serializer and then 
deserialize it using the same
+serializer and finally assert on the equality of the two objects. Thus, 
hashCode() and equals() need
+to be implemented for the serialized objects.
+
+### Other unit tests
+
+Of course, we also need to unit test low level processing such as query 
building for example or any
+processing that does not require a running backend.
+
+## Integration testing the source
+
+[example Cassandra SourceITCase
+](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java)
+
+For tests that require a running backend, Flink provides a JUnit5 source test 
framework. To use it
+we create an *ITCase named class that
+extends 
[SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html)
+. This test suite provides all
+the necessary tests already (single split, multiple splits, idle reader, 
etc...). It is targeted for
+batch and streaming sources, so for our batch source case here, the tests 
below need to be disabled
+as they are targeted for streaming sources. They can be disabled by overriding 
them in the ITCase
+and annotating them with @Disabled:
+
+* testSourceMetrics
+* testSavepoint
+* testScaleUp
+* testScaleDown
+* testTaskManagerFailure
+
+Of course, we can add our own integration test cases, for example tests on 
limits, tests on low-level
+splitting or any test that requires a running backend. But in most cases we 
only need to provide
+Flink test environment classes to configure the ITCase:
+
+### Flink runtime environment
+
+We add this annotated field to our ITCase and we're done
+
+`@TestEnv
+MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+`
+
+### Backend runtime environment

Review Comment:
   done






[GitHub] [flink] reswqa opened a new pull request, #22574: [FLINK-32060][test] Migrate subclasses of BatchAbstractTestBase in table and other modules to JUnit5

2023-05-12 Thread via GitHub


reswqa opened a new pull request, #22574:
URL: https://github.com/apache/flink/pull/22574

   ## What is the purpose of the change
   
   *Migrate subclasses of BatchAbstractTestBase in table and other modules to 
JUnit5*
   
   
   ## Brief change log
   
 - *Migrate subclasses of BatchAbstractTestBase in table and other modules 
to JUnit5*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





[jira] [Commented] (FLINK-32065) Got NoSuchFileException when initialize source function.

2023-05-12 Thread Zhilong Hong (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722070#comment-17722070 ]

Zhilong Hong commented on FLINK-32065:
--

Hi [~SpongebobZ], would you please upload the full log of the TaskExecutor? It 
seems this issue happens during the initialization of 
BoundedBlockingPartitions. This folder is used by blocking shuffle.

> Got NoSuchFileException when initialize source function.
> 
>
> Key: FLINK-32065
> URL: https://issues.apache.org/jira/browse/FLINK-32065
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.14.4
> Reporter: Spongebob
> Priority: Major
> Attachments: image-2023-05-12-14-07-45-771.png, image-2023-05-12-14-26-46-268.png
>
>
> When I submit an application to a Flink standalone cluster, I get a 
> NoSuchFileException. I think it failed to create the tmp channel file, but 
> I am confused about the cause in this case.
> I found that the sub-directory `flink-netty-shuffle-xxx` did not exist, so 
> is this directory only used for that step of the application?
> BTW, this issue happens only occasionally.
> !image-2023-05-12-14-07-45-771.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

