[GitHub] [flink-table-store] tsreaper commented on pull request #267: [hotfix] Fix github workflow build-different-versions.yaml
tsreaper commented on PR #267: URL: https://github.com/apache/flink-table-store/pull/267#issuecomment-1216223634 Tests passed in https://github.com/tsreaper/flink-table-store/actions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28265) Inconsistency in Kubernetes HA service: broken state handle
[ https://issues.apache.org/jira/browse/FLINK-28265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-28265:
-----------------------------------
    Labels: pull-request-available  (was: )

> Inconsistency in Kubernetes HA service: broken state handle
> -----------------------------------------------------------
>
>                 Key: FLINK-28265
>                 URL: https://issues.apache.org/jira/browse/FLINK-28265
>             Project: Flink
>          Issue Type: Bug
>      Components: Runtime / Coordination
>    Affects Versions: 1.14.4
>            Reporter: Robert Metzger
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink_checkpoint_issue.txt
>
> I have a JobManager which at some point failed to acknowledge a checkpoint:
> {code}
> Error while processing AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the pending checkpoint 193393. Failure reason: Failure to finalize checkpoint.
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1255)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100)
> 	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
> 	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> 	at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: checkpointID-0193393 already exists in ConfigMap cm--jobmanager-leader
> 	at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.getKeyAlreadyExistException(KubernetesStateHandleStore.java:534)
> 	at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.lambda$addAndLock$0(KubernetesStateHandleStore.java:155)
> 	at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$attemptCheckAndUpdateConfigMap$11(Fabric8FlinkKubeClient.java:316)
> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> 	... 3 common frames omitted
> {code}
> After this, the JobManager creates subsequent checkpoints successfully.
> Upon failover, it tries to recover this checkpoint (0193393), but fails to do so because of:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 193393 from state handle under checkpointID-0193393. This indicates that the retrieved state handle is broken. Try cleaning the state handle store ...
> Caused by: java.io.FileNotFoundException: No such file or directory: s3://xxx/flink-ha/xxx/completedCheckpoint72e30229420c
> {code}
> I'm running Flink 1.14.4.
> Note: This issue was first discussed here: https://github.com/apache/flink/pull/15832#pullrequestreview-1005973050

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [flink] wangyang0918 opened a new pull request, #20590: [FLINK-28265][k8s] Do not discard state when the AlreadyExistException is caused by retries
wangyang0918 opened a new pull request, #20590: URL: https://github.com/apache/flink/pull/20590

## What is the purpose of the change

If something is temporarily wrong with the JobManager network, `Fabric8FlinkKubeClient#checkAndUpdateConfigMap` fails with a `KubernetesException` on the first attempt and is retried. However, the HTTP request may actually have been sent successfully and handled by the K8s APIServer, which means the entry was already added to the ConfigMap. In that case the retry fails with `AlreadyExistException`, and the state is then discarded. If the JobManager crashes at exactly this point, the following recovery attempts throw `FileNotFoundException: No such file or directory: s3://xxx/flink-ha/xxx/completedCheckpoint72e30229420c`, since the added ConfigMap entry is never cleaned up. By making the `AlreadyExistException` in `KubernetesStateHandleStore#addAndLock` be caused by a `PossibleInconsistentStateException`, we can avoid discarding the state.

## Brief change log

* Do not discard state when the `AlreadyExistException` is caused by retries

## Verifying this change

* Added a new unit test `testAddWithAlreadyExistExceptionCausedByRetriesShouldNotDiscardState`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
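The race described in this PR can be illustrated with a small, self-contained simulation. All class and method names below (`RetrySketch`, `checkAndUpdate`, `addAndLock`, ...) are hypothetical stand-ins, not Flink's actual classes; this is only a sketch of the idea that a retried write can collide with its own earlier, invisibly successful attempt:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (hypothetical names): a ConfigMap write succeeds server-side, the
 * response is lost, and the retry then observes "already exists".
 */
public class RetrySketch {

    /** Marker: the write may have gone through, so do NOT discard the state. */
    static class PossiblyInconsistentState extends RuntimeException {}

    static class AlreadyExistException extends RuntimeException {}

    /** Simulated transient network failure on the response path. */
    static class NetworkGlitch extends RuntimeException {}

    final Map<String, String> configMap = new HashMap<>();
    boolean dropFirstResponse = true;
    boolean stateDiscarded = false;

    /** The write reaches the "API server", but the first response is lost. */
    void checkAndUpdate(String key, String value) {
        if (configMap.containsKey(key)) {
            throw new AlreadyExistException();
        }
        configMap.put(key, value);       // applied server-side
        if (dropFirstResponse) {
            dropFirstResponse = false;
            throw new NetworkGlitch();   // ...but the caller never learns it
        }
    }

    /**
     * addAndLock with the fix: AlreadyExistException observed under retries is
     * surfaced as possibly-inconsistent state instead of discarding state.
     */
    void addAndLock(String key, String value) {
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                checkAndUpdate(key, value);
                return;                  // success
            } catch (NetworkGlitch e) {
                // transient error: fall through and retry
            } catch (AlreadyExistException e) {
                // Before the fix, the caller would set stateDiscarded = true,
                // leaving a ConfigMap entry pointing at deleted state.
                throw new PossiblyInconsistentState();
            }
        }
    }
}
```

The key point mirrors the fix: when "already exists" shows up under retries, the entry may well be our own earlier write, so the safe reaction is to report possibly-inconsistent state rather than delete the state handle.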
[jira] [Commented] (FLINK-28986) UNNEST function with nested fails to generate plan
[ https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580082#comment-17580082 ]

godfrey he commented on FLINK-28986:
------------------------------------
[~qingyue] Thanks for reporting this, assigned to you

> UNNEST function with nested fails to generate plan
> --------------------------------------------------
>
>                 Key: FLINK-28986
>                 URL: https://issues.apache.org/jira/browse/FLINK-28986
>             Project: Flink
>          Issue Type: Bug
>      Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: Jane Chan
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2022-08-16-14-36-07-061.png
>
> h3. How to reproduce
> Add the following case to TableEnvironmentITCase:
> {code:scala}
> @Test
> def debug(): Unit = {
>   tEnv.executeSql(
>     s"""
>        |CREATE TEMPORARY TABLE source_kafka_wip_his_all (
>        |  GUID varchar, OPERATION varchar, PRODUCTID varchar, LOTNO varchar,
>        |  SERIALNO varchar, QUERYSERIALNO varchar, SERIALNO1 varchar, SERIALNO2 varchar,
>        |  WIPORDERNO varchar, WIPORDERTYPE varchar, VIRTUALLOT varchar, PREOPERATION varchar,
>        |  NORMALPREOPERATION varchar, PROCESSID varchar, EQUIPMENT varchar, INBOUNDDATE varchar,
>        |  OUTBOUNDDATE varchar, REWORK varchar, REWORKPROCESSID varchar, CONTAINER varchar,
>        |  WIPCONTENTCLASSID varchar, STATUSCODE varchar, WIPSTATUS varchar, TESTPROCESSID varchar,
>        |  TESTORDERTYPE varchar, TESTORDER varchar, TEST varchar, SORTINGPROCESSID varchar,
>        |  SORTINGORDERTYPE varchar, SORTINGORDER varchar, SORTING varchar, MINO varchar,
>        |  GROUPCODE varchar, HIGHLOWGROUP varchar, PRODUCTNO varchar, FACILITY varchar,
>        |  WIPLINE varchar, CHILDEQUCODE varchar, STATION varchar, QTY varchar,
>        |  PASS_FLAG varchar, DEFECTCODELIST varchar, ISFIRST varchar,
>        |  PARALIST ARRAY<ROW(GUID string, OPERATION string, REWORKPROCESSID string,
>        |    CHARACTERISTIC string, CHARACTERISTICREVISION string, CHARACTERISTICTYPE string,
>        |    CHARACTERISTICCLASS string, UPPERCONTROLLIMIT string, TARGETVALUE string,
>        |    LOWERCONTROLLIMIT string, TESTVALUE string, TESTATTRIBUTE string,
>        |    TESTINGSTARTDATE string, TESTFINISHDATE string, UOMCODE string,
>        |    DEFECTCODE string, SPECPARAMID string, STATION string, GP_TIME string,
>        |    REFERENCEID string, LASTUPDATEON string, LASTUPDATEDBY string, CREATEDON string,
>        |    CREATEDBY string, ACTIVE string, LASTDELETEON string, LASTDELETEDBY string,
>        |    LASTREACTIVATEON string, LASTREACTIVATEDBY string, ARCHIVEID string,
>        |    LASTARCHIVEON string, LASTARCHIVEDBY string, LASTRESTOREON string,
>        |    LASTRESTOREDBY string, ROWVERSIONSTAMP string)>,
>        |  REFERENCEID varchar, LASTUPDATEON varchar, LASTUPDATEDBY varchar, CREATEDON varchar,
>        |  CREATEDBY varchar, ACTIVE varchar, LASTDELETEON varchar, LASTDELETEDBY varchar,
>        |  LASTREACTIVATEON varchar, LASTREACTIVATEDBY varchar, ARCHIVEID varchar,
>        |  LASTARCHIVEON varchar, LASTARCHIVEDBY varchar, LASTRESTOREON varchar,
>        |  LASTRESTOREDBY varchar, ROWVERSIONSTAMP varchar,
>        |  proctime as PROCTIME()
>        |) with (
>        |  'connector' = 'datagen'
>        |)
>        |""".stripMargin)
>   tEnv.executeSql(
>     s"""
>        |create TEMPORARY view transform_main_data as
>        |select
>        |  r.GUID as wip_his_guid,
>        |  r.EQUIPMENT as equipment,
>        |  r.WIPLINE as wipline,
>        |  r.STATION as station,
>        |  cast(r.PROCESSID as decimal) as processid,
>        |  r.PRODUCTNO as productno,
>        |  t.TESTFINISHDATE as testfinishdate,
>        |  t.OPERATION as operation,
>        |  t.CHARACTERISTIC as characteristic,
>        |  t.LOWERCONTROLLIMIT as lowercontrollimit,
>        |  t.UPPERCONTROLLIMIT as uppercontrollimit,
>        |  t.TARGETVALUE as targetvalue,
>        |  t.DEFECTCODE as defectcode,
>        |  t.TESTVALUE as testvalue,
>        |  t.CHARACTERISTICTYPE as characteristictype,
>        |  proctime
>        |from
>        |  (select GUID, EQUIPMENT, WIPLINE, STATION, PROCESSID, PRODUCTNO, PARALIST, proctime
>        |   FROM source_kafka_wip_his_all) r
>        |  cross join
[jira] [Assigned] (FLINK-28986) UNNEST function with nested fails to generate plan
[ https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he reassigned FLINK-28986:
----------------------------------
    Assignee: Jane Chan

> UNNEST function with nested fails to generate plan
> --------------------------------------------------
>
>                 Key: FLINK-28986
>                 URL: https://issues.apache.org/jira/browse/FLINK-28986
>             Project: Flink
>          Issue Type: Bug
>      Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: Jane Chan
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2022-08-16-14-36-07-061.png
[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-28981:
----------------------------
    Description:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and FLIP-249. More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
- Write Flink jobs which have some {{source}} subtasks running much slower than others. 3 kinds of sources should be verified:
-- [Source functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
-- [InputFormat sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
-- [FLIP-27 new sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.

> Release Testing: Verify FLIP-245 sources speculative execution
> --------------------------------------------------------------
>
>                 Key: FLINK-28981
>                 URL: https://issues.apache.org/jira/browse/FLINK-28981
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Connectors / Common, Runtime / Coordination
>            Reporter: Zhu Zhu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-28980:
----------------------------
    Labels: release-testing  (was: test-stability)

> Release Testing: Verify FLIP-168 speculative execution
> ------------------------------------------------------
>
>                 Key: FLINK-28980
>                 URL: https://issues.apache.org/jira/browse/FLINK-28980
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Runtime / Coordination
>            Reporter: Zhu Zhu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
> - FLIP-168: Speculative Execution core part
> - FLIP-224: Blocklist Mechanism
> - FLIP-245: Source Supports Speculative Execution
> - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249. More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
> - Write a Flink job which has one subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
> - Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
> - Submit the job. Check the web UI, logs, metrics and the produced result.
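The deterministic slow-subtask condition suggested in the ticket can be sketched as a tiny helper. The class and method names are hypothetical; in a real test this logic would live inside the user function, with the subtask index and attempt number taken from the runtime context:

```java
/**
 * Sketch for release testing: decide whether a given subtask attempt should
 * be artificially slow, using the (subtaskIndex + attemptNumber) % 2 == 0
 * condition suggested in the ticket. Hypothetical names, not Flink API.
 */
public class SlowSubtaskSketch {

    /** True if this attempt should sleep to simulate a slow node. */
    static boolean shouldBeSlow(int subtaskIndex, int attemptNumber) {
        return (subtaskIndex + attemptNumber) % 2 == 0;
    }

    /** Inside the user function, a slow attempt would simply block. */
    static void maybeSleep(int subtaskIndex, int attemptNumber) throws InterruptedException {
        if (shouldBeSlow(subtaskIndex, attemptNumber)) {
            Thread.sleep(10); // a real test would sleep far longer, or indefinitely
        }
    }
}
```

With this condition, attempt 0 of an even subtask is slow while its speculative attempt 1 is fast, so the speculative execution should win and the job should make progress.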
[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-28981:
----------------------------

> Release Testing: Verify FLIP-245 sources speculative execution
> --------------------------------------------------------------
>
>                 Key: FLINK-28981
>                 URL: https://issues.apache.org/jira/browse/FLINK-28981
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Connectors / Common, Runtime / Coordination
>            Reporter: Zhu Zhu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-28981:
----------------------------
    Labels: release-testing  (was: test-stability)

> Release Testing: Verify FLIP-245 sources speculative execution
> --------------------------------------------------------------
>
>                 Key: FLINK-28981
>                 URL: https://issues.apache.org/jira/browse/FLINK-28981
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Connectors / Common, Runtime / Coordination
>            Reporter: Zhu Zhu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
[jira] [Comment Edited] (FLINK-28986) UNNEST function with nested fails to generate plan
[ https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580080#comment-17580080 ]

Jane Chan edited comment on FLINK-28986 at 8/16/22 6:37 AM:
------------------------------------------------------------
The reason is that, during the decorrelation rewrite, the top Filter rel node was pushed down, forming a nested filter pattern. Since the filter merge rule is not defined in the default rewrite rule sets, the nested Filter rel nodes left LogicalUnnestRule unmatched. This can be fixed by adding CoreRules.FILTER_MERGE to FlinkStreamRuleSets. cc [~godfrey]

!image-2022-08-16-14-36-07-061.png|width=1047,height=177!

was (Author: qingyue):
The reason is that, during the decorrelation rewrite, the top Filter rel node was pushed down, forming a nested filter pattern. Since the filter merge rule is not defined in the default rewrite rule sets, the nested Filter rel nodes left LogicalUnnestRule unmatched. This can be fixed by adding CoreRules.FILTER_MERGE to FlinkStreamRuleSets.

!image-2022-08-16-14-36-07-061.png|width=1047,height=177!

> UNNEST function with nested fails to generate plan
> --------------------------------------------------
>
>                 Key: FLINK-28986
>                 URL: https://issues.apache.org/jira/browse/FLINK-28986
>             Project: Flink
>          Issue Type: Bug
>      Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: Jane Chan
>            Priority: Major
>         Attachments: image-2022-08-16-14-36-07-061.png
[jira] [Commented] (FLINK-28986) UNNEST function with nested fails to generate plan
[ https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580080#comment-17580080 ] Jane Chan commented on FLINK-28986: --- The reason is that during the decorrelation rewrite, the top filter rel node was pushed down to form a nested filter pattern. Since the filter merge rule is not defined in the default rewrite rule sets, the nested filter rel nodes rendered the LogicalUnnestRule unmatched. This can be fixed by adding CoreRules.FILTER_MERGE to FlinkStreamRuleSets. !image-2022-08-16-14-36-07-061.png|width=1047,height=177! > UNNEST function with nested fails to generate plan > -- > > Key: FLINK-28986 > URL: https://issues.apache.org/jira/browse/FLINK-28986 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: Jane Chan >Priority: Major > Attachments: image-2022-08-16-14-36-07-061.png > > > h3. How to reproduce > add the following case to TableEnvironmentITCase > {code:scala} > @Test > def debug(): Unit = { > tEnv.executeSql( > s""" >|CREATE TEMPORARY TABLE source_kafka_wip_his_all ( >| GUID varchar, >| OPERATION varchar, >| PRODUCTID varchar, >| LOTNO varchar, >| SERIALNO varchar, >| QUERYSERIALNO varchar, >| SERIALNO1 varchar, >| SERIALNO2 varchar, >| WIPORDERNO varchar, >| WIPORDERTYPE varchar, >| VIRTUALLOT varchar, >| PREOPERATION varchar, >| NORMALPREOPERATION varchar, >| PROCESSID varchar, >| EQUIPMENT varchar, >| INBOUNDDATE varchar, >| OUTBOUNDDATE varchar, >| REWORK varchar, >| REWORKPROCESSID varchar, >| CONTAINER varchar, >| WIPCONTENTCLASSID varchar, >| STATUSCODE varchar, >| WIPSTATUS varchar, >| TESTPROCESSID varchar, >| TESTORDERTYPE varchar, >| TESTORDER varchar, >| TEST varchar, >| SORTINGPROCESSID varchar, >| SORTINGORDERTYPE varchar, >| SORTINGORDER varchar, >| SORTING varchar, >| MINO varchar, >| GROUPCODE varchar, >| HIGHLOWGROUP varchar, >| PRODUCTNO varchar, >| FACILITY varchar, >| WIPLINE varchar, >| 
CHILDEQUCODE varchar, >| STATION varchar, >| QTY varchar, >| PASS_FLAG varchar, >| DEFECTCODELIST varchar, >| ISFIRST varchar, >| PARALIST ARRAY string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC > string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE > string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE > string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE > string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE > string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME > string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON > string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY > string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID > string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON > string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>, >| REFERENCEID varchar, >| LASTUPDATEON varchar, >| LASTUPDATEDBY varchar, >| CREATEDON varchar, >| CREATEDBY varchar, >| ACTIVE varchar, >| LASTDELETEON varchar, >| LASTDELETEDBY varchar, >| LASTREACTIVATEON varchar, >| LASTREACTIVATEDBY varchar, >| ARCHIVEID varchar, >| LASTARCHIVEON varchar, >| LASTARCHIVEDBY varchar, >| LASTRESTOREON varchar, >| LASTRESTOREDBY varchar, >| ROWVERSIONSTAMP varchar, >| proctime as PROCTIME() >| ) with ( >| 'connector' = 'datagen' >|) >|""".stripMargin) > tEnv.executeSql( > s""" >|create TEMPORARY view transform_main_data as >|select >| r.GUID as wip_his_guid, >| r.EQUIPMENT as equipment, >| r.WIPLINE as wipline, >| r.STATION as station, >| cast(r.PROCESSID as decimal) as processid, >| r.PRODUCTNO as productno, >| t.TESTFINISHDATE as testfinishdate, >| t.OPERATION as operation, >| t.CHARACTERISTIC as characteristic, >| t.LOWERCONTROLLIMIT as lowercontrollimit, >| t.UPPERCONTROLLIMIT as uppercontrollimit, >| t.TARGETVALUE as targetvalue, >| t.DEFECTCODE as defectcode, >| t.TESTVALUE as testvalue, >| t.CHARACTERISTICTYPE as characteristictype, >|
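Jane Chan's fix above relies on Calcite's CoreRules.FILTER_MERGE, which collapses a Filter on top of another Filter into a single Filter whose predicate is the conjunction of both, so that rules matching a single-Filter pattern (like LogicalUnnestRule) can fire again. A minimal Python sketch of that rewrite, using toy Scan/Filter classes rather than Flink or Calcite types:

```python
from dataclasses import dataclass

# Toy rel nodes; purely illustrative, not Calcite's RelNode hierarchy.
@dataclass
class Scan:
    table: str

@dataclass
class Filter:
    condition: str
    input: object

def merge_filters(node):
    """Collapse Filter(Filter(x)) into a single Filter with AND-ed predicates."""
    if isinstance(node, Filter):
        child = merge_filters(node.input)
        if isinstance(child, Filter):
            # The essence of FILTER_MERGE: one Filter node, conjoined condition.
            return Filter(f"({node.condition}) AND ({child.condition})", child.input)
        return Filter(node.condition, child)
    return node

# The nested pattern from the bug: a pushed-down filter under the top filter.
plan = Filter("operation NOT IN ('G1208')",
              Filter("CHARACTERISTICTYPE = '2'", Scan("source")))
merged = merge_filters(plan)
assert isinstance(merged.input, Scan)  # a single Filter directly over the scan
```

After merging, a rule that expects a single Filter above its target node sees one again, which is why adding CoreRules.FILTER_MERGE to FlinkStreamRuleSets unblocks the plan.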
[jira] [Updated] (FLINK-28986) UNNEST function with nested fails to generate plan
[ https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan updated FLINK-28986: -- Attachment: image-2022-08-16-14-36-07-061.png > UNNEST function with nested fails to generate plan > -- > > Key: FLINK-28986 > URL: https://issues.apache.org/jira/browse/FLINK-28986 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: Jane Chan >Priority: Major > Attachments: image-2022-08-16-14-36-07-061.png > >
[jira] [Commented] (FLINK-28912) Add Part of "Who Use Flink" In ReadMe file and https://flink.apache.org/
[ https://issues.apache.org/jira/browse/FLINK-28912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580077#comment-17580077 ] Zhou Yao commented on FLINK-28912: -- Great~, thanks > Add Part of "Who Use Flink" In ReadMe file and https://flink.apache.org/ > - > > Key: FLINK-28912 > URL: https://issues.apache.org/jira/browse/FLINK-28912 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Zhou Yao >Priority: Minor > Labels: doc > Attachments: image-2022-08-10-20-15-10-418.png > > > Maybe we can learn from the website of [Apache > Kylin|https://kylin.apache.org/] and add a "Who Uses Flink" section to the Readme or the > website. This can make Flink more friendly. > !image-2022-08-10-20-15-10-418.png|width=147,height=99! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28986) UNNEST function with nested fails to generate plan
[ https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan updated FLINK-28986: -- Description: h3. How to reproduce add the following case to TableEnvironmentITCase {code:scala} @Test def debug(): Unit = { tEnv.executeSql( s""" |CREATE TEMPORARY TABLE source_kafka_wip_his_all ( | GUID varchar, | OPERATION varchar, | PRODUCTID varchar, | LOTNO varchar, | SERIALNO varchar, | QUERYSERIALNO varchar, | SERIALNO1 varchar, | SERIALNO2 varchar, | WIPORDERNO varchar, | WIPORDERTYPE varchar, | VIRTUALLOT varchar, | PREOPERATION varchar, | NORMALPREOPERATION varchar, | PROCESSID varchar, | EQUIPMENT varchar, | INBOUNDDATE varchar, | OUTBOUNDDATE varchar, | REWORK varchar, | REWORKPROCESSID varchar, | CONTAINER varchar, | WIPCONTENTCLASSID varchar, | STATUSCODE varchar, | WIPSTATUS varchar, | TESTPROCESSID varchar, | TESTORDERTYPE varchar, | TESTORDER varchar, | TEST varchar, | SORTINGPROCESSID varchar, | SORTINGORDERTYPE varchar, | SORTINGORDER varchar, | SORTING varchar, | MINO varchar, | GROUPCODE varchar, | HIGHLOWGROUP varchar, | PRODUCTNO varchar, | FACILITY varchar, | WIPLINE varchar, | CHILDEQUCODE varchar, | STATION varchar, | QTY varchar, | PASS_FLAG varchar, | DEFECTCODELIST varchar, | ISFIRST varchar, | PARALIST ARRAY, | REFERENCEID varchar, | LASTUPDATEON varchar, | LASTUPDATEDBY varchar, | CREATEDON varchar, | CREATEDBY varchar, | ACTIVE varchar, | LASTDELETEON varchar, | LASTDELETEDBY varchar, | LASTREACTIVATEON varchar, | LASTREACTIVATEDBY varchar, | ARCHIVEID varchar, | LASTARCHIVEON varchar, | LASTARCHIVEDBY varchar, | LASTRESTOREON varchar, | LASTRESTOREDBY varchar, | ROWVERSIONSTAMP varchar, | proctime as PROCTIME() | ) with ( | 'connector' = 'datagen' |) |""".stripMargin) tEnv.executeSql( s""" |create TEMPORARY view transform_main_data as |select | r.GUID as wip_his_guid, | r.EQUIPMENT as equipment, | r.WIPLINE as wipline, | r.STATION as station, | cast(r.PROCESSID as decimal) as 
processid, | r.PRODUCTNO as productno, | t.TESTFINISHDATE as testfinishdate, | t.OPERATION as operation, | t.CHARACTERISTIC as characteristic, | t.LOWERCONTROLLIMIT as lowercontrollimit, | t.UPPERCONTROLLIMIT as uppercontrollimit, | t.TARGETVALUE as targetvalue, | t.DEFECTCODE as defectcode, | t.TESTVALUE as testvalue, | t.CHARACTERISTICTYPE as characteristictype, | proctime | from | (select | GUID, | EQUIPMENT, | WIPLINE, | STATION, | PROCESSID, | PRODUCTNO, | PARALIST, | proctime | FROM source_kafka_wip_his_all) r | cross join | unnest(PARALIST) as t (GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORKPROCESSID,CHARACTERISTIC,CHARACTERISTICREVISION,CHARACTERISTICTYPE,CHARACTERISTICCLASS,UPPERCONTROLLIMIT,TARGETVALUE,LOWERCONTROLLIMIT,TESTVALUE,TESTATTRIBUTE,TESTINGSTARTDATE,TESTFINISHDATE,UOMCODE,DEFECTCODE,SPECPARAMID,STATION,GP_TIME,REFERENCEID,LASTUPDATEON,LASTUPDATEDBY,CREATEDON,CREATEDBY,ACTIVE,LASTDELETEON,LASTDELETEDBY,LASTREACTIVATEON,LASTREACTIVATEDBY,ARCHIVEID,LASTARCHIVEON,LASTARCHIVEDBY,LASTRESTOREON,LASTRESTOREDBY,ROWVERSIONSTAMP) | where t.CHARACTERISTICTYPE = '2' |""".stripMargin) tEnv.executeSql( s""" |explain plan for |select * from transform_main_data |where operation not in ('G1208','G1209','G1211','G1213','G1206','G1207','G1214','G1215','G1282','G1292','G1216') |""".stripMargin).print() } {code} Stacktrace {code:java} org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: LogicalProject(inputs=[0..3], exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, $13, $19, $17, $18, $25, $20, $15, $7]]) +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{6}]) :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43, PROCTIME()]]) : +- LogicalTableScan(table=[[default_catalog, default_database, source_kafka_wip_his_all]]) +- LogicalFilter(condition=[AND(SEARCH($7, Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
SEARCH($3, Sarg[(-∞
[jira] [Assigned] (FLINK-28939) Release Testing: Verify FLIP-241 ANALYZE TABLE
[ https://issues.apache.org/jira/browse/FLINK-28939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-28939: -- Assignee: Yunhong Zheng > Release Testing: Verify FLIP-241 ANALYZE TABLE > -- > > Key: FLINK-28939 > URL: https://issues.apache.org/jira/browse/FLINK-28939 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: godfrey he >Assignee: Yunhong Zheng >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > This issue aims to verify FLIP-240: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386481 > We can verify it in the SQL client after we build the flink-dist package. > 1. create a partition table and a non-partition table (with/without compute > column/metadata column, with different columns), and then insert some data > 2. verify the different statements, please refer to the FLIP doc examples > 3. verify the result in the catalog. Currently, the {{describe extended}} statement > does not support showing the statistics in the catalog, so we should write some code to > get the statistics from the catalog, or we can use the Hive CLI if the catalog is a > Hive catalog > 4. verify the unsupported cases: > 4.1 analyze a non-existent table > 4.2 analyze a view > 4.3 analyze a partition table with a non-existent partition > 4.4 analyze a non-partition table with a partition > 4.5 analyze a non-existent column > 4.6 analyze a computed column > 4.7 analyze a metadata column -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28986) UNNEST function with nested fails to generate plan
Jane Chan created FLINK-28986: - Summary: UNNEST function with nested fails to generate plan Key: FLINK-28986 URL: https://issues.apache.org/jira/browse/FLINK-28986 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.0 Reporter: Jane Chan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] tsreaper opened a new pull request, #267: [hotfix] Fix github workflow build-different-versions.yaml
tsreaper opened a new pull request, #267: URL: https://github.com/apache/flink-table-store/pull/267 Fix syntax of `build-different-versions.yaml`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27199) Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.
[ https://issues.apache.org/jira/browse/FLINK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen updated FLINK-27199: -- Labels: pull-request-available (was: pull-request-available stale-assigned) > Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment. > -- > > Key: FLINK-27199 > URL: https://issues.apache.org/jira/browse/FLINK-27199 > Project: Flink > Issue Type: Improvement > Components: Connectors / Pulsar >Affects Versions: 1.14.4, 1.15.0, 1.16.0 >Reporter: Yufan Sheng >Assignee: Yufan Sheng >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.2 > > > Pulsar's transaction support is not stable. The standalone cluster often hangs the > tests, so we eventually hit a timeout. > The latest Pulsar 2.10.0 drops ZooKeeper and fixes a lot of issues in > Pulsar transactions. Bumping to this version would resolve the current test > issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails
[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-28927: Assignee: ChangjiGuo > Can not clean up the uploaded shared files when the checkpoint fails > > > Key: FLINK-28927 > URL: https://issues.apache.org/jira/browse/FLINK-28927 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.11.6, 1.15.1 > Environment: Flink-1.11 >Reporter: ChangjiGuo >Assignee: ChangjiGuo >Priority: Major > > If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable > thread and do some cleanup work, including the following: > * Cancel all AsyncSnapshotCallable threads. > * If a thread has finished, it will clean up all state objects. > * If a thread has not completed, it may be interrupted. > * Close the snapshotCloseableRegistry. > In my case, the thread was interrupted while waiting for the file upload to > complete, but the files were not cleaned up. > RocksDBStateUploader.java > {code:java} > FutureUtils.waitForAll(futures.values()).get(); > {code} > This waits for all files to be uploaded. Even though the wait was > interrupted, the already uploaded files will not be cleaned up. The remaining files > mainly fall into two groups: > * Files that finished uploading before the thread was canceled. > * Files for which outputStream.closeAndGetHandle() was called, but the snapshotCloseableRegistry > had not yet been closed. > How to reproduce? > Shorten the checkpoint timeout so that the checkpoint fails, then check > whether there are any files left in the shared directory. > I'm testing on Flink 1.11, but the code on the latest branch may > have the same problem. I tried to fix it and it works well so far. -- This message was sent by Atlassian Jira (v8.20.10#820010)
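The gap described in the report (files already uploaded when FutureUtils.waitForAll(...).get() is interrupted are never deleted) can be sketched with a hypothetical uploader that records completed handles and discards them on failure. upload_all, upload_one, and delete_one are illustrative names, not Flink APIs:

```python
import concurrent.futures
import threading

def upload_all(files, upload_one, delete_one):
    """Upload every file; on any failure, delete whatever already reached storage."""
    uploaded = []
    lock = threading.Lock()

    def task(f):
        handle = upload_one(f)
        with lock:
            uploaded.append(handle)  # remember handles that reached remote storage
        return handle

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(task, f) for f in files]
        try:
            return [fut.result() for fut in futures]
        except BaseException:
            for fut in futures:
                fut.cancel()
            concurrent.futures.wait(futures)  # let in-flight uploads settle first
            with lock:
                for handle in uploaded:
                    delete_one(handle)  # the cleanup the report says is missing
            raise
```

The key point is that cleanup must run even when the waiting thread is interrupted, and only after in-flight uploads have settled, so no handle is appended after the deletion pass.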
[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails
[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580068#comment-17580068 ] Yun Tang commented on FLINK-28927: -- [~ChangjiGuo] It seems you did not give a clear idea of how to solve the problem of cleaning up the interrupted uploads. I can assign the task to you first, and hope you can then provide a clear design. > Can not clean up the uploaded shared files when the checkpoint fails > > > Key: FLINK-28927 > URL: https://issues.apache.org/jira/browse/FLINK-28927 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.11.6, 1.15.1 > Environment: Flink-1.11 >Reporter: ChangjiGuo >Priority: Major >
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-27199) Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.
[ https://issues.apache.org/jira/browse/FLINK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen closed FLINK-27199. - Resolution: Fixed 1.15 via 65f79130e4b24a2b9d7035bc0751842889729f63 Closed. > Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment. > -- > > Key: FLINK-27199 > URL: https://issues.apache.org/jira/browse/FLINK-27199 > Project: Flink > Issue Type: Improvement > Components: Connectors / Pulsar >Affects Versions: 1.14.4, 1.15.0, 1.16.0 >Reporter: Yufan Sheng >Assignee: Yufan Sheng >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.16.0, 1.15.2 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28967) Fix Hive multi-version support for Table Store
[ https://issues.apache.org/jira/browse/FLINK-28967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-28967: - Fix Version/s: (was: table-store-0.3.0) > Fix Hive multi-version support for Table Store > -- > > Key: FLINK-28967 > URL: https://issues.apache.org/jira/browse/FLINK-28967 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.2.0, table-store-0.3.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > > > Currently table store Hive catalog and connector can only work for Hive 2.3. > Support for Hive 2.1 and 2.2 should be fixed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27199) Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.
[ https://issues.apache.org/jira/browse/FLINK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen updated FLINK-27199: -- Fix Version/s: (was: 1.14.6) > Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment. > -- > > Key: FLINK-27199 > URL: https://issues.apache.org/jira/browse/FLINK-27199 > Project: Flink > Issue Type: Improvement > Components: Connectors / Pulsar >Affects Versions: 1.14.4, 1.15.0, 1.16.0 >Reporter: Yufan Sheng >Assignee: Yufan Sheng >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.16.0, 1.15.2 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-28967) Fix Hive multi-version support for Table Store
[ https://issues.apache.org/jira/browse/FLINK-28967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-28967. Assignee: Caizhi Weng Resolution: Fixed master: 6652905c4c0ff7f2533353c2d7d3d6552eb978e3 release-0.2: cb5a24e4fc804df4ea746182ea28d03e1866971c > Fix Hive multi-version support for Table Store > -- > > Key: FLINK-28967 > URL: https://issues.apache.org/jira/browse/FLINK-28967 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.2.0, table-store-0.3.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0, table-store-0.3.0 > > > Currently table store Hive catalog and connector can only work for Hive 2.3. > Support for Hive 2.1 and 2.2 should be fixed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] tisonkun commented on pull request #20582: [BK-1.15][FLINK-27199][Connector/Pulsar] Bump pulsar to 2.10.0
tisonkun commented on PR #20582: URL: https://github.com/apache/flink/pull/20582#issuecomment-1216187797 Merging... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tisonkun merged pull request #20582: [BK-1.15][FLINK-27199][Connector/Pulsar] Bump pulsar to 2.10.0
tisonkun merged PR #20582: URL: https://github.com/apache/flink/pull/20582 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi merged pull request #266: [FLINK-28967] Fix Hive multi-version support for Table Store
JingsongLi merged PR #266: URL: https://github.com/apache/flink-table-store/pull/266 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-28981: --- Priority: Blocker (was: Major) > Release Testing: Verify FLIP-245 sources speculative execution > -- > > Key: FLINK-28981 > URL: https://issues.apache.org/jira/browse/FLINK-28981 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common, Runtime / Coordination >Reporter: Zhu Zhu >Priority: Blocker > Labels: test-stability > Fix For: 1.16.0 > > > Speculative execution is introduced in Flink 1.16 to deal with temporary slow > tasks caused by slow nodes. This feature currently consists of 4 FLIPs: > - FLIP-168: Speculative Execution core part > - FLIP-224: Blocklist Mechanism > - FLIP-245: Source Supports Speculative Execution > - FLIP-249: Flink Web UI Enhancement for Speculative Execution > This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and > FLIP-249. > More details about this feature and how to use it can be found in this > documentation [PR|https://github.com/apache/flink/pull/20507]. > To do the verification, the process can be: > - Write Flink jobs which have some {{source}} subtasks running much slower > than others. Three kinds of sources should be verified, including >- [Source > functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java] >- [InputFormat > sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java] >- [FLIP-27 new > sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java] > - Modify the Flink configuration file to enable speculative execution and tune > the configuration as you like > - Submit the job. Check the web UI, logs, metrics and the produced result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-28980: --- Priority: Blocker (was: Major) > Release Testing: Verify FLIP-168 speculative execution > -- > > Key: FLINK-28980 > URL: https://issues.apache.org/jira/browse/FLINK-28980 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Zhu Zhu >Priority: Blocker > Labels: test-stability > Fix For: 1.16.0 > > > Speculative execution is introduced in Flink 1.16 to deal with temporary slow > tasks caused by slow nodes. This feature currently consists of 4 FLIPs: > - FLIP-168: Speculative Execution core part > - FLIP-224: Blocklist Mechanism > - FLIP-245: Source Supports Speculative Execution > - FLIP-249: Flink Web UI Enhancement for Speculative Execution > This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249. > More details about this feature and how to use it can be found in this > documentation [PR|https://github.com/apache/flink/pull/20507]. > To do the verification, the process can be: > - Write a Flink job which has a subtask running much slower than others > (e.g. sleep indefinitely if it runs on a certain host, where the hostname can be > retrieved via InetAddress.getLocalHost().getHostName(), or if its > (subtaskIndex + attemptNumber) % 2 == 0) > - Modify the Flink configuration file to enable speculative execution and tune > the configuration as you like > - Submit the job. Check the web UI, logs, metrics and the produced result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
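The "artificially slow subtask" trick suggested in the ticket can be sketched as a small helper. The predicate below only mirrors the suggested (subtaskIndex + attemptNumber) % 2 == 0 condition; it is test scaffolding for the verification, not Flink code:

```python
import time

def is_slow_attempt(subtask_index: int, attempt_number: int) -> bool:
    # The ticket's suggestion: attempts where the sum is even are slow, so the
    # speculative (next) attempt of the same subtask is fast and should win.
    return (subtask_index + attempt_number) % 2 == 0

def maybe_sleep(subtask_index: int, attempt_number: int, seconds: float = 0.001) -> bool:
    """Sleep only in attempts selected as slow; return whether we slept."""
    if is_slow_attempt(subtask_index, attempt_number):
        time.sleep(seconds)
        return True
    return False
```

With this predicate, attempt 0 of subtask 0 is slow while its speculative attempt 1 is fast, which gives the scheduler a clear case where the speculative execution finishes first.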
[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-28981: --- Labels: test-stability (was: ) > Release Testing: Verify FLIP-245 sources speculative execution > -- > > Key: FLINK-28981 > URL: https://issues.apache.org/jira/browse/FLINK-28981 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common, Runtime / Coordination >Reporter: Zhu Zhu >Priority: Major > Labels: test-stability > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-28980: --- Labels: test-stability (was: ) > Release Testing: Verify FLIP-168 speculative execution > -- > > Key: FLINK-28980 > URL: https://issues.apache.org/jira/browse/FLINK-28980 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Zhu Zhu >Priority: Major > Labels: test-stability > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28925) Fix the concurrency problem in hybrid shuffle
[ https://issues.apache.org/jira/browse/FLINK-28925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580062#comment-17580062 ] godfrey he commented on FLINK-28925: [~Weijie Guo] would you like to fix it? > Fix the concurrency problem in hybrid shuffle > - > > Key: FLINK-28925 > URL: https://issues.apache.org/jira/browse/FLINK-28925 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.16.0 >Reporter: Weijie Guo >Priority: Blocker > Fix For: 1.16.0 > > > Through TPC-DS testing and code analysis, I found some thread-safety problems > in hybrid shuffle: > # HsSubpartitionMemoryDataManager#consumeBuffer should return a > readOnlySlice of the buffer to the downstream instead of the original buffer: if the > spilling thread is processing while the downstream task is consuming the same > buffer, the amount of data written to the disk will be smaller than the > actual value. To solve this, we should let the consuming thread and the > spilling thread share the same data but not the same index. > # HsSubpartitionMemoryDataManager#releaseSubpartitionBuffers should ignore > the release decision if the buffer has already been removed from bufferIndexToContexts, > instead of throwing an exception. It should be pointed out that although the > actual release operation is synchronous, a double release can still happen. > The reason is that non-global decisions do not need to be synchronized. In > other words, the main task thread and the consumer thread may decide to > release a buffer at the same time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
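The first point in the report (hand the consumer a read-only slice so a shared buffer cannot be changed underneath the spilling thread) can be illustrated with Python's memoryview. This demonstrates only the idea of read-only sharing of the same bytes, not Flink's buffer implementation:

```python
buf = bytearray(b"record-data")           # the buffer both threads would share
read_only = memoryview(buf).toreadonly()  # what the consumer should receive

assert bytes(read_only) == bytes(buf)     # same underlying bytes, no copy made

try:
    read_only[0] = 0                      # a consumer-side write is rejected...
    consumer_mutated = True
except TypeError:
    consumer_mutated = False
assert not consumer_mutated               # ...so the spiller's data stays intact
```

The spilling side keeps the original mutable buffer for its own bookkeeping, while the consumer sees identical data that it cannot alter.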
[jira] [Commented] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally
[ https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580061#comment-17580061 ] Yun Tang commented on FLINK-28984: -- I think the second analysis looks a bit weird. If the FSDataOutputStream has not been created, it will not call {{FsCheckpointStateOutputStream#createStream}}, which means we will not get the generated random path. The SafetyNetCloseableRegistry would help close the unreleased closeable on JVM GC; it must be that we forgot to close them somewhere. > FsCheckpointStateOutputStream is not being released normally > > > Key: FLINK-28984 > URL: https://issues.apache.org/jira/browse/FLINK-28984 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.6, 1.15.1 >Reporter: ChangjiGuo >Priority: Major > > If the checkpoint is aborted, AsyncSnapshotCallable will close the > snapshotCloseableRegistry when it is canceled. There may be two situations > here: > # The FSDataOutputStream has been created and closed while closing > FsCheckpointStateOutputStream. > # The FSDataOutputStream has not been created yet, but the closed flag has been > set to true. You can see this in the log: > {code:java} > 2022-08-16 12:55:44,161 WARN > org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing > unclosed resource via safety-net: > ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) > : > x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a > {code} > The output stream will be automatically closed by the > SafetyNetCloseableRegistry but the file will not be deleted. > The second case usually occurs when the storage system has high latency in > creating files. > How to reproduce? > This is not easy to reproduce, but you can try to set a smaller checkpoint > timeout and increase the parallelism of the flink job. 
> -- This message was sent by Atlassian Jira (v8.20.10#820010)
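The leak described in FLINK-28984 amounts to a close() that shuts the underlying stream but does not delete the partially written file. A minimal sketch of the desired behavior — a hypothetical class, not Flink's actual FsCheckpointStateOutputStream — could look like:

```python
import os
import tempfile


class CheckpointStream:
    """Sketch of a checkpoint output stream: close() deletes the backing
    file unless the stream was finalized into a state handle first. The
    report says this deletion is missing when SafetyNetCloseableRegistry
    closes an unreleased stream on GC, so the file is left behind."""

    def __init__(self, path: str):
        self.path = path
        self._f = open(path, "wb")
        self._finalized = False

    def close_and_get_handle(self) -> str:
        # Normal completion: the file becomes part of the checkpoint.
        self._f.close()
        self._finalized = True
        return self.path

    def close(self):
        # Abort path: close the stream AND remove the partial file.
        if not self._f.closed:
            self._f.close()
        if not self._finalized and os.path.exists(self.path):
            os.remove(self.path)


path = os.path.join(tempfile.mkdtemp(), "chk-part")
stream = CheckpointStream(path)
stream.close()  # aborted checkpoint: file is cleaned up
print(os.path.exists(path))  # → False
```

A safety-net registry that only calls the raw stream's close (and not this wrapper's) would exhibit exactly the reported symptom: the stream is closed but the file survives.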
[GitHub] [flink-table-store] SteNicholas commented on pull request #266: [FLINK-28967] Fix Hive multi-version support for Table Store
SteNicholas commented on PR #266: URL: https://github.com/apache/flink-table-store/pull/266#issuecomment-1216181701 @tsreaper, Hive 2.1 and 2.2 use the latest versions 2.1.1 and 2.2.0; hence, does Hive 2.3 need to support the latest version 2.3.9? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28429) Reduce pyflink tests time
[ https://issues.apache.org/jira/browse/FLINK-28429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-28429: - Labels: (was: test-stability) > Reduce pyflink tests time > - > > Key: FLINK-28429 > URL: https://issues.apache.org/jira/browse/FLINK-28429 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Fix For: 1.16.0 > > > Currently, the pyflink tests take about 1 hour 30 minutes to run. We need to > optimize them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-28920) Release Testing: Verify Python DataStream Window
[ https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo resolved FLINK-28920. -- Resolution: Done > Release Testing: Verify Python DataStream Window > > > Key: FLINK-28920 > URL: https://issues.apache.org/jira/browse/FLINK-28920 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. Test > * Write a python datastream window job in thread mode. For details of > Window, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/]. 
> {code:python} > from typing import Tuple > from pyflink.common import Configuration > from pyflink.common.time import Time > from pyflink.common.typeinfo import Types > from pyflink.common.watermark_strategy import WatermarkStrategy, > TimestampAssigner > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.datastream.functions import AggregateFunction > from pyflink.datastream.window import EventTimeSessionWindows > class SecondColumnTimestampAssigner(TimestampAssigner): > def extract_timestamp(self, value, record_timestamp) -> int: > return int(value[1]) > def main(): > config = Configuration() > # thread mode > config.set_string("python.execution-mode", "thread") > # process mode > # config.set_string("python.execution-mode", "process") > env = StreamExecutionEnvironment.get_execution_environment(config) > data_stream = env.from_collection([ > ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', > 15)], > type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: > DataStream > watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ > .with_timestamp_assigner(SecondColumnTimestampAssigner()) > class MyAggregateFunction(AggregateFunction): > def create_accumulator(self) -> Tuple[int, str]: > return 0, '' > def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) > -> Tuple[int, str]: > return value[1] + accumulator[0], value[0] > def get_result(self, accumulator: Tuple[str, int]): > return accumulator[1], accumulator[0] > def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]): > return acc_a[0] + acc_b[0], acc_a[1] > ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ > .key_by(lambda x: x[0], key_type=Types.STRING()) \ > .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ > .aggregate(MyAggregateFunction(), >accumulator_type=Types.TUPLE([Types.INT(), > Types.STRING()]), >output_type=Types.TUPLE([Types.STRING(), Types.INT()])) > ds.print() > 
env.execute('test_window_aggregate_accumulator_type') > if __name__ == '__main__': > main() > {code} > * run the python datastream window job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo resolved FLINK-28918. -- Resolution: Done > Release Testing: Verify FLIP-206 in Python DataStream API > - > > Key: FLINK-28918 > URL: https://issues.apache.org/jira/browse/FLINK-28918 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. 
Test > * Write a python datastream job in thread mode > {code:python} > from pyflink.common import Configuration > from pyflink.common.typeinfo import Types > from pyflink.datastream import StreamExecutionEnvironment > def main(): > config = Configuration() > config.set_string("python.execution-mode", "thread") > env = StreamExecutionEnvironment.get_execution_environment(config) > ds = env.from_collection( > [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)], > type_info=Types.ROW_NAMED(["v1", "v2", "v3"], > [Types.INT(), Types.STRING(), Types.INT()])) > def flat_map_func1(data): > for i in data: > yield int(i), 1 > def flat_map_func2(data): > for i in data: > yield i > ds = ds.key_by(lambda x: x[0]) \ > .min_by("v2") \ > .map(lambda x: (x[0], x[1], x[2]), > output_type=Types.TUPLE([Types.INT(), Types.STRING(), > Types.INT()])) \ > .key_by(lambda x: x[2]) \ > .max_by(0) \ > .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), > Types.INT()])) \ > .key_by(lambda x: [1]) \ > .min_by() \ > .flat_map(flat_map_func2, output_type=Types.INT()) \ > .key_by(lambda x: x) \ > .max_by() > ds.print() > env.execute("key_by_min_by_max_by_test_batch") > if __name__ == '__main__': > main() > {code} > * run the python datastream job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28983) using serviceaccount in FlinkDeployment not works when sink to aws s3
[ https://issues.apache.org/jira/browse/FLINK-28983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lichuan Shang updated FLINK-28983: -- Description: I am deploying a Flink CDC job using the sql-runner example from the official examples (see [https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]). The _flink_ service account has all s3 permissions (`s3:*` in the iam policy) but the k8s pod keeps restarting and there are many errors in the pod's log: {code:java} Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 
4 more Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null) at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265) at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322) at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261) at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236) at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391) at org.apache.
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #141: [FLINK-28805] Add Transformer for HashingTF
yunfengzhou-hub commented on code in PR #141: URL: https://github.com/apache/flink-ml/pull/141#discussion_r946314824 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/hashingtf/HashingTF.java: ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.feature.hashingtf; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.api.Transformer; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.shaded.guava30.com.google.common.hash.Hashing.murmur3_32; + +/** + * A Transformer that maps a sequence of terms(strings, numbers, booleans) to a sparse vector with a Review Comment: nit: This should be "vector" instead of "sparse vector" corresponding to the `VectorTypeInfo.INSTANCE` used when generating `outputTypeInfo`. ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/hashingtf/HashingTF.java: ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.feature.hashingtf; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.api.Transformer; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.shaded.guava30.com.google.common.hash.Hashing.murmur3_32; + +/** + * A Transformer that maps a sequence of terms(strings, numbers, booleans) to a sparse vector with a + * specified dimension using the hashing trick. + * + * If multiple features are projected into the same column, the output values are accumulated by + * default. Users could also enforce all non-zero output values as 1 by setting {@link + * HashingTFParams#BINARY} as true. 
+ * + * For the hashing trick, see https://en.wikipedia.org/wiki/Feature_hashing
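The hashing trick that the HashingTF Javadoc references can be sketched independently of Flink. This sketch uses zlib.crc32 as a stand-in for the MurmurHash3 function the transformer actually imports, and the function/parameter names (`hashing_tf`, `num_features`, `binary`) are illustrative, not the Flink ML API:

```python
import zlib


def hashing_tf(terms, num_features=262144, binary=False):
    # Map each term to a bucket by hashing it modulo the vector dimension,
    # accumulating counts when several terms collide in the same bucket.
    # With binary=True, all non-zero values are forced to 1.0, matching the
    # BINARY parameter described in the Javadoc above.
    vec = {}
    for term in terms:
        idx = zlib.crc32(str(term).encode("utf-8")) % num_features
        vec[idx] = 1.0 if binary else vec.get(idx, 0.0) + 1.0
    return vec  # sparse {index: value} representation


# "a" appears twice, so its bucket gets a count of 2 (or 1 with binary=True).
print(hashing_tf(["a", "b", "a"], num_features=16))
```

The collision behavior is the essence of the trick: the output dimension is fixed regardless of vocabulary size, at the cost of occasionally merging distinct terms into one bucket.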
[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580058#comment-17580058 ] Huang Xingbo commented on FLINK-28918: -- [~ana4] Thanks a lot for the test. > Release Testing: Verify FLIP-206 in Python DataStream API > - > > Key: FLINK-28918 > URL: https://issues.apache.org/jira/browse/FLINK-28918 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. 
Test > * Write a python datastream job in thread mode > {code:python} > from pyflink.common import Configuration > from pyflink.common.typeinfo import Types > from pyflink.datastream import StreamExecutionEnvironment > def main(): > config = Configuration() > config.set_string("python.execution-mode", "thread") > env = StreamExecutionEnvironment.get_execution_environment(config) > ds = env.from_collection( > [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)], > type_info=Types.ROW_NAMED(["v1", "v2", "v3"], > [Types.INT(), Types.STRING(), Types.INT()])) > def flat_map_func1(data): > for i in data: > yield int(i), 1 > def flat_map_func2(data): > for i in data: > yield i > ds = ds.key_by(lambda x: x[0]) \ > .min_by("v2") \ > .map(lambda x: (x[0], x[1], x[2]), > output_type=Types.TUPLE([Types.INT(), Types.STRING(), > Types.INT()])) \ > .key_by(lambda x: x[2]) \ > .max_by(0) \ > .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), > Types.INT()])) \ > .key_by(lambda x: [1]) \ > .min_by() \ > .flat_map(flat_map_func2, output_type=Types.INT()) \ > .key_by(lambda x: x) \ > .max_by() > ds.print() > env.execute("key_by_min_by_max_by_test_batch") > if __name__ == '__main__': > main() > {code} > * run the python datastream job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window
[ https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580057#comment-17580057 ] Huang Xingbo commented on FLINK-28920: -- [~ana4] Thanks a lot for the test. > Release Testing: Verify Python DataStream Window > > > Key: FLINK-28920 > URL: https://issues.apache.org/jira/browse/FLINK-28920 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. Test > * Write a python datastream window job in thread mode. For details of > Window, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/]. 
> {code:python} > from typing import Tuple > from pyflink.common import Configuration > from pyflink.common.time import Time > from pyflink.common.typeinfo import Types > from pyflink.common.watermark_strategy import WatermarkStrategy, > TimestampAssigner > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.datastream.functions import AggregateFunction > from pyflink.datastream.window import EventTimeSessionWindows > class SecondColumnTimestampAssigner(TimestampAssigner): > def extract_timestamp(self, value, record_timestamp) -> int: > return int(value[1]) > def main(): > config = Configuration() > # thread mode > config.set_string("python.execution-mode", "thread") > # process mode > # config.set_string("python.execution-mode", "process") > env = StreamExecutionEnvironment.get_execution_environment(config) > data_stream = env.from_collection([ > ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', > 15)], > type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: > DataStream > watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ > .with_timestamp_assigner(SecondColumnTimestampAssigner()) > class MyAggregateFunction(AggregateFunction): > def create_accumulator(self) -> Tuple[int, str]: > return 0, '' > def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) > -> Tuple[int, str]: > return value[1] + accumulator[0], value[0] > def get_result(self, accumulator: Tuple[str, int]): > return accumulator[1], accumulator[0] > def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]): > return acc_a[0] + acc_b[0], acc_a[1] > ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ > .key_by(lambda x: x[0], key_type=Types.STRING()) \ > .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ > .aggregate(MyAggregateFunction(), >accumulator_type=Types.TUPLE([Types.INT(), > Types.STRING()]), >output_type=Types.TUPLE([Types.STRING(), Types.INT()])) > ds.print() > 
env.execute('test_window_aggregate_accumulator_type') > if __name__ == '__main__': > main() > {code} > * run the python datastream window job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] syhily commented on a diff in pull request #20567: [FLINK-27917][Connector/Pulsar] Fix message assert error in PulsarUnorderedPartitionSplitReaderTest
syhily commented on code in PR #20567: URL: https://github.com/apache/flink/pull/20567#discussion_r946357858 ## flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java: ## @@ -198,7 +213,7 @@ void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader) String topicName = randomAlphabetic(10); // Add a split -seekStartPositionAndHandleSplit(splitReader, topicName, 0); +handleSplit(splitReader, topicName, 0, MessageId.latest); Review Comment: @tisonkun Yep. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window
[ https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580053#comment-17580053 ] Luning Wang commented on FLINK-28920: - This test can successfully run on an M1 chip. [~hxbks2ks] > Release Testing: Verify Python DataStream Window > > > Key: FLINK-28920 > URL: https://issues.apache.org/jira/browse/FLINK-28920 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. Test > * Write a python datastream window job in thread mode. For details of > Window, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/]. 
> {code:python} > from typing import Tuple > from pyflink.common import Configuration > from pyflink.common.time import Time > from pyflink.common.typeinfo import Types > from pyflink.common.watermark_strategy import WatermarkStrategy, > TimestampAssigner > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.datastream.functions import AggregateFunction > from pyflink.datastream.window import EventTimeSessionWindows > class SecondColumnTimestampAssigner(TimestampAssigner): > def extract_timestamp(self, value, record_timestamp) -> int: > return int(value[1]) > def main(): > config = Configuration() > # thread mode > config.set_string("python.execution-mode", "thread") > # process mode > # config.set_string("python.execution-mode", "process") > env = StreamExecutionEnvironment.get_execution_environment(config) > data_stream = env.from_collection([ > ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', > 15)], > type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: > DataStream > watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ > .with_timestamp_assigner(SecondColumnTimestampAssigner()) > class MyAggregateFunction(AggregateFunction): > def create_accumulator(self) -> Tuple[int, str]: > return 0, '' > def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) > -> Tuple[int, str]: > return value[1] + accumulator[0], value[0] > def get_result(self, accumulator: Tuple[str, int]): > return accumulator[1], accumulator[0] > def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]): > return acc_a[0] + acc_b[0], acc_a[1] > ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ > .key_by(lambda x: x[0], key_type=Types.STRING()) \ > .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ > .aggregate(MyAggregateFunction(), >accumulator_type=Types.TUPLE([Types.INT(), > Types.STRING()]), >output_type=Types.TUPLE([Types.STRING(), Types.INT()])) > ds.print() > 
env.execute('test_window_aggregate_accumulator_type') > if __name__ == '__main__': > main() > {code} > * run the python datastream window job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580054#comment-17580054 ] Luning Wang commented on FLINK-28918: - This test can successfully run on an M1 chip. [~hxbks2ks] > Release Testing: Verify FLIP-206 in Python DataStream API > - > > Key: FLINK-28918 > URL: https://issues.apache.org/jira/browse/FLINK-28918 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. 
Test > * Write a python datastream job in thread mode > {code:python} > from pyflink.common import Configuration > from pyflink.common.typeinfo import Types > from pyflink.datastream import StreamExecutionEnvironment > def main(): > config = Configuration() > config.set_string("python.execution-mode", "thread") > env = StreamExecutionEnvironment.get_execution_environment(config) > ds = env.from_collection( > [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)], > type_info=Types.ROW_NAMED(["v1", "v2", "v3"], > [Types.INT(), Types.STRING(), Types.INT()])) > def flat_map_func1(data): > for i in data: > yield int(i), 1 > def flat_map_func2(data): > for i in data: > yield i > ds = ds.key_by(lambda x: x[0]) \ > .min_by("v2") \ > .map(lambda x: (x[0], x[1], x[2]), > output_type=Types.TUPLE([Types.INT(), Types.STRING(), > Types.INT()])) \ > .key_by(lambda x: x[2]) \ > .max_by(0) \ > .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), > Types.INT()])) \ > .key_by(lambda x: [1]) \ > .min_by() \ > .flat_map(flat_map_func2, output_type=Types.INT()) \ > .key_by(lambda x: x) \ > .max_by() > ds.print() > env.execute("key_by_min_by_max_by_test_batch") > if __name__ == '__main__': > main() > {code} > * run the python datastream job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
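The job above leans on the running semantics of `min_by` on a keyed stream: for each incoming record, the operator emits the record that currently holds the minimum of the chosen field among records with the same key. A plain-Python model of that behaviour (an illustration of the documented semantics, not PyFlink's implementation) makes the first step of the chain easy to check; note that `v2` is a string, so the comparison is lexicographic.

```python
def running_min_by(records, key_fn, pos):
    """For each incoming record, emit the record currently holding the
    minimum value at index `pos` among records seen so far with the same
    key (ties keep the earlier record)."""
    best = {}
    out = []
    for rec in records:
        k = key_fn(rec)
        if k not in best or rec[pos] < best[k][pos]:
            best[k] = rec          # new running minimum for this key
        out.append(best[k])        # emit the current minimum record
    return out


rows = [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)]
print(running_min_by(rows, key_fn=lambda r: r[0], pos=1))
# [(1, '9', 0), (1, '5', 1), (1, '5', 1), (5, '5', 0), (5, '3', 1)]
```

`max_by` is symmetric, emitting the record with the running maximum instead.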
[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails
[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580044#comment-17580044 ] ChangjiGuo edited comment on FLINK-28927 at 8/16/22 5:31 AM: - I also found another problem when debugging the code. See [here|https://issues.apache.org/jira/browse/FLINK-28984] [~yunta] Can you assign these two issues to me? Thanks! was (Author: changjiguo): I also found another problem when debugging the code. See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984] [~yunta] Can you assign these two issues to me? Thanks! > Can not clean up the uploaded shared files when the checkpoint fails > > > Key: FLINK-28927 > URL: https://issues.apache.org/jira/browse/FLINK-28927 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.11.6, 1.15.1 > Environment: Flink-1.11 >Reporter: ChangjiGuo >Priority: Major > > If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable > thread and do some cleanup work, including the following: > * Cancel all AsyncSnapshotCallable thread. > * If the thread has finished, it will clean up all state object. > * If the thread has not completed, it will be interrupted(maybe). > * Close snapshotCloseableRegistry. > In my case, the thread was interrupted while waiting for the file upload to > complete, but the file was not cleaned up. > RocksDBStateUploader.java > {code:java} > FutureUtils.waitForAll(futures.values()).get(); > {code} > It will wait for all files to be uploaded here. Although it has been > interrupted, the uploaded files will not be cleaned up. The remaining files > are mainly divided into: > * Files that have finished uploading before the thread is canceled. > * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry > has not been closed. > How to reproduce? > Shorten the checkpoint timeout time, making the checkpoint fail. 
Then check > if there are any files in the shared directory. > I'm testing on Flink-1.11, but I found the code from the latest branch may > have the same problem. I tried to fix it and it works well so far. -- This message was sent by Atlassian Jira (v8.20.10#820010)
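The cleanup gap described above — an interrupted wait on `FutureUtils.waitForAll(futures.values()).get()` leaving already-uploaded files behind — amounts to "track what finished, and delete it on failure". A hedged sketch of that pattern in plain Python (the `upload` and `delete` callables are stand-ins, not Flink's actual RocksDBStateUploader API):

```python
from concurrent.futures import ThreadPoolExecutor, wait


def upload_all_or_cleanup(files, upload, delete):
    """Upload every file; if any upload fails, delete the files that did
    finish so no orphaned shared-state files are left behind."""
    uploaded, errors = [], []
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(upload, f): f for f in files}
        done, _ = wait(futures)            # wait for *all* uploads to settle
    for fut in done:
        if fut.exception() is None:
            uploaded.append(futures[fut])  # record the successful upload
        else:
            errors.append(fut.exception())
    if errors:
        for f in uploaded:                 # best-effort cleanup of partial state
            delete(f)
        raise errors[0]
    return uploaded
```

The key difference from the failure mode in the report is that the successful uploads are recorded even when the overall operation fails, so the cleanup path knows exactly which remote files to remove.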
[jira] [Created] (FLINK-28985) support create table like view
waywtdcc created FLINK-28985: Summary: support create table like view Key: FLINK-28985 URL: https://issues.apache.org/jira/browse/FLINK-28985 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.16.0 Reporter: waywtdcc Fix For: 1.16.0 At present, a CREATE TABLE ... LIKE statement can only reference a table as its source; it cannot reference a view. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally
[ https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangjiGuo updated FLINK-28984: --- Description: If the checkpoint is aborted, AsyncSnapshotCallable will close the snapshotCloseableRegistry when it is canceled. There may be two situations here: # The FSDataOutputStream has been created and closed while closing FsCheckpointStateOutputStream. # The FSDataOutputStream has not been created yet, but closed flag has been set to true. You can see this in log: {code:java} 2022-08-16 12:55:44,161 WARN org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing unclosed resource via safety-net: ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) : x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a {code} The output stream will be automatically closed by the SafetyNetCloseableRegistry but the file will not be deleted. The second case usually occurs when the storage system has high latency in creating files. How to reproduce? This is not easy to reproduce, but you can try to set a smaller checkpoint timeout and increase the parallelism of the flink job. was: If the checkpoint is aborted, AsyncSnapshotCallable will close the snapshotCloseableRegistry when it is canceled. There may be two situations here: # The FSDataOutputStream has been created and closed while closing FsCheckpointStateOutputStream. # The FSDataOutputStream has not been created yet, but closed flag has been set to true. 
You can see this in log: {code:java} 2022-08-16 12:55:44,161 WARN org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing unclosed resource via safety-net: ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) : x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a {code} The output stream will be automatically closed by the SafetyNetCloseableRegistry but the file will not be deleted. The second case usually occurs when the storage system has high latency in creating files. > FsCheckpointStateOutputStream is not being released normally > > > Key: FLINK-28984 > URL: https://issues.apache.org/jira/browse/FLINK-28984 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.6, 1.15.1 >Reporter: ChangjiGuo >Priority: Major > > If the checkpoint is aborted, AsyncSnapshotCallable will close the > snapshotCloseableRegistry when it is canceled. There may be two situations > here: > # The FSDataOutputStream has been created and closed while closing > FsCheckpointStateOutputStream. > # The FSDataOutputStream has not been created yet, but closed flag has been > set to true. You can see this in log: > {code:java} > 2022-08-16 12:55:44,161 WARN > org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing > unclosed resource via safety-net: > ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) > : > x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a > {code} > The output stream will be automatically closed by the > SafetyNetCloseableRegistry but the file will not be deleted. > The second case usually occurs when the storage system has high latency in > creating files. > How to reproduce? 
> This is not easy to reproduce, but you can try to set a smaller checkpoint > timeout and increase the parallelism of the flink job. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
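The second situation above — the `closed` flag being set before the file even exists — is essentially a race between `close()` and lazy file creation. A plain-Python model of the intended fix (not Flink's actual FsCheckpointStateOutputStream): check the flag under the same lock that guards file creation, so a creation that loses the race is refused instead of leaking a file.

```python
import os
import threading


class LazyFileStream:
    """Output stream that creates its file lazily on the first write.
    The `closed` flag is checked under the creation lock, so a write that
    arrives after close() cannot create a file that nothing will delete."""

    def __init__(self, path):
        self.path = path
        self.closed = False
        self._fh = None
        self._lock = threading.Lock()

    def write(self, data: bytes):
        with self._lock:
            if self.closed:
                # refuse late creation instead of leaking a file
                raise OSError("stream already closed")
            if self._fh is None:
                self._fh = open(self.path, "wb")   # lazy file creation
            self._fh.write(data)

    def abort(self):
        """Close the stream and remove any file it already created."""
        with self._lock:
            self.closed = True
            if self._fh is not None:
                self._fh.close()
                self._fh = None
            if os.path.exists(self.path):
                os.remove(self.path)
```

In the real code path the creation happens inside the file system client rather than in the stream itself, which is why the high-latency file creation mentioned above widens the race window.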
[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally
[ https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangjiGuo updated FLINK-28984: --- External issue URL: https://issues.apache.org/jira/browse/FLINK-28927 > FsCheckpointStateOutputStream is not being released normally > > > Key: FLINK-28984 > URL: https://issues.apache.org/jira/browse/FLINK-28984 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.6, 1.15.1 >Reporter: ChangjiGuo >Priority: Major > > If the checkpoint is aborted, AsyncSnapshotCallable will close the > snapshotCloseableRegistry when it is canceled. There may be two situations > here: > # The FSDataOutputStream has been created and closed while closing > FsCheckpointStateOutputStream. > # The FSDataOutputStream has not been created yet, but closed flag has been > set to true. You can see this in log: > {code:java} > 2022-08-16 12:55:44,161 WARN > org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing > unclosed resource via safety-net: > ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) > : > x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a > {code} > The output stream will be automatically closed by the > SafetyNetCloseableRegistry but the file will not be deleted. > The second case usually occurs when the storage system has high latency in > creating files. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally
[ https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangjiGuo updated FLINK-28984: --- External issue URL: (was: https://issues.apache.org/jira/browse/FLINK-28927) > FsCheckpointStateOutputStream is not being released normally > > > Key: FLINK-28984 > URL: https://issues.apache.org/jira/browse/FLINK-28984 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.6, 1.15.1 >Reporter: ChangjiGuo >Priority: Major > > If the checkpoint is aborted, AsyncSnapshotCallable will close the > snapshotCloseableRegistry when it is canceled. There may be two situations > here: > # The FSDataOutputStream has been created and closed while closing > FsCheckpointStateOutputStream. > # The FSDataOutputStream has not been created yet, but closed flag has been > set to true. You can see this in log: > {code:java} > 2022-08-16 12:55:44,161 WARN > org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing > unclosed resource via safety-net: > ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) > : > x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a > {code} > The output stream will be automatically closed by the > SafetyNetCloseableRegistry but the file will not be deleted. > The second case usually occurs when the storage system has high latency in > creating files. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails
[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580044#comment-17580044 ] ChangjiGuo edited comment on FLINK-28927 at 8/16/22 5:15 AM: - I also found another problem when debugging the code. See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984] [~yunta] Can you assign these two issues to me? Thanks! was (Author: changjiguo): I also found another problem when debugging the code. See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984] > Can not clean up the uploaded shared files when the checkpoint fails > > > Key: FLINK-28927 > URL: https://issues.apache.org/jira/browse/FLINK-28927 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.11.6, 1.15.1 > Environment: Flink-1.11 >Reporter: ChangjiGuo >Priority: Major > > If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable > thread and do some cleanup work, including the following: > * Cancel all AsyncSnapshotCallable thread. > * If the thread has finished, it will clean up all state object. > * If the thread has not completed, it will be interrupted(maybe). > * Close snapshotCloseableRegistry. > In my case, the thread was interrupted while waiting for the file upload to > complete, but the file was not cleaned up. > RocksDBStateUploader.java > {code:java} > FutureUtils.waitForAll(futures.values()).get(); > {code} > It will wait for all files to be uploaded here. Although it has been > interrupted, the uploaded files will not be cleaned up. The remaining files > are mainly divided into: > * Files that have finished uploading before the thread is canceled. > * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry > has not been closed. > How to reproduce? > Shorten the checkpoint timeout time, making the checkpoint fail. Then check > if there are any files in the shared directory. 
> I'm testing on Flink-1.11, but I found the code from the latest branch may > have the same problem. I tried to fix it and it works well so far. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails
[ https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580044#comment-17580044 ] ChangjiGuo commented on FLINK-28927: I also found another problem when debugging the code. See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984] > Can not clean up the uploaded shared files when the checkpoint fails > > > Key: FLINK-28927 > URL: https://issues.apache.org/jira/browse/FLINK-28927 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.11.6, 1.15.1 > Environment: Flink-1.11 >Reporter: ChangjiGuo >Priority: Major > > If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable > thread and do some cleanup work, including the following: > * Cancel all AsyncSnapshotCallable thread. > * If the thread has finished, it will clean up all state object. > * If the thread has not completed, it will be interrupted(maybe). > * Close snapshotCloseableRegistry. > In my case, the thread was interrupted while waiting for the file upload to > complete, but the file was not cleaned up. > RocksDBStateUploader.java > {code:java} > FutureUtils.waitForAll(futures.values()).get(); > {code} > It will wait for all files to be uploaded here. Although it has been > interrupted, the uploaded files will not be cleaned up. The remaining files > are mainly divided into: > * Files that have finished uploading before the thread is canceled. > * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry > has not been closed. > How to reproduce? > Shorten the checkpoint timeout time, making the checkpoint fail. Then check > if there are any files in the shared directory. > I'm testing on Flink-1.11, but I found the code from the latest branch may > have the same problem. I tried to fix it and it works well so far. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally
[ https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangjiGuo updated FLINK-28984: --- Description: If the checkpoint is aborted, AsyncSnapshotCallable will close the snapshotCloseableRegistry when it is canceled. There may be two situations here: # The FSDataOutputStream has been created and closed while closing FsCheckpointStateOutputStream. # The FSDataOutputStream has not been created yet, but closed flag has been set to true. You can see this in log: {code:java} 2022-08-16 12:55:44,161 WARN org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing unclosed resource via safety-net: ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) : x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a {code} The output stream will be automatically closed by the SafetyNetCloseableRegistry but the file will not be deleted. The second case usually occurs when the storage system has high latency in creating files. was: If the checkpoint is aborted, AsyncSnapshotCallable will close the snapshotCloseableRegistry when it is canceled. There may be two situations here: # The FSDataOutputStream has been created and closed while closing FsCheckpointStateOutputStream. # The FSDataOutputStream has not been created yet, but closed flag has been set to true. You can see this in log: {code:java} 2022-08-16 12:55:44,161 WARN org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing unclosed resource via safety-net: ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) : x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a {code} The output stream will be automatically closed by the SafetyNetCloseableRegistry but the file will not be deleted. 
The second case usually occurs when the storage system has high latency in creating files. > FsCheckpointStateOutputStream is not being released normally > > > Key: FLINK-28984 > URL: https://issues.apache.org/jira/browse/FLINK-28984 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.6, 1.15.1 >Reporter: ChangjiGuo >Priority: Major > > If the checkpoint is aborted, AsyncSnapshotCallable will close the > snapshotCloseableRegistry when it is canceled. There may be two situations > here: > # The FSDataOutputStream has been created and closed while closing > FsCheckpointStateOutputStream. > # The FSDataOutputStream has not been created yet, but closed flag has been > set to true. You can see this in log: > {code:java} > 2022-08-16 12:55:44,161 WARN > org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing > unclosed resource via safety-net: > ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) > : > x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a > {code} > The output stream will be automatically closed by the > SafetyNetCloseableRegistry but the file will not be deleted. > The second case usually occurs when the storage system has high latency in > creating files. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28954) Release Testing: Verify FLIP-223 HiveServer2 Endpoint
[ https://issues.apache.org/jira/browse/FLINK-28954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-28954: --- Assignee: luoyuxia > Release Testing: Verify FLIP-223 HiveServer2 Endpoint > - > > Key: FLINK-28954 > URL: https://issues.apache.org/jira/browse/FLINK-28954 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: Shengkai Fang >Assignee: luoyuxia >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > HiveServer2 Endpoint is ready to use in this version. I think we can verify: > # We can start the SQL Gateway with HiveServer2 Endpoint > # User is able to submit SQL with Hive beeline > # User is able to submit SQL with DBeaver -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally
ChangjiGuo created FLINK-28984: -- Summary: FsCheckpointStateOutputStream is not being released normally Key: FLINK-28984 URL: https://issues.apache.org/jira/browse/FLINK-28984 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.15.1, 1.11.6 Reporter: ChangjiGuo If the checkpoint is aborted, AsyncSnapshotCallable will close the snapshotCloseableRegistry when it is canceled. There may be two situations here: # The FSDataOutputStream has been created and closed while closing FsCheckpointStateOutputStream. # The FSDataOutputStream has not been created yet, but closed flag has been set to true. You can see this in log: {code:java} 2022-08-16 12:55:44,161 WARN org.apache.flink.core.fs.SafetyNetCloseableRegistry - Closing unclosed resource via safety-net: ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64) : x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a {code} The output stream will be automatically closed by the SafetyNetCloseableRegistry but the file will not be deleted. The second case usually occurs when the storage system has high latency in creating files. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28952) Release Testing: Verify Hive dialect
[ https://issues.apache.org/jira/browse/FLINK-28952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-28952: --- Assignee: Shengkai Fang > Release Testing: Verify Hive dialect > > > Key: FLINK-28952 > URL: https://issues.apache.org/jira/browse/FLINK-28952 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / API >Affects Versions: 1.16.0 >Reporter: luoyuxia >Assignee: Shengkai Fang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28983) using serviceaccount in FlinkDeployment does not work when sinking to AWS S3
[ https://issues.apache.org/jira/browse/FLINK-28983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lichuan Shang updated FLINK-28983: -- Description: I am deploying a Flink CDC job using sql-runner example from official examples(see [https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example).] The _flink_ service account has all s3 permissions (`s3:*` in iam policy) but the k8s pod keeps on restarting and there's too much errors on pod's log: {code:java} Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 
4 more Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null) at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265) at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322) at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261) at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236) at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391) at org.apache.
[jira] [Created] (FLINK-28983) using serviceaccount in FlinkDeployment does not work when sinking to AWS S3
Lichuan Shang created FLINK-28983: - Summary: using serviceaccount in FlinkDeployment not works when sink to aws s3 Key: FLINK-28983 URL: https://issues.apache.org/jira/browse/FLINK-28983 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.1.0 Reporter: Lichuan Shang I am deploying a Flink CDC job using sql-runner example from official examples(see [https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example).] The `flink` service account has all s3 permissions (`s3:*` in iam policy) but the k8s pod keeps on restarting and there's too much errors on pod's log: ``` Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 
4 more Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null) at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265) at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322) at org.apache.had
[GitHub] [flink] imaffe commented on a diff in pull request #20567: [FLINK-27917][Connector/Pulsar] Fix message assert error in PulsarUnorderedPartitionSplitReaderTest
imaffe commented on code in PR #20567: URL: https://github.com/apache/flink/pull/20567#discussion_r946332754 ## flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java: ## @@ -198,7 +213,7 @@ void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader) String topicName = randomAlphabetic(10); // Add a split -seekStartPositionAndHandleSplit(splitReader, topicName, 0); +handleSplit(splitReader, topicName, 0, MessageId.latest); Review Comment: Here we should use seekStartPositionAndHandleSplit(). The difference is whether we want to do a seek before we call handleSplitChanges. Here we want to receive 0 message in the next poll() call, so we need to seek to the `latest` cursor so no data will be consumed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tisonkun commented on a diff in pull request #20567: [FLINK-27917][Connector/Pulsar] Fix message assert error in PulsarUnorderedPartitionSplitReaderTest
tisonkun commented on code in PR #20567: URL: https://github.com/apache/flink/pull/20567#discussion_r946324038 ## flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java: ## @@ -198,7 +213,7 @@ void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader) String topicName = randomAlphabetic(10); // Add a split -seekStartPositionAndHandleSplit(splitReader, topicName, 0); +handleSplit(splitReader, topicName, 0, MessageId.latest); Review Comment: @syhily so you mean that these two changes are all about non-existent topics, and you leave those for existing topics as-is? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-28965) Should create empty partition when it's for dynamic partition
[ https://issues.apache.org/jira/browse/FLINK-28965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-28965: --- Assignee: tartarus > Should create empty partition when it's for dynamic partition > - > > Key: FLINK-28965 > URL: https://issues.apache.org/jira/browse/FLINK-28965 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: luoyuxia >Assignee: tartarus >Priority: Critical > Fix For: 1.16.0 > > > Can be reproduced by the following code in HiveDialectTest: > > {code:java} > tableEnv.executeSql( > "create table over1k_part_orc(\n" > + " si smallint,\n" > + " i int,\n" > + " b bigint,\n" > + " f float)\n" > + " partitioned by (ds string, t tinyint) stored as > orc"); > tableEnv.executeSql( > "create table over1k(\n" > + " t tinyint,\n" > + " si smallint,\n" > + " i int,\n" > + " b bigint,\n" > + " f float,\n" > + " d double,\n" > + " bo boolean,\n" > + " s string,\n" > + " ts timestamp,\n" > + " dec decimal(4,2),\n" > + " bin binary)"); > tableEnv.executeSql( > "insert overwrite table over1k_part_orc partition(ds=\"foo\", > t)" > + " select si,i,b,f,t from over1k where t is null or > t=27 order by si") > .await(); {code} > > Although it's for dynamic partition, the current code will try to create a > partition for it (ds='foo') when there's no data written to it, so the > exception will be thrown since the partition spec (ds='foo') is not a full > partition spec > {code:java} > Caused by: MetaException(message:Invalid partition key & values; keys [ds, t, > ], values [foo, ]) {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
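A minimal sketch of the guard the report implies (the helper name and structure are illustrative, not the actual Hive connector code): a partition should only be created in the metastore when every partition column — static and dynamic — has a value, otherwise the metastore rejects it with "Invalid partition key & values".

```python
# Hypothetical guard: a partition spec is safe to create only when it is
# "full", i.e. every partition column has a non-null value. A static-only
# spec like {ds: 'foo'} (dynamic column `t` still unresolved because no
# data was written) must be skipped.
def is_full_spec(partition_columns, spec):
    return all(spec.get(col) is not None for col in partition_columns)

columns = ["ds", "t"]
print(is_full_spec(columns, {"ds": "foo"}))             # False: skip creation
print(is_full_spec(columns, {"ds": "foo", "t": "27"}))  # True: safe to create
```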
[jira] [Closed] (FLINK-28548) The commit partition base path is not created when no data is sent which may cause FileNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-28548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-28548. --- Assignee: Liu Resolution: Fixed Fixed in master: f2abb51ac91c8b0e9bdd261de791d3aa1ba033dd > The commit partition base path is not created when no data is sent which may > cause FileNotFoundException > > > Key: FLINK-28548 > URL: https://issues.apache.org/jira/browse/FLINK-28548 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.14.5, 1.15.1, 1.16.0 >Reporter: Liu >Assignee: Liu >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > The commit partition base path is not created when no data is sent which may > cause FileNotFoundException. The exception is as following: > {code:java} > Caused by: java.io.FileNotFoundException: File > /home/ljg/test_sql.db/flink_batch_test/.staging_1657697612169 does not exist. > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:771) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:120) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:828) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:824) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics$1.doCall(FileSystemLinkResolverWithStatistics.java:37) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] > at org.apache.hadoop.hdfs.perflog.PerfProxy.call(PerfProxy.java:49) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] 
> at > org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics.resolve(FileSystemLinkResolverWithStatistics.java:39) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:835) > ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) > ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) > ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.fs.viewfs.ChRootedFileSystem.listStatus(ChRootedFileSystem.java:241) > ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.hadoop.fs.viewfs.ViewFileSystem.listStatus(ViewFileSystem.java:376) > ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] > at > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) > ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0] > at > org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths(PartitionTempFileManager.java:87) > ~[flink-connector-files-1.15.0.jar:1.15.0] > at > org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitions(FileSystemCommitter.java:78) > ~[flink-connector-files-1.15.0.jar:1.15.0] > at > org.apache.flink.connector.file.table.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:89) > ~[flink-connector-files-1.15.0.jar:1.15.0] > at > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:153) > ~[flink-dist-1.15.0.jar:1.15.0] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.jobFinished(DefaultExecutionGraph.java:1190) > ~[flink-dist-1.15.0.jar:1.15.0] > ... 43 more {code} > We should check whether the base path exists before listStatus for the path. -- This message was sent by Atlassian Jira (v8.20.10#820010)
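The suggested fix ("check whether the base path exists before listStatus") can be sketched with the local filesystem standing in for HDFS — the function name mirrors `PartitionTempFileManager.listTaskTemporaryPaths` but is illustrative, not Flink's actual API:

```python
import os

# Minimal sketch: return an empty list instead of raising FileNotFoundError
# when the staging directory was never created because no data was written.
def list_task_temporary_paths(staging_dir):
    if not os.path.exists(staging_dir):
        return []  # nothing was written, so there is nothing to commit
    return sorted(os.listdir(staging_dir))

print(list_task_temporary_paths("/tmp/.staging_demo_does_not_exist_8f3a1c"))  # []
```

With this check the committer treats a missing staging directory as "no partitions to commit" rather than a job-failing exception.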
[GitHub] [flink] wuchong merged pull request #20269: [FLINK-28548][Connectors / FileSystem] Fix the exception FileNotFoundException when the commit partition base path is not created
wuchong merged PR #20269: URL: https://github.com/apache/flink/pull/20269 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28003) Hive sql is wrongly modified by SqlCompleter in SQL client when using -f {file}
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-28003. --- Resolution: Fixed Fixed in master: 226f1602c047092cc1997f6e861aa37858df21f7 > Hive sql is wrongly modified by SqlCompleter in SQL client when using -f > {file} > --- > > Key: FLINK-28003 > URL: https://issues.apache.org/jira/browse/FLINK-28003 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.15.0 >Reporter: Jing Zhang >Assignee: Liu >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > Attachments: zj_test.sql > > > When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql' > {code:java} > create table if not exists db.zj_test( > pos int, > rank_cmd string > ) > partitioned by ( > `p_date` string, > `p_hourmin` string); > INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = > '0100') > SELECT > pos , > rank_cmd > FROM db.sourceT > where p_date = '20220605' and p_hourmin = '0100'; {code} > An error would be thrown out because the 'pos' field is changed to > 'POSITION'. I guess `SqlCompleter` in sqlClient module might do something > wrong. > The error could be reproduced using the attached file. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28982) Start TaskInterrupter when task switch from DEPLOYING to CANCELING
LI Mingkun created FLINK-28982: -- Summary: Start TaskInterrupter when task switch from DEPLOYING to CANCELING Key: FLINK-28982 URL: https://issues.apache.org/jira/browse/FLINK-28982 Project: Flink Issue Type: Technical Debt Components: Runtime / Web Frontend Reporter: LI Mingkun Attachments: image-2022-08-16-12-10-43-894.png A task starts the TaskInterrupter only when its `ExecutionState` is INITIALIZING or RUNNING, in the function org.apache.flink.runtime.taskmanager.Task#cancelOrFailAndCancelInvokableInternal. I hit a deadlock across multiple tasks, caused by a TCP-connection-sharing bug in Flink Remote Shuffle, which blocked task destruction while I was using Flink Remote Shuffle. Stack as follows: !image-2022-08-16-12-10-43-894.png! My question: why not start the TaskInterrupter when cancelling a deploying task? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wuchong merged pull request #20182: [FLINK-28003][Table SQL / Client] Disable SqlCompleter when using -f {file}
wuchong merged PR #20182: URL: https://github.com/apache/flink/pull/20182 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28773) Fix Hive sink not write a success file after finish writing in batch mode
[ https://issues.apache.org/jira/browse/FLINK-28773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-28773. --- Assignee: tartarus Resolution: Fixed Fixed in master: 00cded670753e2795c774c159facb801bbeb9927 to b66680bb658dfb550e60a5e7440ee904c87219c6 > Fix Hive sink not write a success file after finish writing in batch mode > - > > Key: FLINK-28773 > URL: https://issues.apache.org/jira/browse/FLINK-28773 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Assignee: tartarus >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently, in stream mode, we allow user to configure commit policy, but in > batch mode, we don't provide such way and it will always commit to metastore. > But user expect other commit policy such like write a success file in batch > mode. So, it'll be better to support write a success file after finish > writing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wuchong merged pull request #20469: [FLINK-28773][hive] Fix Hive sink not write a success file after finish writing in batch mode
wuchong merged PR #20469: URL: https://github.com/apache/flink/pull/20469 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] curcur commented on a diff in pull request #20585: [WIP][FLINK-28976][state/changelog] Don't add extra delay to the 1st materialization
curcur commented on code in PR #20585: URL: https://github.com/apache/flink/pull/20585#discussion_r946315050 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java: ## @@ -118,7 +118,7 @@ public void start() { LOG.info("Task {} starts periodic materialization", subtaskName); -scheduleNextMaterialization(periodicMaterializeDelay + initialDelay); +scheduleNextMaterialization(initialDelay); Review Comment: Let's introduce a way to disable the materialization (flag). I am not a fan of using `periodicMaterializationDelay` very long to disable materialization. It is implicit and may lead to error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
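The reviewer's suggestion — an explicit disable flag rather than an effectively-infinite `periodicMaterializeDelay` — can be sketched as follows; all names here are illustrative, not Flink's actual changelog state backend API:

```python
# Hypothetical scheduling helper: an explicit `enabled` flag makes the
# "never materialize" case a first-class configuration instead of relying
# on a huge delay value, which is implicit and error-prone.
def first_materialization_delay(enabled, initial_delay_ms):
    """Delay before the first materialization, or None when disabled."""
    if not enabled:
        return None  # never schedule anything
    # Per the PR change under review: no extra periodic delay is added
    # to the first materialization, only the initial delay.
    return initial_delay_ms

print(first_materialization_delay(True, 500))   # 500
print(first_materialization_delay(False, 500))  # None
```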
[jira] [Closed] (FLINK-28130) FLIP-224: Blocklist Mechanism
[ https://issues.apache.org/jira/browse/FLINK-28130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-28130. --- Resolution: Done > FLIP-224: Blocklist Mechanism > - > > Key: FLINK-28130 > URL: https://issues.apache.org/jira/browse/FLINK-28130 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Affects Versions: 1.16.0 >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > Fix For: 1.16.0 > > > In order to support speculative execution for batch > jobs([FLIP-168|https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job]), > we need a mechanism to block resources on nodes where the slow tasks are > located. We propose to introduce a blocklist mechanism as follows: Once a > node is marked as blocked, future slots should not be allocated from the > blocked node, but the slots that are already allocated will not be affected. > More details see > [FLIP-224|https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism] > This is the umbrella ticket to track all the changes of this feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-28587) FLIP-249: Flink Web UI Enhancement for Speculative Execution
[ https://issues.apache.org/jira/browse/FLINK-28587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-28587. --- Fix Version/s: 1.16.0 Resolution: Done > FLIP-249: Flink Web UI Enhancement for Speculative Execution > > > Key: FLINK-28587 > URL: https://issues.apache.org/jira/browse/FLINK-28587 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST, Runtime / Web Frontend >Affects Versions: 1.16.0 >Reporter: Gen Luo >Assignee: Gen Luo >Priority: Major > Fix For: 1.16.0 > > > As a follow-up step of FLIP-168 and FLIP-224, the Flink Web UI needs to be > enhanced to display the related information if the speculative execution > mechanism is enabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-28397) [FLIP-245] Source Supports Speculative Execution For Batch Job
[ https://issues.apache.org/jira/browse/FLINK-28397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-28397. --- Resolution: Done > [FLIP-245] Source Supports Speculative Execution For Batch Job > -- > > Key: FLINK-28397 > URL: https://issues.apache.org/jira/browse/FLINK-28397 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Affects Versions: 1.16.0 >Reporter: Jing Zhang >Assignee: Jing Zhang >Priority: Major > Fix For: 1.16.0 > > > This is the umbrella ticket of > [FLIP-245|https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job]. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] SteNicholas commented on pull request #266: [FLINK-28967] Fix Hive multi-version support for Table Store
SteNicholas commented on PR #266: URL: https://github.com/apache/flink-table-store/pull/266#issuecomment-1216113060 @tsreaper @JingsongLi , does the NOTICE file of this module need to be updated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution
Zhu Zhu created FLINK-28981: --- Summary: Release Testing: Verify FLIP-245 sources speculative execution Key: FLINK-28981 URL: https://issues.apache.org/jira/browse/FLINK-28981 Project: Flink Issue Type: Sub-task Components: Connectors / Common, Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs: - FLIP-168: Speculative Execution core part - FLIP-224: Blocklist Mechanism - FLIP-245: Source Supports Speculative Execution - FLIP-249: Flink Web UI Enhancement for Speculative Execution This ticket aims for verifying FLIP-245, along with FLIP-168, FLIP-224 and FLIP-249. More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507]. To do the verification, the process can be: - Write Flink jobs which has some {{source}} subtasks running much slower than others. 3 kinds of sources should be verified, including - [Source functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java] - [InputFormat sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java] - [FLIP-27 new sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java] - Modify Flink configuration file to enable speculative execution and tune the configuration as you like - Submit the job. Checking the web UI, logs, metrics and produced result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-28980: Description: Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs: - FLIP-168: Speculative Execution core part - FLIP-224: Blocklist Mechanism - FLIP-245: Source Supports Speculative Execution - FLIP-249: Flink Web UI Enhancement for Speculative Execution This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249. More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507]. To do the verification, the process can be: - Write a Flink job which has a subtask running much slower than others (e.g. sleep indefinitely if it runs on a certain host, the hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0) - Modify Flink configuration file to enable speculative execution and tune the configuration as you like - Submit the job. Checking the web UI, logs, metrics and produced result. was: Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs: - FLIP-168: Speculative Execution core part - FLIP-224: Blocklist Mechanism - FLIP-245: Source Supports Speculative Execution - FLIP-249: Flink Web UI Enhancement for Speculative Execution This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249. More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507]. To do the verification, the process can be: - Write a Flink job which has a subtask running much slower than others (e.g.
sleep indefinitely if it runs on a certain host, the hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0) - Modify Flink configuration file to enable speculative execution and tune the configuration as you like - Submit the job. Checking the web UI, logs, metrics and produced result. > Release Testing: Verify FLIP-168 speculative execution > -- > > Key: FLINK-28980 > URL: https://issues.apache.org/jira/browse/FLINK-28980 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Zhu Zhu >Priority: Major > Fix For: 1.16.0 > > > Speculative execution is introduced in Flink 1.16 to deal with temporary slow > tasks caused by slow nodes. This feature currently consists of 4 FLIPs: > - FLIP-168: Speculative Execution core part > - FLIP-224: Blocklist Mechanism > - FLIP-245: Source Supports Speculative Execution > - FLIP-249: Flink Web UI Enhancement for Speculative Execution > This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249. > More details about this feature and how to use it can be found in this > documentation [PR|https://github.com/apache/flink/pull/20507]. > To do the verification, the process can be: > - Write a Flink job which has a subtask running much slower than others > (e.g. sleep indefinitely if it runs on a certain host, the hostname can be > retrieved via InetAddress.getLocalHost().getHostName(), or if its > (subtaskIndex + attemptNumber) % 2 == 0) > - Modify Flink configuration file to enable speculative execution and tune > the configuration as you like > - Submit the job. Checking the web UI, logs, metrics and produced result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
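The parity trick from the ticket can be sketched as a small predicate; in a real job this check would sit inside a rich function with access to the runtime context's subtask index and attempt number:

```python
# Sketch of the slow-subtask trick: decide whether this attempt should sleep
# based on (subtaskIndex + attemptNumber) parity, so the original attempt is
# slow while its speculative attempt runs normally.
def should_sleep(subtask_index, attempt_number):
    return (subtask_index + attempt_number) % 2 == 0

print(should_sleep(0, 0))  # True: the first attempt of subtask 0 is the slow one
print(should_sleep(0, 1))  # False: the speculative second attempt runs normally
```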
[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window
[ https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580026#comment-17580026 ] Huang Xingbo commented on FLINK-28920: -- Thanks [~ana4] for the test. I have assigned it to you. If you encounter any problems, feel free to contact me. > Release Testing: Verify Python DataStream Window > > > Key: FLINK-28920 > URL: https://issues.apache.org/jira/browse/FLINK-28920 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. Test > * Write a python datastream window job in thread mode. For details of > Window, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python} > from typing import Tuple > from pyflink.common import Configuration > from pyflink.common.time import Time > from pyflink.common.typeinfo import Types > from pyflink.common.watermark_strategy import WatermarkStrategy, > TimestampAssigner > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.datastream.functions import AggregateFunction > from pyflink.datastream.window import EventTimeSessionWindows > class SecondColumnTimestampAssigner(TimestampAssigner): > def extract_timestamp(self, value, record_timestamp) -> int: > return int(value[1]) > def main(): > config = Configuration() > # thread mode > config.set_string("python.execution-mode", "thread") > # process mode > # config.set_string("python.execution-mode", "process") > env = StreamExecutionEnvironment.get_execution_environment(config) > data_stream = env.from_collection([ > ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', > 15)], > type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: > DataStream > watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ > .with_timestamp_assigner(SecondColumnTimestampAssigner()) > class MyAggregateFunction(AggregateFunction): > def create_accumulator(self) -> Tuple[int, str]: > return 0, '' > def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) > -> Tuple[int, str]: > return value[1] + accumulator[0], value[0] > def get_result(self, accumulator: Tuple[str, int]): > return accumulator[1], accumulator[0] > def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]): > return acc_a[0] + acc_b[0], acc_a[1] > ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ > .key_by(lambda x: x[0], key_type=Types.STRING()) \ > .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ > .aggregate(MyAggregateFunction(), >accumulator_type=Types.TUPLE([Types.INT(), > Types.STRING()]), >output_type=Types.TUPLE([Types.STRING(), Types.INT()])) > ds.print() > 
env.execute('test_window_aggregate_accumulator_type') > if __name__ == '__main__': > main() > {code} > * run the python datastream window job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580027#comment-17580027 ] Huang Xingbo commented on FLINK-28918: -- Thanks [~ana4] for the test. I have assigned it to you. If you encounter any problems, feel free to contact me. > Release Testing: Verify FLIP-206 in Python DataStream API > - > > Key: FLINK-28918 > URL: https://issues.apache.org/jira/browse/FLINK-28918 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1.
Test > * Write a python datastream job in thread mode > {code:python} > from pyflink.common import Configuration > from pyflink.common.typeinfo import Types > from pyflink.datastream import StreamExecutionEnvironment > def main(): > config = Configuration() > config.set_string("python.execution-mode", "thread") > env = StreamExecutionEnvironment.get_execution_environment(config) > ds = env.from_collection( > [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)], > type_info=Types.ROW_NAMED(["v1", "v2", "v3"], > [Types.INT(), Types.STRING(), Types.INT()])) > def flat_map_func1(data): > for i in data: > yield int(i), 1 > def flat_map_func2(data): > for i in data: > yield i > ds = ds.key_by(lambda x: x[0]) \ > .min_by("v2") \ > .map(lambda x: (x[0], x[1], x[2]), > output_type=Types.TUPLE([Types.INT(), Types.STRING(), > Types.INT()])) \ > .key_by(lambda x: x[2]) \ > .max_by(0) \ > .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), > Types.INT()])) \ > .key_by(lambda x: [1]) \ > .min_by() \ > .flat_map(flat_map_func2, output_type=Types.INT()) \ > .key_by(lambda x: x) \ > .max_by() > ds.print() > env.execute("key_by_min_by_max_by_test_batch") > if __name__ == '__main__': > main() > {code} > * run the python datastream job and watch the result > {code:bash} > $ python demo.py > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28920) Release Testing: Verify Python DataStream Window
[ https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo reassigned FLINK-28920: Assignee: Luning Wang > Release Testing: Verify Python DataStream Window > > > Key: FLINK-28920 > URL: https://issues.apache.org/jira/browse/FLINK-28920 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Luning Wang >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Build flink source code and compile source code > {code:bash} > $ cd {flink-source-code} > $ mvn clean install -DskipTests > {code} > * Prepare a Python Virtual Environment > {code:bash} > $ cd flink-python/dev > $ ./lint-python.sh -s basic > $ source .conda/bin/activate > {code} > * Install PyFlink from source code. For more details, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink] > {code:bash} > $ cd flink-python/apache-flink-libraries > $ python setup.py sdist > $ pip install dist/*.tar.gz > $ cd .. > $ pip install -r dev/dev-requirements.txt > $ python setup.py sdist > $ pip install dist/*.tar.gz > {code} > h1. Test > * Write a python datastream window job in thread mode. For details of > Window, you can refer to the > [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
>
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
>
>
> class SecondColumnTimestampAssigner(TimestampAssigner):
>
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[1])
>
>
> def main():
>     config = Configuration()
>     # thread mode
>     config.set_string("python.execution-mode", "thread")
>     # process mode
>     # config.set_string("python.execution-mode", "process")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     data_stream = env.from_collection(
>         [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
>         type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(SecondColumnTimestampAssigner())
>
>     class MyAggregateFunction(AggregateFunction):
>
>         def create_accumulator(self) -> Tuple[int, str]:
>             return 0, ''
>
>         def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
>             return value[1] + accumulator[0], value[0]
>
>         def get_result(self, accumulator: Tuple[int, str]):
>             return accumulator[1], accumulator[0]
>
>         def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
>             return acc_a[0] + acc_b[0], acc_a[1]
>
>     ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: x[0], key_type=Types.STRING()) \
>         .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
>         .aggregate(MyAggregateFunction(),
>                    accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
>                    output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds.print()
>     env.execute('test_window_aggregate_accumulator_type')
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream window job and watch the result:
> {code:bash}
> $ python demo.py
> {code}


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
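As a cross-check for the session-window job above, the expected output can be derived without running Flink at all: with a 2 ms gap, consecutive events of a key belong to the same session when their timestamps are less than 2 ms apart, and the aggregate sums the second column per session. A minimal plain-Python sketch of that windowing arithmetic (not PyFlink; `session_aggregate` is an illustrative helper, not a Flink API):

```python
from collections import defaultdict


def session_aggregate(events, gap):
    """Group (key, timestamp) events into event-time session windows per key
    and sum the timestamps in each session, mirroring MyAggregateFunction."""
    per_key = defaultdict(list)
    for key, ts in events:
        per_key[key].append(ts)
    results = []
    for key, stamps in sorted(per_key.items()):
        stamps.sort()
        session = [stamps[0]]
        for ts in stamps[1:]:
            if ts - session[-1] < gap:   # within the gap -> same session window
                session.append(ts)
            else:                        # gap exceeded -> close window, start a new one
                results.append((key, sum(session)))
                session = [ts]
        results.append((key, sum(session)))
    return results


events = [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)]
print(session_aggregate(events, gap=2))
# [('a', 3), ('a', 6), ('a', 15), ('b', 3), ('b', 17)]
```

Comparing this hand-computed result against `ds.print()` output is a quick way to confirm that thread mode and process mode produce the same windows.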
[jira] [Assigned] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huang Xingbo reassigned FLINK-28918:
------------------------------------

    Assignee: Luning Wang

> Release Testing: Verify FLIP-206 in Python DataStream API
> ---------------------------------------------------------
>
>                 Key: FLINK-28918
>                 URL: https://issues.apache.org/jira/browse/FLINK-28918
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Assignee: Luning Wang
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>
>
> * Build and compile the Flink source code:
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python virtual environment:
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source. For more details, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]:
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python DataStream job in thread mode:
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
>
>
> def main():
>     config = Configuration()
>     config.set_string("python.execution-mode", "thread")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     ds = env.from_collection(
>         [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
>         type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>                                   [Types.INT(), Types.STRING(), Types.INT()]))
>
>     def flat_map_func1(data):
>         for i in data:
>             yield int(i), 1
>
>     def flat_map_func2(data):
>         for i in data:
>             yield i
>
>     ds = ds.key_by(lambda x: x[0]) \
>         .min_by("v2") \
>         .map(lambda x: (x[0], x[1], x[2]),
>              output_type=Types.TUPLE([Types.INT(), Types.STRING(), Types.INT()])) \
>         .key_by(lambda x: x[2]) \
>         .max_by(0) \
>         .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), Types.INT()])) \
>         .key_by(lambda x: x[1]) \
>         .min_by() \
>         .flat_map(flat_map_func2, output_type=Types.INT()) \
>         .key_by(lambda x: x) \
>         .max_by()
>     ds.print()
>     env.execute("key_by_min_by_max_by_test_batch")
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream job and watch the result:
> {code:bash}
> $ python demo.py
> {code}


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
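The first `key_by`/`min_by` step in the job above relies on the rolling semantics of Flink's `min_by`: for every incoming record it emits the record with the smallest value of the given field seen so far for that key. A plain-Python sketch of those assumed semantics (`rolling_min_by` is an illustrative helper, not a PyFlink API; the real operator does this incrementally with keyed state):

```python
def rolling_min_by(rows, key_idx, field_idx):
    # For each incoming row, emit the row holding the minimum field value
    # observed so far for that row's key (min_by's rolling-aggregation contract).
    best = {}
    out = []
    for row in rows:
        k = row[key_idx]
        if k not in best or row[field_idx] < best[k][field_idx]:
            best[k] = row
        out.append(best[k])
    return out


rows = [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)]
for r in rolling_min_by(rows, key_idx=0, field_idx=1):
    print(r)
# (1, '9', 0)
# (1, '5', 1)
# (1, '5', 1)
# (5, '5', 0)
# (5, '3', 1)
```

Note that `v2` is a string column, so the comparison here (as in the job) is lexicographic.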
[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-28980:
----------------------------
    Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
- Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
- Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> ------------------------------------------------------
>
>                 Key: FLINK-28980
>                 URL: https://issues.apache.org/jira/browse/FLINK-28980
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
> - FLIP-168: Speculative Execution core part
> - FLIP-224: Blocklist Mechanism
> - FLIP-245: Source Supports Speculative Execution
> - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
> - Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
> - Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
> - Submit the job. Check the web UI, logs, metrics and the produced result.


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
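The "slow subtask" from the steps above boils down to a tiny selection predicate. A hedged plain-Python sketch of that logic (the actual release test would embed it in the job's map function and sleep when it fires; `should_sleep` and `slow_host` are illustrative names, and `subtask_index`/`attempt_number` stand in for the values the Flink runtime context provides):

```python
import socket
from typing import Optional


def should_sleep(subtask_index: int, attempt_number: int,
                 slow_host: Optional[str] = None) -> bool:
    """Decide whether this subtask should simulate a slow node."""
    if slow_host is not None:
        # Variant 1: slow down only when running on a specific host.
        return socket.gethostname() == slow_host
    # Variant 2: slow down every other (subtask, attempt) combination, so a
    # speculative attempt (with a different attempt number) can overtake it.
    return (subtask_index + attempt_number) % 2 == 0


print(should_sleep(0, 0))  # True  -> the first attempt of subtask 0 sleeps
print(should_sleep(0, 1))  # False -> its speculative attempt runs fast
```

With variant 2, a hanging attempt and its speculative attempt always take different branches, which is exactly what makes the speculative attempt finish first and show up in the web UI and metrics.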
[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-28980:
----------------------------
    Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
- Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. More details about this feature can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
- Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> ------------------------------------------------------
>
>                 Key: FLINK-28980
>                 URL: https://issues.apache.org/jira/browse/FLINK-28980
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
> - FLIP-168: Speculative Execution core part
> - FLIP-224: Blocklist Mechanism
> - FLIP-245: Source Supports Speculative Execution
> - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
> - Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
> - Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
> - Submit the job. Check the web UI, logs, metrics and the produced result.


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window
[ https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580025#comment-17580025 ]

Luning Wang commented on FLINK-28920:
-------------------------------------
I will test this on an M1 chip.

> Release Testing: Verify Python DataStream Window
> ------------------------------------------------
>
>                 Key: FLINK-28920
>                 URL: https://issues.apache.org/jira/browse/FLINK-28920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580024#comment-17580024 ]

Luning Wang commented on FLINK-28918:
-------------------------------------
I will test this on an M1 chip.

> Release Testing: Verify FLIP-206 in Python DataStream API
> ---------------------------------------------------------
>
>                 Key: FLINK-28918
>                 URL: https://issues.apache.org/jira/browse/FLINK-28918
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-28980:
----------------------------
    Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. More details about this feature can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
- Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. More details about this feature can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
- Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> ------------------------------------------------------
>
>                 Key: FLINK-28980
>                 URL: https://issues.apache.org/jira/browse/FLINK-28980
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. More details about this feature can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
> This feature currently consists of 4 FLIPs:
> - FLIP-168: Speculative Execution core part
> - FLIP-224: Blocklist Mechanism
> - FLIP-245: Source Supports Speculative Execution
> - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> To do the verification, the process can be:
> - Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
> - Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
> - Submit the job. Check the web UI, logs, metrics and the produced result.


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
Zhu Zhu created FLINK-28980:
-------------------------------

             Summary: Release Testing: Verify FLIP-168 speculative execution
                 Key: FLINK-28980
                 URL: https://issues.apache.org/jira/browse/FLINK-28980
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Coordination
            Reporter: Zhu Zhu
             Fix For: 1.16.0


Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. More details about this feature can be found in this documentation [PR|https://github.com/apache/flink/pull/20507].
This feature currently consists of 4 FLIPs:
- FLIP-168: Speculative Execution core part
- FLIP-224: Blocklist Mechanism
- FLIP-245: Source Supports Speculative Execution
- FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
- Write a Flink job which has a subtask running much slower than the others (e.g. sleep indefinitely if it runs on a certain host, whose name can be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + attemptNumber) % 2 == 0)
- Modify the Flink configuration file to enable speculative execution and tune the configuration as you like
- Submit the job. Check the web UI, logs, metrics and the produced result.


-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-27535) Optimize the unit test execution time
[ https://issues.apache.org/jira/browse/FLINK-27535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27535: --- Labels: pull-request-available (was: ) > Optimize the unit test execution time > - > > Key: FLINK-27535 > URL: https://issues.apache.org/jira/browse/FLINK-27535 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Reporter: Dong Lin >Assignee: Yunfeng Zhou >Priority: Critical > Labels: pull-request-available > > Currently `mvn package` takes 10 minutes to complete in Github actions. A lot > of time is spent in running unit tests for algorithms. For example, > LogisticRegressionTest takes 82 seconds and KMeansTest takes 43 seconds in > [1]. > This time appears to be more than expected. And it will considerably reduce > developer velocity if a developer needs to wait for hours to get test results > once we have 100+ algorithms in Flink ML. > We should understand why it takes 82 seconds to run e.g. > LogisticRegressionTest and see if there is a way to optimize the test > execution time. > [1] https://github.com/apache/flink-ml/runs/6319402103?check_suite_focus=true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-ml] yunfengzhou-hub opened a new pull request, #143: [FLINK-27535] Optimize the unit test execution time
yunfengzhou-hub opened a new pull request, #143: URL: https://github.com/apache/flink-ml/pull/143 ## What is the purpose of the change This PR reduces the total execution time of Flink ML CIs so as to improve the efficiency of Flink ML development. This PR mainly focuses on optimizing the execution time of Java tests. Python tests might be further optimized in separate PRs. ## Brief change log - Make sure all flink-ml-lib tests create a static mini-cluster for all test cases in their class, reducing the overhead of starting and shutting down clusters - Add caches for Maven and pip dependencies - Adjust algorithm parameters, like learning rate, to make the algorithms converge and finish faster in test cases - Disable unaligned checkpoints in test cases ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - Dependencies are added to `flink-ml-examples` to support running each module individually. - The public API, i.e., is any changed class annotated with @Public(Evolving): (no) - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (N/A) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
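The first change in the PR — one static mini-cluster shared by all test cases in a class — is an instance of reusing an expensive resource instead of recreating it per test. A minimal, Flink-free sketch of that pattern (all names here are hypothetical stand-ins, not the actual PR code):

```java
/**
 * Hypothetical stand-in for a shared test cluster: the expensive resource is
 * started lazily, at most once per test class, and every test case reuses it.
 */
public class SharedTestCluster {
    private static Object cluster;     // stand-in for a Flink MiniCluster
    private static int startCount = 0; // how many times the "cluster" was started

    /** Starts the shared cluster on first use; later calls return the same instance. */
    public static synchronized Object getOrStart() {
        if (cluster == null) {
            cluster = new Object(); // real code would boot a MiniCluster here
            startCount++;
        }
        return cluster;
    }

    public static int getStartCount() {
        return startCount;
    }
}
```

With N test methods, the per-test start/stop overhead drops from N cluster boots to one, which is where most of the claimed CI time savings would come from.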
[jira] [Assigned] (FLINK-28926) Release Testing: Verify flip-235 hybrid shuffle mode
[ https://issues.apache.org/jira/browse/FLINK-28926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo reassigned FLINK-28926: Assignee: Yu Chen > Release Testing: Verify flip-235 hybrid shuffle mode > > > Key: FLINK-28926 > URL: https://issues.apache.org/jira/browse/FLINK-28926 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.16.0 >Reporter: Weijie Guo >Assignee: Yu Chen >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Please refer to the release note of FLINK-27862 for a list of changes that need to > be verified. > * Please refer to our document for more details > [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/] > * Hybrid shuffle has some known limitations: no support for Slot Sharing, > Adaptive Batch Scheduler or Speculative Execution. Please make sure you do > not use these features in testing. > * The changes should be verified only in batch execution mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28926) Release Testing: Verify flip-235 hybrid shuffle mode
[ https://issues.apache.org/jira/browse/FLINK-28926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580019#comment-17580019 ] Weijie Guo commented on FLINK-28926: [~Yu Chen] Thank you for your interest. Yes, you can take over the test. It should be noted that there are still some known problems that have not been completely fixed. For example, compression is not supported at present. If you encounter problems in the test, feel free to contact me~ > Release Testing: Verify flip-235 hybrid shuffle mode > > > Key: FLINK-28926 > URL: https://issues.apache.org/jira/browse/FLINK-28926 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.16.0 >Reporter: Weijie Guo >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > * Please refer to the release note of FLINK-27862 for a list of changes that need to > be verified. > * Please refer to our document for more details > [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/] > * Hybrid shuffle has some known limitations: no support for Slot Sharing, > Adaptive Batch Scheduler or Speculative Execution. Please make sure you do > not use these features in testing. > * The changes should be verified only in batch execution mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
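For reference, hybrid shuffle is selected through the batch shuffle mode option in the Flink configuration; the sketch below reflects the option names documented for 1.16, but the exact values should be checked against the batch_shuffle documentation page linked in the ticket:

```yaml
# flink-conf.yaml sketch (verify against the 1.16 batch shuffle docs):
# ALL_EXCHANGES_HYBRID_FULL keeps full spilled data for better rescaling/failover;
# ALL_EXCHANGES_HYBRID_SELECTIVE spills selectively to reduce disk usage.
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
```

Per the limitations listed above, jobs used for this verification should not enable slot sharing, the Adaptive Batch Scheduler, or speculative execution, and must run in batch execution mode.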
[jira] [Commented] (FLINK-28915) Flink Native k8s mode jar location support s3 schema
[ https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580018#comment-17580018 ] Yang Wang commented on FLINK-28915: --- Thanks [~aitozi] and [~hjw] for volunteering. I will assign this ticket to [~hjw] and I believe we could work together. > Flink Native k8s mode jar location support s3 schema > -- > > Key: FLINK-28915 > URL: https://issues.apache.org/jira/browse/FLINK-28915 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes, flink-contrib >Affects Versions: 1.15.0, 1.15.1 >Reporter: hjw >Priority: Major > > As the Flink documentation shows, local is the only supported scheme in Native k8s > deployment. > Is there a plan to support the s3 filesystem? Thanks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28915) Flink Native k8s mode jar location support s3 schema
[ https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang reassigned FLINK-28915: - Assignee: hjw > Flink Native k8s mode jar location support s3 schema > -- > > Key: FLINK-28915 > URL: https://issues.apache.org/jira/browse/FLINK-28915 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes, flink-contrib >Affects Versions: 1.15.0, 1.15.1 >Reporter: hjw >Assignee: hjw >Priority: Major > > As the Flink documentation shows, local is the only supported scheme in Native k8s > deployment. > Is there a plan to support the s3 filesystem? Thanks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #20589: [FLINK-28972][python][connector/pulsar] Align Start/StopCursor method to java
flinkbot commented on PR #20589: URL: https://github.com/apache/flink/pull/20589#issuecomment-1216086074 ## CI report: * c053812eb15ef64282bc24a8d735fe0a693bac00 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #139: [FLINK-28803] Add Transformer and Estimator for KBinsDiscretizer
yunfengzhou-hub commented on code in PR #139: URL: https://github.com/apache/flink-ml/pull/139#discussion_r946290131

## flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java:

@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizer;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerModel;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerModelData;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerParams;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link KBinsDiscretizer} and {@link KBinsDiscretizerModel}. */
+public class KBinsDiscretizerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainTable;
+    private Table testTable;
+
+    // Column0 for normal cases, column1 for constant cases, column2 for numDistinct < numBins
+    // cases.
+    private static final List<Row> TRAIN_INPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(1, 10, 0)),
+                    Row.of(Vectors.dense(1, 10, 0)),
+                    Row.of(Vectors.dense(1, 10, 0)),
+                    Row.of(Vectors.dense(4, 10, 0)),
+                    Row.of(Vectors.dense(5, 10, 0)),
+                    Row.of(Vectors.dense(6, 10, 0)),
+                    Row.of(Vectors.dense(7, 10, 0)),
+                    Row.of(Vectors.dense(10, 10, 0)),
+                    Row.of(Vectors.dense(13, 10, 3)));
+
+    private static final List<Row> TEST_INPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(-1, 0, 0)),
+                    Row.of(Vectors.dense(1, 1, 1)),
+                    Row.of(Vectors.dense(1.5, 1, 2)),
+                    Row.of(Vectors.dense(5, 2, 3)),
+                    Row.of(Vectors.dense(7.25, 3, 4)),
+                    Row.of(Vectors.dense(13, 4, 5)),
+                    Row.of(Vectors.dense(15, 4, 6)));
+
+    private static final double[][] UNIFORM_MODEL_DATA =
+            new double[][] {
+                new double[] {1, 5, 9, 13},
+                new double[] {Double.MIN_VALUE, Double.MAX_VALUE},
+                new double[] {0, 1, 2, 3}
+            };
+
+    private static final List<Row> UNIFORM_OUTPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(0, 0, 0)),
+                    Row.of(Vectors.dense(0, 0, 1)),
+                    Row.of(Vectors.dense(0, 0, 2)),
+                    Row.of(Vectors.dense(1, 0, 2)),
+                    Row.of(Vectors.dense(1, 0, 2)),
+                    Row.of(Vectors.dense(2, 0, 2)),
+                    Row.of(Vectors.dense(2, 0, 2)));
+
+    private static final List<Row> QUANTILE_OUTPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(0, 0, 0)),
+                    Row.of(Vectors.dense(0, 0, 0)),
+R
[jira] [Updated] (FLINK-28979) Add another owner into the JM deployment's owner references
[ https://issues.apache.org/jira/browse/FLINK-28979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Hao updated FLINK-28979: Description: This action will link the resources into the FlinkDeployment The JM deployment will look like {code:java} ownerReferences: - apiVersion: apps/v1 blockOwnerDeletion: true controller: true kind: Deployment name: xxx - apiVersion: flink.apache.org/v1beta1 blockOwnerDeletion: true controller: false kind: FlinkDeployment name: xxx{code} was: This action will link the resources into the FlinkDeployment Will look like {code:java} ownerReferences: - apiVersion: apps/v1 blockOwnerDeletion: true controller: true kind: Deployment name: xxx - apiVersion: flink.apache.org/v1beta1 blockOwnerDeletion: true controller: false kind: FlinkDeployment name: xxx{code} > Add another owner into the JM deployment's owner references > --- > > Key: FLINK-28979 > URL: https://issues.apache.org/jira/browse/FLINK-28979 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Xin Hao >Priority: Minor > > This action will link the resources into the FlinkDeployment > The JM deployment will look like > {code:java} > ownerReferences: > - apiVersion: apps/v1 > blockOwnerDeletion: true > controller: true > kind: Deployment > name: xxx > - apiVersion: flink.apache.org/v1beta1 > blockOwnerDeletion: true > controller: false > kind: FlinkDeployment > name: xxx{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28979) Add another owner into the JM deployment's owner references
Xin Hao created FLINK-28979: --- Summary: Add another owner into the JM deployment's owner references Key: FLINK-28979 URL: https://issues.apache.org/jira/browse/FLINK-28979 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Xin Hao This action will link the resources into the FlinkDeployment Will look like {code:java} ownerReferences: - apiVersion: apps/v1 blockOwnerDeletion: true controller: true kind: Deployment name: xxx - apiVersion: flink.apache.org/v1beta1 blockOwnerDeletion: true controller: false kind: FlinkDeployment name: xxx{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28972) Add methods of StartCursor and StopCursor to align the Java
[ https://issues.apache.org/jira/browse/FLINK-28972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28972: --- Labels: pull-request-available (was: ) > Add methods of StartCursor and StopCursor to align the Java > --- > > Key: FLINK-28972 > URL: https://issues.apache.org/jira/browse/FLINK-28972 > Project: Flink > Issue Type: Improvement > Components: API / Python, Connectors / Pulsar >Affects Versions: 1.14.5, 1.15.1, 1.16.0 >Reporter: Luning Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.2, 1.14.6 > > > Add fromPublishTime in the StartCursor class > Add afterEventTime and afterPublishTime in the StopCursor class -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] deadwind4 opened a new pull request, #20589: [FLINK-28972][python][connector/pulsar] Align Start/StopCursor method to java
deadwind4 opened a new pull request, #20589: URL: https://github.com/apache/flink/pull/20589 ## What is the purpose of the change Add fromPublishTime in the StartCursor class Add afterEventTime and afterPublishTime in the StopCursor class ## Brief change log - *Add from_publish_time in the StartCursor class* - *Add after_event_time and after_publish_time in the StopCursor class* - *Add EN and CN docs* ## Verifying this change This change added tests and can be verified as follows: - *Added StopCursor event_time and publish_time test case* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
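The alignment described above is purely a naming correspondence: each new Python method is the snake_case counterpart of a Java cursor method (from_publish_time ↔ fromPublishTime, after_event_time ↔ afterEventTime, after_publish_time ↔ afterPublishTime). The helper below is illustrative only — it is not part of either API — and simply shows how mechanical the mapping is:

```java
/**
 * Illustrative only: converts a snake_case Python method name to the
 * camelCase Java name it delegates to in the Pulsar connector wrappers.
 */
public class CursorNames {
    public static String toJavaName(String pythonName) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : pythonName.toCharArray()) {
            if (c == '_') {
                upperNext = true; // drop the underscore, capitalize what follows
            } else {
                out.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return out.toString();
    }
}
```

Keeping the mapping this mechanical is what makes the Python surface easy to keep in sync with Java as further cursor factory methods are added.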