[GitHub] [flink-table-store] tsreaper commented on pull request #267: [hotfix] Fix github workflow build-different-versions.yaml

2022-08-15 Thread GitBox


tsreaper commented on PR #267:
URL: https://github.com/apache/flink-table-store/pull/267#issuecomment-1216223634

   Tests passed in https://github.com/tsreaper/flink-table-store/actions


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28265) Inconsistency in Kubernetes HA service: broken state handle

2022-08-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28265:
---
Labels: pull-request-available  (was: )

> Inconsistency in Kubernetes HA service: broken state handle
> ---
>
> Key: FLINK-28265
> URL: https://issues.apache.org/jira/browse/FLINK-28265
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
> Attachments: flink_checkpoint_issue.txt
>
>
> I have a JobManager, which at some point failed to acknowledge a checkpoint:
> {code}
> Error while processing AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
> the pending checkpoint 193393. Failure reason: Failure to finalize checkpoint.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1255)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100)
>   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: 
> org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: 
> checkpointID-0193393 already exists in ConfigMap 
> cm--jobmanager-leader
>   at 
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.getKeyAlreadyExistException(KubernetesStateHandleStore.java:534)
>   at 
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.lambda$addAndLock$0(KubernetesStateHandleStore.java:155)
>   at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$attemptCheckAndUpdateConfigMap$11(Fabric8FlinkKubeClient.java:316)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> Source)
>   ... 3 common frames omitted
> {code}
> The JobManager creates subsequent checkpoints successfully.
> Upon failure, it tries to recover this checkpoint (0193393), but 
> fails to do so because of:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve 
> checkpoint 193393 from state handle under checkpointID-0193393. 
> This indicates that the retrieved state handle is broken. Try cleaning the 
> state handle store ... Caused by: java.io.FileNotFoundException: No such file 
> or directory: s3://xxx/flink-ha/xxx/completedCheckpoint72e30229420c
> {code}
> I'm running Flink 1.14.4.
> Note: This issue has been first discussed here: 
> https://github.com/apache/flink/pull/15832#pullrequestreview-1005973050 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wangyang0918 opened a new pull request, #20590: [FLINK-28265][k8s] Do not discard state when the AlreadyExistException is caused by retries

2022-08-15 Thread GitBox


wangyang0918 opened a new pull request, #20590:
URL: https://github.com/apache/flink/pull/20590

   
   
   ## What is the purpose of the change
   
   If something is temporarily wrong with the JobManager network, 
`Fabric8FlinkKubeClient#checkAndUpdateConfigMap` fails with a 
`KubernetesException` on the first run and is retried. However, the HTTP 
request may actually have been sent successfully and handled by the K8s 
APIServer, which means the entry was already added to the ConfigMap. The retry 
then fails with `AlreadyExistException` and the state is discarded. If the 
JobManager crashes at exactly this point, the following recovery attempts throw 
`FileNotFoundException: No such file or directory: 
s3://xxx/flink-ha/xxx/completedCheckpoint72e30229420c`, since the added 
ConfigMap entry is never cleaned up.
   
   By making the `AlreadyExistException` thrown in 
`KubernetesStateHandleStore#addAndLock` carry a 
`PossibleInconsistentStateException` as its cause, we can avoid discarding the 
state.
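   
   For illustration, a minimal, self-contained sketch of the retry race 
described above; the class and method names are simplified stand-ins, not 
Flink's actual API:
   
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: the first write "fails" after reaching the server, so
// the retry sees AlreadyExistException for an entry we wrote ourselves. In
// that case the backing state must not be discarded.
public class RetryRaceSketch {

    static class AlreadyExistException extends Exception {}
    static class PossibleInconsistentStateException extends Exception {}

    private final Map<String, String> configMap = new ConcurrentHashMap<>();

    void addWithRetry(String key, String value) throws Exception {
        try {
            addOnce(key, value);                   // attempt 1: reaches the server...
            throw new RuntimeException("timeout"); // ...but the response is lost
        } catch (RuntimeException transientError) {
            try {
                addOnce(key, value);               // attempt 2: the retry
            } catch (AlreadyExistException e) {
                // The entry may be our own write from attempt 1, so signal a
                // possibly inconsistent state instead of discarding it.
                throw new PossibleInconsistentStateException();
            }
        }
    }

    private void addOnce(String key, String value) throws AlreadyExistException {
        if (configMap.putIfAbsent(key, value) != null) {
            throw new AlreadyExistException();
        }
    }
}
```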
   
   ## Brief change log
   
   *  Do not discard state when the AlreadyExistException is caused by retries
   
   
   ## Verifying this change
   
   * Add a new unit test 
`testAddWithAlreadyExistExceptionCausedByRetriesShouldNotDiscardState`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   





[jira] [Commented] (FLINK-28986) UNNEST function with nested fails to generate plan

2022-08-15 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580082#comment-17580082
 ] 

godfrey he commented on FLINK-28986:


[~qingyue] Thanks for reporting this; assigned to you.

> UNNEST function with nested fails to generate plan
> --
>
> Key: FLINK-28986
> URL: https://issues.apache.org/jira/browse/FLINK-28986
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
> Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> add the following case to TableEnvironmentITCase
> {code:scala}
> @Test
> def debug(): Unit = {
>   tEnv.executeSql(
> s"""
>|CREATE TEMPORARY TABLE source_kafka_wip_his_all (
>|  GUID varchar,
>|  OPERATION varchar,
>|  PRODUCTID varchar,
>|  LOTNO varchar,
>|  SERIALNO varchar,
>|  QUERYSERIALNO varchar,
>|  SERIALNO1 varchar,
>|  SERIALNO2 varchar,
>|  WIPORDERNO varchar,
>|  WIPORDERTYPE varchar,
>|  VIRTUALLOT varchar,
>|  PREOPERATION varchar,
>|  NORMALPREOPERATION varchar,
>|  PROCESSID varchar,
>|  EQUIPMENT varchar,
>|  INBOUNDDATE varchar,
>|  OUTBOUNDDATE varchar,
>|  REWORK varchar,
>|  REWORKPROCESSID varchar,
>|  CONTAINER varchar,
>|  WIPCONTENTCLASSID varchar,
>|  STATUSCODE varchar,
>|  WIPSTATUS varchar,
>|  TESTPROCESSID varchar,
>|  TESTORDERTYPE varchar,
>|  TESTORDER varchar,
>|  TEST varchar,
>|  SORTINGPROCESSID varchar,
>|  SORTINGORDERTYPE varchar,
>|  SORTINGORDER varchar,
>|  SORTING varchar,
>|  MINO varchar,
>|  GROUPCODE varchar,
>|  HIGHLOWGROUP varchar,
>|  PRODUCTNO varchar,
>|  FACILITY varchar,
>|  WIPLINE varchar,
>|  CHILDEQUCODE varchar,
>|  STATION varchar,
>|  QTY varchar,
>|  PASS_FLAG varchar,
>|  DEFECTCODELIST varchar,
>|  ISFIRST varchar,
>|  PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC 
> string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE 
> string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE 
> string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE 
> string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE 
> string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME 
> string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON 
> string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY 
> string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID 
> string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON 
> string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
>|  REFERENCEID varchar,
>|  LASTUPDATEON varchar,
>|  LASTUPDATEDBY varchar,
>|  CREATEDON varchar,
>|  CREATEDBY varchar,
>|  ACTIVE varchar,
>|  LASTDELETEON varchar,
>|  LASTDELETEDBY varchar,
>|  LASTREACTIVATEON varchar,
>|  LASTREACTIVATEDBY varchar,
>|  ARCHIVEID varchar,
>|  LASTARCHIVEON varchar,
>|  LASTARCHIVEDBY varchar,
>|  LASTRESTOREON varchar,
>|  LASTRESTOREDBY varchar,
>|  ROWVERSIONSTAMP varchar,
>|  proctime as PROCTIME()
>|  ) with (
>|  'connector' = 'datagen'
>|)
>|""".stripMargin)
>   tEnv.executeSql(
> s"""
>|create TEMPORARY view transform_main_data as
>|select
>|  r.GUID as wip_his_guid,
>|  r.EQUIPMENT as equipment,
>|  r.WIPLINE as wipline,
>|  r.STATION as station,
>|  cast(r.PROCESSID as decimal) as processid,
>|  r.PRODUCTNO as productno,
>|  t.TESTFINISHDATE as testfinishdate,
>|  t.OPERATION as operation,
>|  t.CHARACTERISTIC as characteristic,
>|  t.LOWERCONTROLLIMIT as lowercontrollimit,
>|  t.UPPERCONTROLLIMIT as uppercontrollimit,
>|  t.TARGETVALUE as targetvalue,
>|  t.DEFECTCODE as defectcode,
>|  t.TESTVALUE as testvalue,
>|  t.CHARACTERISTICTYPE as characteristictype,
>|  proctime
>|  from
>|  (select
>|  GUID,
>|  EQUIPMENT,
>|  WIPLINE,
>|  STATION,
>|  PROCESSID,
>|  PRODUCTNO,
>|  PARALIST,
>|  proctime
>|  FROM source_kafka_wip_his_all) r
>|  cross join
> 

[jira] [Assigned] (FLINK-28986) UNNEST function with nested fails to generate plan

2022-08-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-28986:
--

Assignee: Jane Chan

> UNNEST function with nested fails to generate plan
> --
>
> Key: FLINK-28986
> URL: https://issues.apache.org/jira/browse/FLINK-28986
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
> Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> add the following case to TableEnvironmentITCase
> {code:scala}
> @Test
> def debug(): Unit = {
>   tEnv.executeSql(
> s"""
>|CREATE TEMPORARY TABLE source_kafka_wip_his_all (
>|  GUID varchar,
>|  OPERATION varchar,
>|  PRODUCTID varchar,
>|  LOTNO varchar,
>|  SERIALNO varchar,
>|  QUERYSERIALNO varchar,
>|  SERIALNO1 varchar,
>|  SERIALNO2 varchar,
>|  WIPORDERNO varchar,
>|  WIPORDERTYPE varchar,
>|  VIRTUALLOT varchar,
>|  PREOPERATION varchar,
>|  NORMALPREOPERATION varchar,
>|  PROCESSID varchar,
>|  EQUIPMENT varchar,
>|  INBOUNDDATE varchar,
>|  OUTBOUNDDATE varchar,
>|  REWORK varchar,
>|  REWORKPROCESSID varchar,
>|  CONTAINER varchar,
>|  WIPCONTENTCLASSID varchar,
>|  STATUSCODE varchar,
>|  WIPSTATUS varchar,
>|  TESTPROCESSID varchar,
>|  TESTORDERTYPE varchar,
>|  TESTORDER varchar,
>|  TEST varchar,
>|  SORTINGPROCESSID varchar,
>|  SORTINGORDERTYPE varchar,
>|  SORTINGORDER varchar,
>|  SORTING varchar,
>|  MINO varchar,
>|  GROUPCODE varchar,
>|  HIGHLOWGROUP varchar,
>|  PRODUCTNO varchar,
>|  FACILITY varchar,
>|  WIPLINE varchar,
>|  CHILDEQUCODE varchar,
>|  STATION varchar,
>|  QTY varchar,
>|  PASS_FLAG varchar,
>|  DEFECTCODELIST varchar,
>|  ISFIRST varchar,
>|  PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC 
> string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE 
> string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE 
> string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE 
> string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE 
> string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME 
> string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON 
> string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY 
> string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID 
> string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON 
> string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
>|  REFERENCEID varchar,
>|  LASTUPDATEON varchar,
>|  LASTUPDATEDBY varchar,
>|  CREATEDON varchar,
>|  CREATEDBY varchar,
>|  ACTIVE varchar,
>|  LASTDELETEON varchar,
>|  LASTDELETEDBY varchar,
>|  LASTREACTIVATEON varchar,
>|  LASTREACTIVATEDBY varchar,
>|  ARCHIVEID varchar,
>|  LASTARCHIVEON varchar,
>|  LASTARCHIVEDBY varchar,
>|  LASTRESTOREON varchar,
>|  LASTRESTOREDBY varchar,
>|  ROWVERSIONSTAMP varchar,
>|  proctime as PROCTIME()
>|  ) with (
>|  'connector' = 'datagen'
>|)
>|""".stripMargin)
>   tEnv.executeSql(
> s"""
>|create TEMPORARY view transform_main_data as
>|select
>|  r.GUID as wip_his_guid,
>|  r.EQUIPMENT as equipment,
>|  r.WIPLINE as wipline,
>|  r.STATION as station,
>|  cast(r.PROCESSID as decimal) as processid,
>|  r.PRODUCTNO as productno,
>|  t.TESTFINISHDATE as testfinishdate,
>|  t.OPERATION as operation,
>|  t.CHARACTERISTIC as characteristic,
>|  t.LOWERCONTROLLIMIT as lowercontrollimit,
>|  t.UPPERCONTROLLIMIT as uppercontrollimit,
>|  t.TARGETVALUE as targetvalue,
>|  t.DEFECTCODE as defectcode,
>|  t.TESTVALUE as testvalue,
>|  t.CHARACTERISTICTYPE as characteristictype,
>|  proctime
>|  from
>|  (select
>|  GUID,
>|  EQUIPMENT,
>|  WIPLINE,
>|  STATION,
>|  PROCESSID,
>|  PRODUCTNO,
>|  PARALIST,
>|  proctime
>|  FROM source_kafka_wip_his_all) r
>|  cross join
>|  unnest(PARALIST) as t 
> (GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORK

[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28981:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write Flink jobs which have some {{source}} subtasks running much slower 
than others. Three kinds of sources should be verified, including
 -- [Source 
functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
 -- [InputFormat 
sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
 -- [FLIP-27 new 
sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
 - Modify the Flink configuration file to enable speculative execution and tune 
the configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write Flink jobs which have some {{source}} subtasks running much slower 
than others. Three kinds of sources should be verified, including
 - [Source 
functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
 - [InputFormat 
sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
 - [FLIP-27 new 
sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
 - Modify the Flink configuration file to enable speculative execution and tune 
the configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-245 sources speculative execution
> --
>
> Key: FLINK-28981
> URL: https://issues.apache.org/jira/browse/FLINK-28981
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
> FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write Flink jobs which have some {{source}} subtasks running much slower 
> than others. Three kinds of sources should be verified (a sketch of the 
> InputFormat case follows after this quote), including
>  -- [Source 
> functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
>  -- [InputFormat 
> sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
>  -- [FLIP-27 new 
> sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.
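
For the InputFormat case above, a minimal sketch of a skewed source built on 
the GenericInputFormat base class (which implements the linked InputFormat 
interface); the class name, record count, and sleep interval are illustrative 
assumptions, not part of the ticket:
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;

// Hypothetical skewed source: splits with an even split number sleep between
// records, so their subtasks fall far behind and should trigger speculation.
public class SkewedInputFormat extends GenericInputFormat<Long> {

    private int splitNumber;
    private long emitted;

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        splitNumber = split.getSplitNumber();
        emitted = 0;
    }

    @Override
    public boolean reachedEnd() {
        return emitted >= 1_000; // emit a bounded number of records per split
    }

    @Override
    public Long nextRecord(Long reuse) throws IOException {
        if (splitNumber % 2 == 0) {
            try {
                Thread.sleep(100); // simulate running on a slow node
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return emitted++;
    }
}
{code}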




[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Labels: release-testing  (was: test-stability)

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, where the hostname can 
> be retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumber) % 2 == 0); a sketch follows after this quote
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.
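
For the slow-subtask step above, a minimal sketch of a skewed operator; the 
class name, sleep interval, and the choice of a map function (rather than some 
other operator) are illustrative assumptions, not part of the ticket:
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical slow operator: subtasks where (subtaskIndex + attemptNumber) is
// even sleep on every record, so the original attempt lags far behind and a
// speculative attempt (with a different attemptNumber) can overtake it.
public class SkewedMap extends RichMapFunction<Long, Long> {

    private boolean slow;

    @Override
    public void open(Configuration parameters) {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int attempt = getRuntimeContext().getAttemptNumber();
        slow = (subtask + attempt) % 2 == 0;
    }

    @Override
    public Long map(Long value) throws Exception {
        if (slow) {
            Thread.sleep(1_000); // simulate running on a slow node
        }
        return value;
    }
}
{code}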





[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28981:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write Flink jobs which have some {{source}} subtasks running much slower 
than others. Three kinds of sources should be verified, including
 - [Source 
functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
 - [InputFormat 
sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
 - [FLIP-27 new 
sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
 - Modify the Flink configuration file to enable speculative execution and tune 
the configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write Flink jobs which have some {{source}} subtasks running much slower 
than others. Three kinds of sources should be verified, including
   - [Source 
functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
   - [InputFormat 
sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
   - [FLIP-27 new 
sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
 - Modify the Flink configuration file to enable speculative execution and tune 
the configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-245 sources speculative execution
> --
>
> Key: FLINK-28981
> URL: https://issues.apache.org/jira/browse/FLINK-28981
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
> FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write Flink jobs which have some {{source}} subtasks running much slower 
> than others. Three kinds of sources should be verified, including
>  - [Source 
> functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
>  - [InputFormat 
> sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
>  - [FLIP-27 new 
> sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.




[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28981:

Labels: release-testing  (was: test-stability)

> Release Testing: Verify FLIP-245 sources speculative execution
> --
>
> Key: FLINK-28981
> URL: https://issues.apache.org/jira/browse/FLINK-28981
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
> FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write Flink jobs which have some {{source}} subtasks running much slower 
> than others. Three kinds of sources should be verified, including
>    - [Source 
> functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
>    - [InputFormat 
> sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
>    - [FLIP-27 new 
> sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.





[jira] [Comment Edited] (FLINK-28986) UNNEST function with nested fails to generate plan

2022-08-15 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580080#comment-17580080
 ] 

Jane Chan edited comment on FLINK-28986 at 8/16/22 6:37 AM:


The reason is that during the decorrelation rewrite, the top filter RelNode 
was pushed down, forming a nested filter pattern. Since the filter merge rule 
is not included in the default rewrite rule sets, the nested filter RelNodes 
prevented LogicalUnnestRule from matching. This can be fixed by adding 
CoreRules.FILTER_MERGE to FlinkStreamRuleSets. cc [~godfrey] 

!image-2022-08-16-14-36-07-061.png|width=1047,height=177!


was (Author: qingyue):
The reason is that during the decorrelation rewrite, the top filter RelNode 
was pushed down, forming a nested filter pattern. Since the filter merge rule 
is not included in the default rewrite rule sets, the nested filter RelNodes 
prevented LogicalUnnestRule from matching. This can be fixed by adding 
CoreRules.FILTER_MERGE to FlinkStreamRuleSets.

!image-2022-08-16-14-36-07-061.png|width=1047,height=177!

> UNNEST function with nested fails to generate plan
> --
>
> Key: FLINK-28986
> URL: https://issues.apache.org/jira/browse/FLINK-28986
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Jane Chan
>Priority: Major
> Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> add the following case to TableEnvironmentITCase
> {code:scala}
> @Test
> def debug(): Unit = {
>   tEnv.executeSql(
> s"""
>|CREATE TEMPORARY TABLE source_kafka_wip_his_all (
>|  GUID varchar,
>|  OPERATION varchar,
>|  PRODUCTID varchar,
>|  LOTNO varchar,
>|  SERIALNO varchar,
>|  QUERYSERIALNO varchar,
>|  SERIALNO1 varchar,
>|  SERIALNO2 varchar,
>|  WIPORDERNO varchar,
>|  WIPORDERTYPE varchar,
>|  VIRTUALLOT varchar,
>|  PREOPERATION varchar,
>|  NORMALPREOPERATION varchar,
>|  PROCESSID varchar,
>|  EQUIPMENT varchar,
>|  INBOUNDDATE varchar,
>|  OUTBOUNDDATE varchar,
>|  REWORK varchar,
>|  REWORKPROCESSID varchar,
>|  CONTAINER varchar,
>|  WIPCONTENTCLASSID varchar,
>|  STATUSCODE varchar,
>|  WIPSTATUS varchar,
>|  TESTPROCESSID varchar,
>|  TESTORDERTYPE varchar,
>|  TESTORDER varchar,
>|  TEST varchar,
>|  SORTINGPROCESSID varchar,
>|  SORTINGORDERTYPE varchar,
>|  SORTINGORDER varchar,
>|  SORTING varchar,
>|  MINO varchar,
>|  GROUPCODE varchar,
>|  HIGHLOWGROUP varchar,
>|  PRODUCTNO varchar,
>|  FACILITY varchar,
>|  WIPLINE varchar,
>|  CHILDEQUCODE varchar,
>|  STATION varchar,
>|  QTY varchar,
>|  PASS_FLAG varchar,
>|  DEFECTCODELIST varchar,
>|  ISFIRST varchar,
>|  PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC 
> string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE 
> string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE 
> string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE 
> string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE 
> string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME 
> string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON 
> string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY 
> string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID 
> string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON 
> string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
>|  REFERENCEID varchar,
>|  LASTUPDATEON varchar,
>|  LASTUPDATEDBY varchar,
>|  CREATEDON varchar,
>|  CREATEDBY varchar,
>|  ACTIVE varchar,
>|  LASTDELETEON varchar,
>|  LASTDELETEDBY varchar,
>|  LASTREACTIVATEON varchar,
>|  LASTREACTIVATEDBY varchar,
>|  ARCHIVEID varchar,
>|  LASTARCHIVEON varchar,
>|  LASTARCHIVEDBY varchar,
>|  LASTRESTOREON varchar,
>|  LASTRESTOREDBY varchar,
>|  ROWVERSIONSTAMP varchar,
>|  proctime as PROCTIME()
>|  ) with (
>|  'connector' = 'datagen'
>|)
>|""".stripMargin)
>   tEnv.executeSql(
> s"""
>|create TEMPORARY view transform_main_data as
>|select
>|  r.GUID as wip_his_guid,
>|  r.EQUIPMENT as equipment,
>|  r.WIPLINE as wipline,
>|  r.STATION as station,
>|  cast(r.PROCESSID as decimal) as processid,
>| 

[jira] [Commented] (FLINK-28986) UNNEST function with nested fails to generate plan

2022-08-15 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580080#comment-17580080
 ] 

Jane Chan commented on FLINK-28986:
---

The reason is that during the decorrelation rewrite, the top filter RelNode 
was pushed down, forming a nested filter pattern. Since the filter merge rule 
is not included in the default rewrite rule sets, the nested filter RelNodes 
prevented LogicalUnnestRule from matching. This can be fixed by adding 
CoreRules.FILTER_MERGE to FlinkStreamRuleSets; a sketch of the idea follows 
below.

!image-2022-08-16-14-36-07-061.png|width=1047,height=177!
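
For illustration only, a tiny self-contained model of what a filter-merge 
rewrite does, using plain java.util.function predicates rather than Calcite's 
actual rule API: two stacked filters collapse into a single filter whose 
condition is the conjunction of both, so a rule that expects a single filter 
node (like LogicalUnnestRule here) can match again.
{code:java}
import java.util.function.Predicate;

// Hypothetical model of Filter(Filter(input, inner), outer) being merged
// into Filter(input, inner AND outer).
public class FilterMergeSketch {

    static <T> Predicate<T> merge(Predicate<T> outer, Predicate<T> inner) {
        return row -> inner.test(row) && outer.test(row);
    }

    public static void main(String[] args) {
        Predicate<String> inner = op -> op.startsWith("G");   // pushed-down filter
        Predicate<String> outer = op -> !op.equals("G1208");  // original top filter
        Predicate<String> merged = merge(outer, inner);

        System.out.println(merged.test("G1209")); // true
        System.out.println(merged.test("G1208")); // false
    }
}
{code}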

> UNNEST function with nested fails to generate plan
> --
>
> Key: FLINK-28986
> URL: https://issues.apache.org/jira/browse/FLINK-28986
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Jane Chan
>Priority: Major
> Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> add the following case to TableEnvironmentITCase
> {code:scala}
> @Test
> def debug(): Unit = {
>   tEnv.executeSql(
> s"""
>|CREATE TEMPORARY TABLE source_kafka_wip_his_all (
>|  GUID varchar,
>|  OPERATION varchar,
>|  PRODUCTID varchar,
>|  LOTNO varchar,
>|  SERIALNO varchar,
>|  QUERYSERIALNO varchar,
>|  SERIALNO1 varchar,
>|  SERIALNO2 varchar,
>|  WIPORDERNO varchar,
>|  WIPORDERTYPE varchar,
>|  VIRTUALLOT varchar,
>|  PREOPERATION varchar,
>|  NORMALPREOPERATION varchar,
>|  PROCESSID varchar,
>|  EQUIPMENT varchar,
>|  INBOUNDDATE varchar,
>|  OUTBOUNDDATE varchar,
>|  REWORK varchar,
>|  REWORKPROCESSID varchar,
>|  CONTAINER varchar,
>|  WIPCONTENTCLASSID varchar,
>|  STATUSCODE varchar,
>|  WIPSTATUS varchar,
>|  TESTPROCESSID varchar,
>|  TESTORDERTYPE varchar,
>|  TESTORDER varchar,
>|  TEST varchar,
>|  SORTINGPROCESSID varchar,
>|  SORTINGORDERTYPE varchar,
>|  SORTINGORDER varchar,
>|  SORTING varchar,
>|  MINO varchar,
>|  GROUPCODE varchar,
>|  HIGHLOWGROUP varchar,
>|  PRODUCTNO varchar,
>|  FACILITY varchar,
>|  WIPLINE varchar,
>|  CHILDEQUCODE varchar,
>|  STATION varchar,
>|  QTY varchar,
>|  PASS_FLAG varchar,
>|  DEFECTCODELIST varchar,
>|  ISFIRST varchar,
>|  PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC 
> string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE 
> string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE 
> string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE 
> string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE 
> string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME 
> string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON 
> string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY 
> string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID 
> string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON 
> string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
>|  REFERENCEID varchar,
>|  LASTUPDATEON varchar,
>|  LASTUPDATEDBY varchar,
>|  CREATEDON varchar,
>|  CREATEDBY varchar,
>|  ACTIVE varchar,
>|  LASTDELETEON varchar,
>|  LASTDELETEDBY varchar,
>|  LASTREACTIVATEON varchar,
>|  LASTREACTIVATEDBY varchar,
>|  ARCHIVEID varchar,
>|  LASTARCHIVEON varchar,
>|  LASTARCHIVEDBY varchar,
>|  LASTRESTOREON varchar,
>|  LASTRESTOREDBY varchar,
>|  ROWVERSIONSTAMP varchar,
>|  proctime as PROCTIME()
>|  ) with (
>|  'connector' = 'datagen'
>|)
>|""".stripMargin)
>   tEnv.executeSql(
> s"""
>|create TEMPORARY view transform_main_data as
>|select
>|  r.GUID as wip_his_guid,
>|  r.EQUIPMENT as equipment,
>|  r.WIPLINE as wipline,
>|  r.STATION as station,
>|  cast(r.PROCESSID as decimal) as processid,
>|  r.PRODUCTNO as productno,
>|  t.TESTFINISHDATE as testfinishdate,
>|  t.OPERATION as operation,
>|  t.CHARACTERISTIC as characteristic,
>|  t.LOWERCONTROLLIMIT as lowercontrollimit,
>|  t.UPPERCONTROLLIMIT as uppercontrollimit,
>|  t.TARGETVALUE as targetvalue,
>|  t.DEFECTCODE as defectcode,
>|  t.TESTVALUE as testvalue,
>|  t.CHARACTERISTICTYPE as characteristictype,
>|  

[jira] [Updated] (FLINK-28986) UNNEST function with nested fails to generate plan

2022-08-15 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-28986:
--
Attachment: image-2022-08-16-14-36-07-061.png

> UNNEST function with nested fails to generate plan
> --
>
> Key: FLINK-28986
> URL: https://issues.apache.org/jira/browse/FLINK-28986
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Jane Chan
>Priority: Major
> Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> add the following case to TableEnvironmentITCase
> {code:scala}
> @Test
> def debug(): Unit = {
>   tEnv.executeSql(
> s"""
>|CREATE TEMPORARY TABLE source_kafka_wip_his_all (
>|  GUID varchar,
>|  OPERATION varchar,
>|  PRODUCTID varchar,
>|  LOTNO varchar,
>|  SERIALNO varchar,
>|  QUERYSERIALNO varchar,
>|  SERIALNO1 varchar,
>|  SERIALNO2 varchar,
>|  WIPORDERNO varchar,
>|  WIPORDERTYPE varchar,
>|  VIRTUALLOT varchar,
>|  PREOPERATION varchar,
>|  NORMALPREOPERATION varchar,
>|  PROCESSID varchar,
>|  EQUIPMENT varchar,
>|  INBOUNDDATE varchar,
>|  OUTBOUNDDATE varchar,
>|  REWORK varchar,
>|  REWORKPROCESSID varchar,
>|  CONTAINER varchar,
>|  WIPCONTENTCLASSID varchar,
>|  STATUSCODE varchar,
>|  WIPSTATUS varchar,
>|  TESTPROCESSID varchar,
>|  TESTORDERTYPE varchar,
>|  TESTORDER varchar,
>|  TEST varchar,
>|  SORTINGPROCESSID varchar,
>|  SORTINGORDERTYPE varchar,
>|  SORTINGORDER varchar,
>|  SORTING varchar,
>|  MINO varchar,
>|  GROUPCODE varchar,
>|  HIGHLOWGROUP varchar,
>|  PRODUCTNO varchar,
>|  FACILITY varchar,
>|  WIPLINE varchar,
>|  CHILDEQUCODE varchar,
>|  STATION varchar,
>|  QTY varchar,
>|  PASS_FLAG varchar,
>|  DEFECTCODELIST varchar,
>|  ISFIRST varchar,
>|  PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC 
> string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE 
> string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE 
> string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE 
> string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE 
> string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME 
> string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON 
> string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY 
> string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID 
> string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON 
> string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
>|  REFERENCEID varchar,
>|  LASTUPDATEON varchar,
>|  LASTUPDATEDBY varchar,
>|  CREATEDON varchar,
>|  CREATEDBY varchar,
>|  ACTIVE varchar,
>|  LASTDELETEON varchar,
>|  LASTDELETEDBY varchar,
>|  LASTREACTIVATEON varchar,
>|  LASTREACTIVATEDBY varchar,
>|  ARCHIVEID varchar,
>|  LASTARCHIVEON varchar,
>|  LASTARCHIVEDBY varchar,
>|  LASTRESTOREON varchar,
>|  LASTRESTOREDBY varchar,
>|  ROWVERSIONSTAMP varchar,
>|  proctime as PROCTIME()
>|  ) with (
>|  'connector' = 'datagen'
>|)
>|""".stripMargin)
>   tEnv.executeSql(
> s"""
>|create TEMPORARY view transform_main_data as
>|select
>|  r.GUID as wip_his_guid,
>|  r.EQUIPMENT as equipment,
>|  r.WIPLINE as wipline,
>|  r.STATION as station,
>|  cast(r.PROCESSID as decimal) as processid,
>|  r.PRODUCTNO as productno,
>|  t.TESTFINISHDATE as testfinishdate,
>|  t.OPERATION as operation,
>|  t.CHARACTERISTIC as characteristic,
>|  t.LOWERCONTROLLIMIT as lowercontrollimit,
>|  t.UPPERCONTROLLIMIT as uppercontrollimit,
>|  t.TARGETVALUE as targetvalue,
>|  t.DEFECTCODE as defectcode,
>|  t.TESTVALUE as testvalue,
>|  t.CHARACTERISTICTYPE as characteristictype,
>|  proctime
>|  from
>|  (select
>|  GUID,
>|  EQUIPMENT,
>|  WIPLINE,
>|  STATION,
>|  PROCESSID,
>|  PRODUCTNO,
>|  PARALIST,
>|  proctime
>|  FROM source_kafka_wip_his_all) r
>|  cross join
>|  unnest(PARALIST) as t 
> (GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORKPROCESSID,CHARAC

[jira] [Commented] (FLINK-28912) Add Part of "Who Use Flink" In ReadMe file and https://flink.apache.org/

2022-08-15 Thread Zhou Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580077#comment-17580077
 ] 

Zhou Yao commented on FLINK-28912:
--

Great, thanks!

> Add Part of "Who Use Flink" In ReadMe file  and https://flink.apache.org/
> -
>
> Key: FLINK-28912
> URL: https://issues.apache.org/jira/browse/FLINK-28912
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Zhou Yao
>Priority: Minor
>  Labels: doc
> Attachments: image-2022-08-10-20-15-10-418.png
>
>
> Maybe we can learn from the website of [Apache 
> Kylin|https://kylin.apache.org/] and add a "Who Use Flink" section to the 
> README or the website. This could make Flink more friendly.
> !image-2022-08-10-20-15-10-418.png|width=147,height=99!





[jira] [Updated] (FLINK-28986) UNNEST function with nested fails to generate plan

2022-08-15 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-28986:
--
Description: 
h3. How to reproduce

Add the following case to TableEnvironmentITCase:
{code:scala}
@Test
def debug(): Unit = {
  tEnv.executeSql(
s"""
   |CREATE TEMPORARY TABLE source_kafka_wip_his_all (
   |  GUID varchar,
   |  OPERATION varchar,
   |  PRODUCTID varchar,
   |  LOTNO varchar,
   |  SERIALNO varchar,
   |  QUERYSERIALNO varchar,
   |  SERIALNO1 varchar,
   |  SERIALNO2 varchar,
   |  WIPORDERNO varchar,
   |  WIPORDERTYPE varchar,
   |  VIRTUALLOT varchar,
   |  PREOPERATION varchar,
   |  NORMALPREOPERATION varchar,
   |  PROCESSID varchar,
   |  EQUIPMENT varchar,
   |  INBOUNDDATE varchar,
   |  OUTBOUNDDATE varchar,
   |  REWORK varchar,
   |  REWORKPROCESSID varchar,
   |  CONTAINER varchar,
   |  WIPCONTENTCLASSID varchar,
   |  STATUSCODE varchar,
   |  WIPSTATUS varchar,
   |  TESTPROCESSID varchar,
   |  TESTORDERTYPE varchar,
   |  TESTORDER varchar,
   |  TEST varchar,
   |  SORTINGPROCESSID varchar,
   |  SORTINGORDERTYPE varchar,
   |  SORTINGORDER varchar,
   |  SORTING varchar,
   |  MINO varchar,
   |  GROUPCODE varchar,
   |  HIGHLOWGROUP varchar,
   |  PRODUCTNO varchar,
   |  FACILITY varchar,
   |  WIPLINE varchar,
   |  CHILDEQUCODE varchar,
   |  STATION varchar,
   |  QTY varchar,
   |  PASS_FLAG varchar,
   |  DEFECTCODELIST varchar,
   |  ISFIRST varchar,
   |  PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
   |  REFERENCEID varchar,
   |  LASTUPDATEON varchar,
   |  LASTUPDATEDBY varchar,
   |  CREATEDON varchar,
   |  CREATEDBY varchar,
   |  ACTIVE varchar,
   |  LASTDELETEON varchar,
   |  LASTDELETEDBY varchar,
   |  LASTREACTIVATEON varchar,
   |  LASTREACTIVATEDBY varchar,
   |  ARCHIVEID varchar,
   |  LASTARCHIVEON varchar,
   |  LASTARCHIVEDBY varchar,
   |  LASTRESTOREON varchar,
   |  LASTRESTOREDBY varchar,
   |  ROWVERSIONSTAMP varchar,
   |  proctime as PROCTIME()
   |  ) with (
   |  'connector' = 'datagen'
   |)
   |""".stripMargin)

  tEnv.executeSql(
s"""
   |create TEMPORARY view transform_main_data as
   |select
   |  r.GUID as wip_his_guid,
   |  r.EQUIPMENT as equipment,
   |  r.WIPLINE as wipline,
   |  r.STATION as station,
   |  cast(r.PROCESSID as decimal) as processid,
   |  r.PRODUCTNO as productno,
   |  t.TESTFINISHDATE as testfinishdate,
   |  t.OPERATION as operation,
   |  t.CHARACTERISTIC as characteristic,
   |  t.LOWERCONTROLLIMIT as lowercontrollimit,
   |  t.UPPERCONTROLLIMIT as uppercontrollimit,
   |  t.TARGETVALUE as targetvalue,
   |  t.DEFECTCODE as defectcode,
   |  t.TESTVALUE as testvalue,
   |  t.CHARACTERISTICTYPE as characteristictype,
   |  proctime
   |  from
   |  (select
   |  GUID,
   |  EQUIPMENT,
   |  WIPLINE,
   |  STATION,
   |  PROCESSID,
   |  PRODUCTNO,
   |  PARALIST,
   |  proctime
   |  FROM source_kafka_wip_his_all) r
   |  cross join
   |  unnest(PARALIST) as t 
(GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORKPROCESSID,CHARACTERISTIC,CHARACTERISTICREVISION,CHARACTERISTICTYPE,CHARACTERISTICCLASS,UPPERCONTROLLIMIT,TARGETVALUE,LOWERCONTROLLIMIT,TESTVALUE,TESTATTRIBUTE,TESTINGSTARTDATE,TESTFINISHDATE,UOMCODE,DEFECTCODE,SPECPARAMID,STATION,GP_TIME,REFERENCEID,LASTUPDATEON,LASTUPDATEDBY,CREATEDON,CREATEDBY,ACTIVE,LASTDELETEON,LASTDELETEDBY,LASTREACTIVATEON,LASTREACTIVATEDBY,ARCHIVEID,LASTARCHIVEON,LASTARCHIVEDBY,LASTRESTOREON,LASTRESTOREDBY,ROWVERSIONSTAMP)
   |  where t.CHARACTERISTICTYPE = '2'
   |""".stripMargin)

  tEnv.executeSql(
s"""
   |explain plan for
   |select * from transform_main_data
   |where operation not in 
('G1208','G1209','G1211','G1213','G1206','G1207','G1214','G1215','G1282','G1292','G1216')
   |""".stripMargin).print()
} {code}
Stacktrace
{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution 
plan for the given query: LogicalProject(inputs=[0..3], 
exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, $13, $19, $17, $18, $25, $20, 
$15, $7]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{6}])
   :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43, 
PROCTIME()]])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
source_kafka_wip_his_all]])
   +- LogicalFilter(condition=[AND(SEARCH($7, 
Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3, 
Sarg[(-∞

[jira] [Assigned] (FLINK-28939) Release Testing: Verify FLIP-241 ANALYZE TABLE

2022-08-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-28939:
--

Assignee: Yunhong Zheng

> Release Testing: Verify FLIP-241 ANALYZE TABLE
> --
>
> Key: FLINK-28939
> URL: https://issues.apache.org/jira/browse/FLINK-28939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: godfrey he
>Assignee: Yunhong Zheng
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> This issue aims to verify FLIP-240: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386481
> We can verify it in the SQL client after building the flink-dist package. 
> 1. create a partitioned table and a non-partitioned table (with/without 
> computed columns/metadata columns, with different columns), and then insert 
> some data
> 2. verify the different statements, please refer to the FLIP doc examples (a 
> sketch follows after this quote)
> 3. verify the result in the catalog. Currently, the {{describe extended}} 
> statement does not support showing the statistics in the catalog, so we 
> should write some code to get the statistics from the catalog, or use the 
> Hive CLI if the catalog is a Hive catalog
> 4. verify the unsupported cases:
> 4.1 analyze a non-existent table
> 4.2 analyze a view
> 4.3 analyze a partitioned table with a non-existent partition
> 4.4 analyze a non-partitioned table with a partition
> 4.5 analyze a non-existent column
> 4.6 analyze a computed column
> 4.7 analyze a metadata column
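
As an illustration of step 2, a minimal sketch assuming the {{ANALYZE TABLE ... 
COMPUTE STATISTICS [FOR ALL COLUMNS]}} syntax from the FLIP doc; the table 
definition and the bounded datagen options are made up for the example and 
untested:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical verification snippet: create a small bounded table, then
// collect table-level and column-level statistics for it.
public class AnalyzeTableSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        tEnv.executeSql(
                "CREATE TABLE orders (id INT, amount DOUBLE) WITH ("
                        + " 'connector' = 'datagen', 'number-of-rows' = '1000')");

        // table-level statistics (e.g. row count)
        tEnv.executeSql("ANALYZE TABLE orders COMPUTE STATISTICS");
        // column-level statistics (e.g. ndv, null count, max/min)
        tEnv.executeSql("ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS");
    }
}
{code}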





[jira] [Created] (FLINK-28986) UNNEST function with nested fails to generate plan

2022-08-15 Thread Jane Chan (Jira)
Jane Chan created FLINK-28986:
-

 Summary: UNNEST function with nested fails to generate plan
 Key: FLINK-28986
 URL: https://issues.apache.org/jira/browse/FLINK-28986
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: Jane Chan








[GitHub] [flink-table-store] tsreaper opened a new pull request, #267: [hotfix] Fix github workflow build-different-versions.yaml

2022-08-15 Thread GitBox


tsreaper opened a new pull request, #267:
URL: https://github.com/apache/flink-table-store/pull/267

   Fix syntax of `build-different-versions.yaml`.





[jira] [Updated] (FLINK-27199) Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.

2022-08-15 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-27199:
--
Labels: pull-request-available  (was: pull-request-available stale-assigned)

> Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.
> --
>
> Key: FLINK-27199
> URL: https://issues.apache.org/jira/browse/FLINK-27199
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.4, 1.15.0, 1.16.0
>Reporter: Yufan Sheng
>Assignee: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2
>
>
> Pulsar's transaction support is not stable. The standalone cluster often 
> hangs the tests, so we eventually hit test timeouts.
> The latest Pulsar 2.10.0 drops ZooKeeper and fixes a lot of issues in Pulsar 
> transactions. Bumping to this version would resolve the current test issues.





[jira] [Assigned] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-28927:


Assignee: ChangjiGuo

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Assignee: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable threads.
>  * If a thread has finished, it will clean up all state objects.
>  * If a thread has not completed, it will (maybe) be interrupted.
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file uploads to 
> complete, but the files were not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It waits for all files to be uploaded here. Although the thread has been 
> interrupted, the already uploaded files are not cleaned up (see the sketch 
> after this quote). The remaining files mainly fall into two groups:
>  * Files that finished uploading before the thread was canceled.
>  * Files for which outputStream.closeAndGetHandle() was called but 
> snapshotCloseableRegistry has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout to make the checkpoint fail, then check 
> whether there are any files left in the shared directory.
> I'm testing on Flink 1.11, but the code in the latest branch may 
> have the same problem. I tried to fix it and it works well so far.
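
A rough, self-contained sketch of the cleanup idea mentioned above, using plain 
java.util.concurrent types instead of Flink's FutureUtils; the method names and 
the discard() helper are hypothetical:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical fix: wait for all uploads; if the wait is interrupted, cancel
// the pending uploads and delete whatever has already finished uploading.
public final class UploadCleanupSketch {

    static void uploadAll(ExecutorService pool, List<Callable<String>> uploads)
            throws ExecutionException {
        List<Future<String>> futures = new ArrayList<>();
        for (Callable<String> upload : uploads) {
            futures.add(pool.submit(upload));
        }
        try {
            for (Future<String> future : futures) {
                future.get(); // like FutureUtils.waitForAll(futures.values()).get()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            for (Future<String> future : futures) {
                future.cancel(true); // stop uploads that are still running
                if (future.isDone() && !future.isCancelled()) {
                    try {
                        discard(future.get()); // clean up finished uploads
                    } catch (Exception ignored) {
                        // best-effort cleanup
                    }
                }
            }
        }
    }

    // stand-in for deleting an uploaded file from the shared directory
    static void discard(String uploadedFilePath) {
        System.out.println("deleting " + uploadedFilePath);
    }
}
{code}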





[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580068#comment-17580068
 ] 

Yun Tang commented on FLINK-28927:
--

[~ChangjiGuo] It seems you did not give a clear idea of how to solve the 
problem of cleaning up the interrupted uploads. I can assign the task to you 
first, and hope you will provide a clear design.

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable threads.
>  * If a thread has finished, it will clean up all state objects.
>  * If a thread has not completed, it will (maybe) be interrupted.
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file uploads to 
> complete, but the files were not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It waits for all files to be uploaded here. Although the thread has been 
> interrupted, the already uploaded files are not cleaned up. The remaining 
> files mainly fall into two groups:
>  * Files that finished uploading before the thread was canceled.
>  * Files for which outputStream.closeAndGetHandle() was called but 
> snapshotCloseableRegistry has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout to make the checkpoint fail, then check 
> whether there are any files left in the shared directory.
> I'm testing on Flink 1.11, but the code in the latest branch may 
> have the same problem. I tried to fix it and it works well so far.





[jira] [Closed] (FLINK-27199) Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.

2022-08-15 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen closed FLINK-27199.
-
Resolution: Fixed

1.15 via 65f79130e4b24a2b9d7035bc0751842889729f63

Closed.

> Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.
> --
>
> Key: FLINK-27199
> URL: https://issues.apache.org/jira/browse/FLINK-27199
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.4, 1.15.0, 1.16.0
>Reporter: Yufan Sheng
>Assignee: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0, 1.15.2
>
>
> Pulsar's transaction support is not stable. The standalone cluster often 
> hangs the tests, so we eventually hit test timeouts.
> The latest Pulsar 2.10.0 drops ZooKeeper and fixes a lot of issues in Pulsar 
> transactions. Bumping to this version would resolve the current test issues.





[jira] [Updated] (FLINK-28967) Fix Hive multi-version support for Table Store

2022-08-15 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-28967:
-
Fix Version/s: (was: table-store-0.3.0)

> Fix Hive multi-version support for Table Store
> --
>
> Key: FLINK-28967
> URL: https://issues.apache.org/jira/browse/FLINK-28967
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.0, table-store-0.3.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>
> Currently, the Table Store Hive catalog and connector only work with Hive 
> 2.3. Support for Hive 2.1 and 2.2 should be fixed.





[jira] [Updated] (FLINK-27199) Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.

2022-08-15 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-27199:
--
Fix Version/s: (was: 1.14.6)

> Bump Pulsar to 2.10.0 for fixing the unstable Pulsar test environment.
> --
>
> Key: FLINK-27199
> URL: https://issues.apache.org/jira/browse/FLINK-27199
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.4, 1.15.0, 1.16.0
>Reporter: Yufan Sheng
>Assignee: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0, 1.15.2
>
>
> Pulsar's transaction support is not stable. The standalone cluster often 
> hangs the tests, which eventually makes them time out.
> The latest Pulsar 2.10.0 drops ZooKeeper and fixes a lot of issues in Pulsar 
> transactions. Bumping to this version would resolve the current test issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28967) Fix Hive multi-version support for Table Store

2022-08-15 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-28967.

  Assignee: Caizhi Weng
Resolution: Fixed

master: 6652905c4c0ff7f2533353c2d7d3d6552eb978e3
release-0.2: cb5a24e4fc804df4ea746182ea28d03e1866971c

> Fix Hive multi-version support for Table Store
> --
>
> Key: FLINK-28967
> URL: https://issues.apache.org/jira/browse/FLINK-28967
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.0, table-store-0.3.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0, table-store-0.3.0
>
>
> Currently the Table Store Hive catalog and connector only work with Hive 2.3. 
> Support for Hive 2.1 and 2.2 should be fixed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] tisonkun commented on pull request #20582: [BK-1.15][FLINK-27199][Connector/Pulsar] Bump pulsar to 2.10.0

2022-08-15 Thread GitBox


tisonkun commented on PR #20582:
URL: https://github.com/apache/flink/pull/20582#issuecomment-1216187797

   Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] tisonkun merged pull request #20582: [BK-1.15][FLINK-27199][Connector/Pulsar] Bump pulsar to 2.10.0

2022-08-15 Thread GitBox


tisonkun merged PR #20582:
URL: https://github.com/apache/flink/pull/20582


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi merged pull request #266: [FLINK-28967] Fix Hive multi-version support for Table Store

2022-08-15 Thread GitBox


JingsongLi merged PR #266:
URL: https://github.com/apache/flink-table-store/pull/266


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution

2022-08-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-28981:
---
Priority: Blocker  (was: Major)

> Release Testing: Verify FLIP-245 sources speculative execution
> --
>
> Key: FLINK-28981
> URL: https://issues.apache.org/jira/browse/FLINK-28981
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims at verifying FLIP-245, along with FLIP-168, FLIP-224 and 
> FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write Flink jobs that have some {{source}} subtasks running much slower 
> than others (a sketch of such a source follows after this list). Three kinds 
> of sources should be verified, including
>- [Source 
> functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
>- [InputFormat 
> sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
>- [FLIP-27 new 
> sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics, and the produced result.
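
For illustration, a minimal sketch of a source with slow subtasks for the 
first step above (the class name is hypothetical; the even/odd trick mirrors 
the description in FLINK-28980):

{code:java}
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/** Illustrative only: even-indexed subtasks emit slowly, so the slow-task
 * detector should eventually trigger speculative attempts for them. */
public class SlowSubtaskSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        boolean slow = getRuntimeContext().getIndexOfThisSubtask() % 2 == 0;
        for (long i = 0; i < 10_000 && running; i++) {
            if (slow) {
                Thread.sleep(100); // make this subtask lag far behind the others
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}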



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-28980:
---
Priority: Blocker  (was: Major)

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims at verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write a Flink job that has a subtask running much slower than the others 
> (e.g. sleep indefinitely if it runs on a certain host; the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(); or stall if the 
> subtask's (subtaskIndex + attemptNumber) % 2 == 0). A sketch of the 
> host-based variant follows after this list.
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics, and the produced result.
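
For illustration, a minimal sketch of the host-based variant from the first 
step above (class name and constructor are hypothetical):

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;

import java.net.InetAddress;

/** Illustrative only: stalls indefinitely on one chosen host, simulating a
 * slow node so that speculative execution re-runs the subtask elsewhere. */
public class StallOnHostMap extends RichMapFunction<Long, Long> {

    private final String slowHost;

    public StallOnHostMap(String slowHost) {
        this.slowHost = slowHost;
    }

    @Override
    public Long map(Long value) throws Exception {
        if (InetAddress.getLocalHost().getHostName().equals(slowHost)) {
            Thread.sleep(Long.MAX_VALUE); // "sleep indefinitely" on the slow node
        }
        return value;
    }
}
{code}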



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution

2022-08-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-28981:
---
Labels: test-stability  (was: )

> Release Testing: Verify FLIP-245 sources speculative execution
> --
>
> Key: FLINK-28981
> URL: https://issues.apache.org/jira/browse/FLINK-28981
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims at verifying FLIP-245, along with FLIP-168, FLIP-224 and 
> FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write Flink jobs that have some {{source}} subtasks running much slower 
> than others. Three kinds of sources should be verified, including
>- [Source 
> functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
>- [InputFormat 
> sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
>- [FLIP-27 new 
> sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics, and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-28980:
---
Labels: test-stability  (was: )

> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims at verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write a Flink job that has a subtask running much slower than the others 
> (e.g. sleep indefinitely if it runs on a certain host; the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(); or stall if the 
> subtask's (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify the Flink configuration file to enable speculative execution and 
> tune the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics, and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28925) Fix the concurrency problem in hybrid shuffle

2022-08-15 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580062#comment-17580062
 ] 

godfrey he commented on FLINK-28925:


[~Weijie Guo] would you like to fix it?

> Fix the concurrency problem in hybrid shuffle
> -
>
> Key: FLINK-28925
> URL: https://issues.apache.org/jira/browse/FLINK-28925
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Weijie Guo
>Priority: Blocker
> Fix For: 1.16.0
>
>
> Through TPC-DS testing and code analysis, I found some thread-safety problems 
> in hybrid shuffle:
>  # HsSubpartitionMemoryDataManager#consumeBuffer should return a 
> readOnlySlice buffer to the downstream task instead of the original buffer: 
> if the spilling thread is processing while the downstream task is consuming 
> the same buffer, the amount of data written to disk will be smaller than the 
> actual value. To solve this, we should let the consuming thread and the 
> spilling thread share the same data but not the same reader index.
>  # HsSubpartitionMemoryDataManager#releaseSubpartitionBuffers should ignore 
> the release decision if the buffer has already been removed from 
> bufferIndexToContexts, instead of throwing an exception. It should be pointed 
> out that although the actual release operation is synchronous, a double 
> release can still happen. The reason is that non-global decisions do not need 
> to be synchronized. In other words, the main task thread and the consumer 
> thread may decide to release a buffer at the same time.
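
For illustration, a minimal sketch (simplified; not the actual 
HsSubpartitionMemoryDataManager code) of the read-only-slice fix for the 
first problem:

{code:java}
import org.apache.flink.runtime.io.network.buffer.Buffer;

final class ConsumeBufferSketch {

    /** Hands the downstream consumer a read-only slice: it shares the
     * underlying memory segment with the spilling thread but keeps its own
     * reader index, so concurrent reads cannot shrink the spilled size. */
    static Buffer consumeBuffer(Buffer original) {
        Buffer slice = original.readOnlySlice();
        slice.retainBuffer(); // keep the segment alive while the consumer reads
        return slice;
    }
}
{code}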



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580061#comment-17580061
 ] 

Yun Tang commented on FLINK-28984:
--

I think the second analysis looks a bit weird. If the FSDataOutputStream has 
not been created, it will not call 
{{FsCheckpointStateOutputStream#createStream}}, which means we will not get 
the generated random path.

The SafetyNetCloseableRegistry helps close unreleased closeables on JVM GC; it 
must be that we forgot to close them somewhere.

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but the closed flag has 
> already been set to true. You can see this in the log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  
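
For illustration, a minimal sketch (hypothetical names, not the actual 
FsCheckpointStateOutputStream code) of the race in the second case:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Illustrative only: a lazily created stream that can leak its backing file
 * when close() wins the race against a slow create(). */
final class LazyStateOutputStreamSketch extends OutputStream {

    private volatile boolean closed;
    private OutputStream out; // created lazily on first write

    @Override
    public void write(int b) throws IOException {
        if (out == null) {
            // If close() runs while the (slow) file system is still creating
            // the file, `closed` is already true by the time create() returns,
            // and nothing ever deletes the file that was just created.
            out = createStream();
            if (closed) {
                out.close(); // the stream is closed, but the file remains
                throw new IOException("stream was closed during create()");
            }
        }
        out.write(b);
    }

    @Override
    public void close() throws IOException {
        closed = true; // may run before createStream() returns
        if (out != null) {
            out.close();
        }
    }

    private OutputStream createStream() throws IOException {
        // Hypothetical stand-in for a high-latency DFS create().
        return new ByteArrayOutputStream();
    }
}
{code}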



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] SteNicholas commented on pull request #266: [FLINK-28967] Fix Hive multi-version support for Table Store

2022-08-15 Thread GitBox


SteNicholas commented on PR #266:
URL: 
https://github.com/apache/flink-table-store/pull/266#issuecomment-1216181701

   @tsreaper, Hive 2.1 and 2.2 use the latest versions 2.1.1 and 2.2.0; 
should Hive 2.3 also support its latest version, 2.3.9?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28429) Reduce pyflink tests time

2022-08-15 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-28429:
-
Labels:   (was: test-stability)

> Reduce pyflink tests time
> -
>
> Key: FLINK-28429
> URL: https://issues.apache.org/jira/browse/FLINK-28429
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
> Fix For: 1.16.0
>
>
> Currently, the pyflink tests take about 1 hour 30 minutes. We need to 
> optimize this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28920) Release Testing: Verify Python DataStream Window

2022-08-15 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo resolved FLINK-28920.
--
Resolution: Done

> Release Testing: Verify Python DataStream Window
> 
>
> Key: FLINK-28920
> URL: https://issues.apache.org/jira/browse/FLINK-28920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream window job  in thread mode. For details of 
> Window, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, 
> TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
> class SecondColumnTimestampAssigner(TimestampAssigner):
> def extract_timestamp(self, value, record_timestamp) -> int:
> return int(value[1])
> def main():
> config = Configuration()
> # thread mode
> config.set_string("python.execution-mode", "thread")
> # process mode
> # config.set_string("python.execution-mode", "process")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> data_stream = env.from_collection([
> ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 
> 15)],
> type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: 
> DataStream
> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> class MyAggregateFunction(AggregateFunction):
> def create_accumulator(self) -> Tuple[int, str]:
> return 0, ''
> def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) 
> -> Tuple[int, str]:
> return value[1] + accumulator[0], value[0]
> def get_result(self, accumulator: Tuple[int, str]):
> return accumulator[1], accumulator[0]
> def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
> return acc_a[0] + acc_b[0], acc_a[1]
> ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
> .key_by(lambda x: x[0], key_type=Types.STRING()) \
> .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
> .aggregate(MyAggregateFunction(),
>accumulator_type=Types.TUPLE([Types.INT(), 
> Types.STRING()]),
>output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
> ds.print()
> env.execute('test_window_aggregate_accumulator_type')
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream window job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API

2022-08-15 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo resolved FLINK-28918.
--
Resolution: Done

> Release Testing: Verify FLIP-206 in Python DataStream API
> -
>
> Key: FLINK-28918
> URL: https://issues.apache.org/jira/browse/FLINK-28918
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream job  in thread mode
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> def main():
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> ds = env.from_collection(
> [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
> type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>   [Types.INT(), Types.STRING(), Types.INT()]))
> def flat_map_func1(data):
> for i in data:
> yield int(i), 1
> def flat_map_func2(data):
> for i in data:
> yield i
> ds = ds.key_by(lambda x: x[0]) \
> .min_by("v2") \
> .map(lambda x: (x[0], x[1], x[2]),
> output_type=Types.TUPLE([Types.INT(), Types.STRING(), 
> Types.INT()])) \
> .key_by(lambda x: x[2]) \
> .max_by(0) \
> .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), 
> Types.INT()])) \
> .key_by(lambda x: [1]) \
> .min_by() \
> .flat_map(flat_map_func2, output_type=Types.INT()) \
> .key_by(lambda x: x) \
> .max_by()
> ds.print()
> env.execute("key_by_min_by_max_by_test_batch")
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28983) using serviceaccount in FlinkDeployment not works when sink to aws s3

2022-08-15 Thread Lichuan Shang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lichuan Shang updated FLINK-28983:
--
Description: 
I am deploying a Flink CDC job using the sql-runner example from the official 
examples (see 
[https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]).

The _flink_ service account has all S3 permissions (`s3:*` in the IAM policy), 
but the k8s pod keeps restarting and there are many errors in the pod's log:

 
{code:java}
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 4 more
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on 
nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 
B3ZNHK1DSM4JDJ39; S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; 
Proxy: null), S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400
 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 
400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; 
Proxy: null)
        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
        at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
        at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391)
        at 
org.apache.

[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #141: [FLINK-28805] Add Transformer for HashingTF

2022-08-15 Thread GitBox


yunfengzhou-hub commented on code in PR #141:
URL: https://github.com/apache/flink-ml/pull/141#discussion_r946314824


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/hashingtf/HashingTF.java:
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.hashingtf;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.shaded.guava30.com.google.common.hash.Hashing.murmur3_32;
+
+/**
+ * A Transformer that maps a sequence of terms(strings, numbers, booleans) to 
a sparse vector with a

Review Comment:
   nit: This should be "vector" instead of "sparse vector" corresponding to the 
`VectorTypeInfo.INSTANCE` used when generating `outputTypeInfo`.



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/hashingtf/HashingTF.java:
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.hashingtf;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.shaded.guava30.com.google.common.hash.Hashing.murmur3_32;
+
+/**
+ * A Transformer that maps a sequence of terms(strings, numbers, booleans) to 
a sparse vector with a
+ * specified dimension using the hashing trick.
+ *
+ * If multiple features are projected into the same column, the output 
values are accumulated by
+ * default. Users could also enforce all non-zero output values as 1 by 
setting {@link
+ * HashingTFParams#BINARY} as true.
+ *
+ * For the hashing trick, see https://en.wikipedia.org/wiki/Feature_hashing
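
For illustration, a minimal sketch of the accumulate-vs-binary behavior 
described in the Javadoc above (hypothetical names; the real transformer 
hashes with murmur3_32 rather than hashCode and emits a vector):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HashingTrickSketch {

    /** Maps terms to a sparse (index -> value) map of the given dimension. */
    static Map<Integer, Double> transform(List<Object> terms, int numFeatures, boolean binary) {
        Map<Integer, Double> vector = new HashMap<>();
        for (Object term : terms) {
            int index = Math.floorMod(term.hashCode(), numFeatures);
            // Colliding terms accumulate by default; binary mode forces 1.0.
            double updated = binary ? 1.0 : vector.getOrDefault(index, 0.0) + 1.0;
            vector.put(index, updated);
        }
        return vector;
    }
}
{code}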

[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API

2022-08-15 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580058#comment-17580058
 ] 

Huang Xingbo commented on FLINK-28918:
--

[~ana4] Thanks a lot for the test.

> Release Testing: Verify FLIP-206 in Python DataStream API
> -
>
> Key: FLINK-28918
> URL: https://issues.apache.org/jira/browse/FLINK-28918
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream job  in thread mode
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> def main():
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> ds = env.from_collection(
> [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
> type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>   [Types.INT(), Types.STRING(), Types.INT()]))
> def flat_map_func1(data):
> for i in data:
> yield int(i), 1
> def flat_map_func2(data):
> for i in data:
> yield i
> ds = ds.key_by(lambda x: x[0]) \
> .min_by("v2") \
> .map(lambda x: (x[0], x[1], x[2]),
> output_type=Types.TUPLE([Types.INT(), Types.STRING(), 
> Types.INT()])) \
> .key_by(lambda x: x[2]) \
> .max_by(0) \
> .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), 
> Types.INT()])) \
> .key_by(lambda x: [1]) \
> .min_by() \
> .flat_map(flat_map_func2, output_type=Types.INT()) \
> .key_by(lambda x: x) \
> .max_by()
> ds.print()
> env.execute("key_by_min_by_max_by_test_batch")
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window

2022-08-15 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580057#comment-17580057
 ] 

Huang Xingbo commented on FLINK-28920:
--

[~ana4] Thanks a lot for the test.

> Release Testing: Verify Python DataStream Window
> 
>
> Key: FLINK-28920
> URL: https://issues.apache.org/jira/browse/FLINK-28920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setpy.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream window job  in thread mode. For details of 
> Window, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, 
> TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
> class SecondColumnTimestampAssigner(TimestampAssigner):
> def extract_timestamp(self, value, record_timestamp) -> int:
> return int(value[1])
> def main():
> config = Configuration()
> # thread mode
> config.set_string("python.execution-mode", "thread")
> # process mode
> # config.set_string("python.execution-mode", "process")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> data_stream = env.from_collection([
> ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 
> 15)],
> type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: 
> DataStream
> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> class MyAggregateFunction(AggregateFunction):
> def create_accumulator(self) -> Tuple[int, str]:
> return 0, ''
> def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) 
> -> Tuple[int, str]:
> return value[1] + accumulator[0], value[0]
> def get_result(self, accumulator: Tuple[int, str]):
> return accumulator[1], accumulator[0]
> def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
> return acc_a[0] + acc_b[0], acc_a[1]
> ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
> .key_by(lambda x: x[0], key_type=Types.STRING()) \
> .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
> .aggregate(MyAggregateFunction(),
>accumulator_type=Types.TUPLE([Types.INT(), 
> Types.STRING()]),
>output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
> ds.print()
> env.execute('test_window_aggregate_accumulator_type')
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream window job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] syhily commented on a diff in pull request #20567: [FLINK-27917][Connector/Pulsar] Fix message assert error in PulsarUnorderedPartitionSplitReaderTest

2022-08-15 Thread GitBox


syhily commented on code in PR #20567:
URL: https://github.com/apache/flink/pull/20567#discussion_r946357858


##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java:
##
@@ -198,7 +213,7 @@ void 
pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader)
 String topicName = randomAlphabetic(10);
 
 // Add a split
-seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+handleSplit(splitReader, topicName, 0, MessageId.latest);

Review Comment:
   @tisonkun Yep.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window

2022-08-15 Thread Luning Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580053#comment-17580053
 ] 

Luning Wang commented on FLINK-28920:
-

This test can successfully run on an M1 chip. [~hxbks2ks]

> Release Testing: Verify Python DataStream Window
> 
>
> Key: FLINK-28920
> URL: https://issues.apache.org/jira/browse/FLINK-28920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream window job  in thread mode. For details of 
> Window, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, 
> TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
> class SecondColumnTimestampAssigner(TimestampAssigner):
> def extract_timestamp(self, value, record_timestamp) -> int:
> return int(value[1])
> def main():
> config = Configuration()
> # thread mode
> config.set_string("python.execution-mode", "thread")
> # process mode
> # config.set_string("python.execution-mode", "process")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> data_stream = env.from_collection([
> ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 
> 15)],
> type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: 
> DataStream
> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> class MyAggregateFunction(AggregateFunction):
> def create_accumulator(self) -> Tuple[int, str]:
> return 0, ''
> def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) 
> -> Tuple[int, str]:
> return value[1] + accumulator[0], value[0]
> def get_result(self, accumulator: Tuple[int, str]):
> return accumulator[1], accumulator[0]
> def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
> return acc_a[0] + acc_b[0], acc_a[1]
> ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
> .key_by(lambda x: x[0], key_type=Types.STRING()) \
> .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
> .aggregate(MyAggregateFunction(),
>accumulator_type=Types.TUPLE([Types.INT(), 
> Types.STRING()]),
>output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
> ds.print()
> env.execute('test_window_aggregate_accumulator_type')
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream window job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API

2022-08-15 Thread Luning Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580054#comment-17580054
 ] 

Luning Wang commented on FLINK-28918:
-

This test can successfully run on an M1 chip. [~hxbks2ks]

> Release Testing: Verify FLIP-206 in Python DataStream API
> -
>
> Key: FLINK-28918
> URL: https://issues.apache.org/jira/browse/FLINK-28918
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream job  in thread mode
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> def main():
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> ds = env.from_collection(
> [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
> type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>   [Types.INT(), Types.STRING(), Types.INT()]))
> def flat_map_func1(data):
> for i in data:
> yield int(i), 1
> def flat_map_func2(data):
> for i in data:
> yield i
> ds = ds.key_by(lambda x: x[0]) \
> .min_by("v2") \
> .map(lambda x: (x[0], x[1], x[2]),
> output_type=Types.TUPLE([Types.INT(), Types.STRING(), 
> Types.INT()])) \
> .key_by(lambda x: x[2]) \
> .max_by(0) \
> .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), 
> Types.INT()])) \
> .key_by(lambda x: [1]) \
> .min_by() \
> .flat_map(flat_map_func2, output_type=Types.INT()) \
> .key_by(lambda x: x) \
> .max_by()
> ds.print()
> env.execute("key_by_min_by_max_by_test_batch")
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580044#comment-17580044
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/16/22 5:31 AM:
-

I also found another problem when debugging the code.

See [here|https://issues.apache.org/jira/browse/FLINK-28984]

[~yunta] Can you assign these two issues to me? Thanks!
 
 


was (Author: changjiguo):
I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]

[~yunta] Can you assign these two issues to me? Thanks!
 

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable threads.
>  * If a thread has finished, it will clean up all state objects.
>  * If a thread has not completed, it will be interrupted (maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It waits here for all files to finish uploading. Although the wait has been 
> interrupted, the already-uploaded files will not be cleaned up. The remaining 
> files mainly fall into two groups:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout so that the checkpoint fails, then check 
> whether any files remain in the shared directory.
> I'm testing on Flink-1.11, but the code on the latest branch appears to have 
> the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28985) support create table like view

2022-08-15 Thread waywtdcc (Jira)
waywtdcc created FLINK-28985:


 Summary: support create table like view
 Key: FLINK-28985
 URL: https://issues.apache.org/jira/browse/FLINK-28985
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.16.0
Reporter: waywtdcc
 Fix For: 1.16.0


At present, a table created with CREATE TABLE ... LIKE can only be based on 
a table, not on a view.
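
For illustration, a minimal sketch of the current behavior (table names are 
hypothetical, and the plain LIKE form with no overrides is assumed):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableLikeViewExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE src (id INT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE VIEW v AS SELECT id, name FROM src");

        // Works today: the LIKE source is a table.
        tEnv.executeSql("CREATE TABLE t1 LIKE src");

        // Rejected today: the LIKE source is a view. Supporting this is the
        // improvement requested by this ticket.
        tEnv.executeSql("CREATE TABLE t2 LIKE v");
    }
}
{code}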



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
Description: 
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but the closed flag has 
already been set to true. You can see this in the log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

How to reproduce?

This is not easy to reproduce, but you can try to set a smaller checkpoint 
timeout and increase the parallelism of the flink job.
 

  was:
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but the closed flag has 
already been set to true. You can see this in the log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 


> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but the closed flag has 
> already been set to true. You can see this in the log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
> How to reproduce?
> This is not easy to reproduce, but you can try to set a smaller checkpoint 
> timeout and increase the parallelism of the flink job.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
External issue URL: https://issues.apache.org/jira/browse/FLINK-28927

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but the closed flag has 
> already been set to true. You can see this in the log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
External issue URL:   (was: 
https://issues.apache.org/jira/browse/FLINK-28927)

> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but the closed flag has been 
> set to true. You can see this in the log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580044#comment-17580044
 ] 

ChangjiGuo edited comment on FLINK-28927 at 8/16/22 5:15 AM:
-

I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]

[~yunta] Can you assign these two issues to me? Thanks!
 


was (Author: changjiguo):
I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]
 

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable threads.
>  * If the thread has finished, it will clean up all state objects.
>  * If the thread has not completed, it will be interrupted (maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout to make the checkpoint fail, then check 
> whether any files remain in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.
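A hedged sketch of the kind of cleanup the reporter describes (illustrative only, not the actual patch): when the wait on the upload futures is interrupted or fails, discard every handle whose upload already completed so no orphaned files stay in the shared checkpoint directory.

{code:java}
// Illustrative fragment around the quoted wait; names such as "futures"
// follow the RocksDBStateUploader context described above.
try {
    FutureUtils.waitForAll(futures.values()).get();
} catch (Exception e) {
    for (CompletableFuture<StreamStateHandle> future : futures.values()) {
        if (future.isDone() && !future.isCompletedExceptionally()) {
            try {
                future.getNow(null).discardState(); // delete the uploaded file
            } catch (Exception discardEx) {
                // best effort: log and keep cleaning the remaining handles
            }
        }
    }
    throw e;
}
{code}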



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28927) Can not clean up the uploaded shared files when the checkpoint fails

2022-08-15 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580044#comment-17580044
 ] 

ChangjiGuo commented on FLINK-28927:


I also found another problem when debugging the code.

See [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-28984]
 

> Can not clean up the uploaded shared files when the checkpoint fails
> 
>
> Key: FLINK-28927
> URL: https://issues.apache.org/jira/browse/FLINK-28927
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.6, 1.15.1
> Environment: Flink-1.11
>Reporter: ChangjiGuo
>Priority: Major
>
> If a checkpoint times out, the task will cancel the AsyncCheckpointRunnable 
> thread and do some cleanup work, including the following:
>  * Cancel all AsyncSnapshotCallable threads.
>  * If the thread has finished, it will clean up all state objects.
>  * If the thread has not completed, it will be interrupted (maybe).
>  * Close snapshotCloseableRegistry.
> In my case, the thread was interrupted while waiting for the file upload to 
> complete, but the file was not cleaned up.
> RocksDBStateUploader.java
> {code:java}
> FutureUtils.waitForAll(futures.values()).get();
> {code}
> It will wait for all files to be uploaded here. Although it has been 
> interrupted, the uploaded files will not be cleaned up. The remaining files 
> are mainly divided into:
>  * Files that have finished uploading before the thread is canceled.
>  * outputStream.closeAndGetHandle() is called, but snapshotCloseableRegistry 
> has not been closed.
> How to reproduce?
> Shorten the checkpoint timeout to make the checkpoint fail, then check 
> whether any files remain in the shared directory.
> I'm testing on Flink-1.11, but I found the code from the latest branch may 
> have the same problem. I tried to fix it and it works well so far.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-28984:
---
Description: 
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but the closed flag has been 
set to true. You can see this in the log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 

  was:
If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but the closed flag has been 
set to true. You can see this in the log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry          but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 


> FsCheckpointStateOutputStream is not being released normally
> 
>
> Key: FLINK-28984
> URL: https://issues.apache.org/jira/browse/FLINK-28984
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.6, 1.15.1
>Reporter: ChangjiGuo
>Priority: Major
>
> If the checkpoint is aborted, AsyncSnapshotCallable will close the 
> snapshotCloseableRegistry when it is canceled. There may be two situations 
> here:
>  # The FSDataOutputStream has been created and closed while closing 
> FsCheckpointStateOutputStream.
>  # The FSDataOutputStream has not been created yet, but the closed flag has been 
> set to true. You can see this in the log:
> {code:java}
> 2022-08-16 12:55:44,161 WARN  
> org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
> unclosed resource via safety-net: 
> ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
>  : 
> x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
>  {code}
>         The output stream will be automatically closed by the 
> SafetyNetCloseableRegistry but the file will not be deleted.
> The second case usually occurs when the storage system has high latency in 
> creating files.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28954) Release Testing: Verify FLIP-223 HiveServer2 Endpoint

2022-08-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-28954:
---

Assignee: luoyuxia

> Release Testing: Verify FLIP-223 HiveServer2 Endpoint
> -
>
> Key: FLINK-28954
> URL: https://issues.apache.org/jira/browse/FLINK-28954
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: Shengkai Fang
>Assignee: luoyuxia
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> HiveServer2 Endpoint is ready to use in this version. I think we can verify:
>  # We can start the SQL Gateway with HiveServer2 Endpoint
>  # User is able to submit SQL with Hive Beeline
>  # User is able to submit SQL with DBeaver



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28984) FsCheckpointStateOutputStream is not being released normally

2022-08-15 Thread ChangjiGuo (Jira)
ChangjiGuo created FLINK-28984:
--

 Summary: FsCheckpointStateOutputStream is not being released 
normally
 Key: FLINK-28984
 URL: https://issues.apache.org/jira/browse/FLINK-28984
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.15.1, 1.11.6
Reporter: ChangjiGuo


If the checkpoint is aborted, AsyncSnapshotCallable will close the 
snapshotCloseableRegistry when it is canceled. There may be two situations here:
 # The FSDataOutputStream has been created and closed while closing 
FsCheckpointStateOutputStream.
 # The FSDataOutputStream has not been created yet, but the closed flag has been 
set to true. You can see this in the log:
{code:java}
2022-08-16 12:55:44,161 WARN  
org.apache.flink.core.fs.SafetyNetCloseableRegistry   - Closing 
unclosed resource via safety-net: 
ClosingFSDataOutputStream(org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream@4ebe8e64)
 : 
x/flink/checkpoint/state/9214a2e302904b14baf2dc1aacbc7933/ae157c5a05a8922a46a179cdb4c86b10/shared/9d8a1e92-2f69-4ab0-8ce9-c1beb149229a
 {code}

        The output stream will be automatically closed by the 
SafetyNetCloseableRegistry          but the file will not be deleted.

The second case usually occurs when the storage system has high latency in 
creating files.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28952) Release Testing: Verify Hive dialect

2022-08-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-28952:
---

Assignee: Shengkai Fang

> Release Testing: Verify Hive dialect 
> 
>
> Key: FLINK-28952
> URL: https://issues.apache.org/jira/browse/FLINK-28952
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / API
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: Shengkai Fang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28983) using serviceaccount in FlinkDeployment does not work when sinking to AWS S3

2022-08-15 Thread Lichuan Shang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lichuan Shang updated FLINK-28983:
--
Description: 
I am deploying a Flink CDC job using the sql-runner example from the official 
examples (see 
[https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]).
 

The _flink_ service account has all S3 permissions (`s3:*` in the IAM policy), but 
the k8s pod keeps restarting and there are many errors in the pod's log:

 
{code:java}
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 4 more
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on 
nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 
B3ZNHK1DSM4JDJ39; S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; 
Proxy: null), S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400
 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 
400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; 
Proxy: null)
        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
        at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
        at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391)
        at 
org.apache.

[jira] [Created] (FLINK-28983) using serviceaccount in FlinkDeployment does not work when sinking to AWS S3

2022-08-15 Thread Lichuan Shang (Jira)
Lichuan Shang created FLINK-28983:
-

 Summary: using serviceaccount in FlinkDeployment does not work when 
sinking to AWS S3
 Key: FLINK-28983
 URL: https://issues.apache.org/jira/browse/FLINK-28983
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Lichuan Shang


I am deploying a Flink CDC job using the sql-runner example from the official 
examples (see 
[https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]).
 

The `flink` service account has all S3 permissions (`s3:*` in the IAM policy), but 
the k8s pod keeps restarting and there are many errors in the pod's log:

```

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 4 more
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on 
nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request 
(Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 
B3ZNHK1DSM4JDJ39; S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; 
Proxy: null), S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400
 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 
400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: 
egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; 
Proxy: null)
        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
        at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
        at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
        at org.apache.had

[GitHub] [flink] imaffe commented on a diff in pull request #20567: [FLINK-27917][Connector/Pulsar] Fix message assert error in PulsarUnorderedPartitionSplitReaderTest

2022-08-15 Thread GitBox


imaffe commented on code in PR #20567:
URL: https://github.com/apache/flink/pull/20567#discussion_r946332754


##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java:
##
@@ -198,7 +213,7 @@ void 
pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader)
 String topicName = randomAlphabetic(10);
 
 // Add a split
-seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+handleSplit(splitReader, topicName, 0, MessageId.latest);

Review Comment:
   Here we should use seekStartPositionAndHandleSplit(). The difference is 
whether we want to do a seek before we call handleSplitChanges. Here we want to 
receive zero messages in the next poll() call, so we need to seek to the `latest` 
cursor so that no data will be consumed.
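In plain Pulsar-client terms, the seeking variant amounts to something like the sketch below; `consumer` is a hypothetical org.apache.pulsar.client.api.Consumer, and this is an illustration of the effect, not the test base's actual helper.

```java
// After seeking to MessageId.latest there is no backlog to read, so the
// following receive/poll times out with nothing consumed.
consumer.seek(MessageId.latest);
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS); // null on timeout
assert msg == null;
```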



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] tisonkun commented on a diff in pull request #20567: [FLINK-27917][Connector/Pulsar] Fix message assert error in PulsarUnorderedPartitionSplitReaderTest

2022-08-15 Thread GitBox


tisonkun commented on code in PR #20567:
URL: https://github.com/apache/flink/pull/20567#discussion_r946324038


##
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java:
##
@@ -198,7 +213,7 @@ void 
pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader)
 String topicName = randomAlphabetic(10);
 
 // Add a split
-seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+handleSplit(splitReader, topicName, 0, MessageId.latest);

Review Comment:
   @syhily so you mean that these two changes are all about non-existent topics, 
and you leave those for existing topics as is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-28965) Should create empty partition when it's for dynamic partition

2022-08-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-28965:
---

Assignee: tartarus

> Should create empty partition when it's for dynamic partition
> -
>
> Key: FLINK-28965
> URL: https://issues.apache.org/jira/browse/FLINK-28965
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: tartarus
>Priority: Critical
> Fix For: 1.16.0
>
>
> Can be reproduced by the following code in HiveDialectTest:
>  
> {code:java}
> tableEnv.executeSql(
> "create table over1k_part_orc(\n"
> + "   si smallint,\n"
> + "   i int,\n"
> + "   b bigint,\n"
> + "   f float)\n"
> + "   partitioned by (ds string, t tinyint) stored as 
> orc");
> tableEnv.executeSql(
> "create table over1k(\n"
> + "   t tinyint,\n"
> + "   si smallint,\n"
> + "   i int,\n"
> + "   b bigint,\n"
> + "   f float,\n"
> + "   d double,\n"
> + "   bo boolean,\n"
> + "   s string,\n"
> + "   ts timestamp,\n"
> + "   dec decimal(4,2),\n"
> + "   bin binary)");
> tableEnv.executeSql(
> "insert overwrite table over1k_part_orc partition(ds=\"foo\", 
> t)"
> + " select si,i,b,f,t from over1k where t is null or 
> t=27 order by si")
> .await(); {code}
>  
> Although it's for a dynamic partition, the current code will try to create a 
> partition for it (ds='foo') even when there's no data written to it, so the 
> following exception is thrown, since the partition spec (ds='foo') is not a 
> complete spec:
> {code:java}
> Caused by: MetaException(message:Invalid partition key & values; keys [ds, t, 
> ], values [foo, ]) {code}
>  
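A hedged sketch of the guard this implies (all names hypothetical; not the actual fix):

{code:java}
// Eagerly create an empty partition only when every partition column is
// statically specified (e.g. ds='foo', t=27). For a partially dynamic spec
// such as (ds='foo', t), partitions must be created lazily, only when data
// is actually written to them.
if (staticPartitionSpec.size() == partitionKeys.size()) {
    metastore.createPartitionIfNotExists(staticPartitionSpec);
}
{code}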



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28548) The commit partition base path is not created when no data is sent which may cause FileNotFoundException

2022-08-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-28548.
---
  Assignee: Liu
Resolution: Fixed

Fixed in master: f2abb51ac91c8b0e9bdd261de791d3aa1ba033dd

> The commit partition base path is not created when no data is sent which may 
> cause FileNotFoundException
> 
>
> Key: FLINK-28548
> URL: https://issues.apache.org/jira/browse/FLINK-28548
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.14.5, 1.15.1, 1.16.0
>Reporter: Liu
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> The commit partition base path is not created when no data is sent which may 
> cause a FileNotFoundException. The exception is as follows:
> {code:java}
> Caused by: java.io.FileNotFoundException: File 
> /home/ljg/test_sql.db/flink_batch_test/.staging_1657697612169 does not exist.
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:771)
>  ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:120)
>  ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:828)
>  ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:824)
>  ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>  ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics$1.doCall(FileSystemLinkResolverWithStatistics.java:37)
>  ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at org.apache.hadoop.hdfs.perflog.PerfProxy.call(PerfProxy.java:49) 
> ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics.resolve(FileSystemLinkResolverWithStatistics.java:39)
>  ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:835)
>  ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) 
> ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) 
> ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.fs.viewfs.ChRootedFileSystem.listStatus(ChRootedFileSystem.java:241)
>  ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem.listStatus(ViewFileSystem.java:376)
>  ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
>   at 
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170)
>  ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
>   at 
> org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths(PartitionTempFileManager.java:87)
>  ~[flink-connector-files-1.15.0.jar:1.15.0]
>   at 
> org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitions(FileSystemCommitter.java:78)
>  ~[flink-connector-files-1.15.0.jar:1.15.0]
>   at 
> org.apache.flink.connector.file.table.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:89)
>  ~[flink-connector-files-1.15.0.jar:1.15.0]
>   at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:153)
>  ~[flink-dist-1.15.0.jar:1.15.0]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.jobFinished(DefaultExecutionGraph.java:1190)
>  ~[flink-dist-1.15.0.jar:1.15.0]
>   ... 43 more {code}
> We should check whether the base path exists before calling listStatus on it.
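The suggested check is small; a sketch against Flink's FileSystem API, with the surrounding context simplified:

{code:java}
// If no subtask ever wrote data, the staging directory was never created,
// so skip the listing entirely instead of failing.
FileSystem fs = basePath.getFileSystem();
if (fs.exists(basePath)) {
    for (FileStatus taskStatus : fs.listStatus(basePath)) {
        // collect task temporary paths as before
    }
} // else: nothing to commit, and no FileNotFoundException
{code}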



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wuchong merged pull request #20269: [FLINK-28548][Connectors / FileSystem] Fix the exception FileNotFoundException when the commit partition base path is not created

2022-08-15 Thread GitBox


wuchong merged PR #20269:
URL: https://github.com/apache/flink/pull/20269


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28003) Hive sql is wrongly modified by SqlCompleter in SQL client when using -f {file}

2022-08-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-28003.
---
Resolution: Fixed

Fixed in master: 226f1602c047092cc1997f6e861aa37858df21f7

> Hive sql is wrongly modified by SqlCompleter in SQL client when using -f 
> {file}
> ---
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> the `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  
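Conceptually, the fix gates completion on interactivity; a hedged jline-style sketch (variable names are illustrative, not the actual patch):

{code:java}
// Completion helps in the interactive shell but must not rewrite
// statements that are read from a file via -f.
LineReaderBuilder builder = LineReaderBuilder.builder().terminal(terminal);
if (isInteractive) {
    builder.completer(sqlCompleter); // enable completion only for the shell
}
LineReader lineReader = builder.build();
{code}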



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28982) Start TaskInterrupter when a task switches from DEPLOYING to CANCELING

2022-08-15 Thread LI Mingkun (Jira)
LI Mingkun created FLINK-28982:
--

 Summary: Start TaskInterrupter when a task switches from DEPLOYING to 
CANCELING
 Key: FLINK-28982
 URL: https://issues.apache.org/jira/browse/FLINK-28982
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Web Frontend
Reporter: LI Mingkun
 Attachments: image-2022-08-16-12-10-43-894.png

A Task will start the TaskInterrupter only when its `ExecutionState` is INITIALIZING 
or RUNNING; see the function 
org.apache.flink.runtime.taskmanager.Task#cancelOrFailAndCancelInvokableInternal.

 

When using Flink Remote Shuffle, I ran into a deadlock across multiple tasks, 
caused by a Flink Remote Shuffle bug in its shared TCP connections, which blocked 
the destruction of the tasks.

The stack trace is as follows:

!image-2022-08-16-12-10-43-894.png!

My question: why is the TaskInterrupter not started when cancelling a DEPLOYING task?
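For context, a simplified paraphrase of the state check being questioned (not the exact Flink source):

{code:java}
// The interrupter/watchdog is only armed for already-running invokables.
if (current == ExecutionState.INITIALIZING || current == ExecutionState.RUNNING) {
    startTaskInterrupter(); // periodically interrupts the executing thread
}
// A task cancelled while still DEPLOYING never gets the interrupter, so a
// thread blocked during deployment (as in the remote-shuffle deadlock
// above) is never interrupted.
{code}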

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wuchong merged pull request #20182: [FLINK-28003][Table SQL / Client] Disable SqlCompleter when using -f {file}

2022-08-15 Thread GitBox


wuchong merged PR #20182:
URL: https://github.com/apache/flink/pull/20182


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28773) Fix Hive sink not writing a success file after finishing writing in batch mode

2022-08-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-28773.
---
  Assignee: tartarus
Resolution: Fixed

Fixed in master: 00cded670753e2795c774c159facb801bbeb9927 to 
b66680bb658dfb550e60a5e7440ee904c87219c6

> Fix Hive sink not writing a success file after finishing writing in batch mode
> -
>
> Key: FLINK-28773
> URL: https://issues.apache.org/jira/browse/FLINK-28773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Assignee: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, in streaming mode, we allow the user to configure a commit policy, but 
> in batch mode we don't provide such a way and it always commits to the metastore. 
> However, users expect other commit policies in batch mode as well, such as 
> writing a success file. So it would be better to support writing a success file 
> after the write finishes.
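For reference, the streaming-mode knob the description alludes to is the documented 'sink.partition-commit.policy.kind' option; a hedged example (table names are hypothetical) of requesting both a metastore commit and a success file, which batch mode should honor equivalently after this fix:

{code:java}
// Hypothetical example: set the commit policy per query via a dynamic
// table options hint.
tableEnv.executeSql(
    "INSERT INTO hive_sink /*+ OPTIONS("
        + "'sink.partition-commit.policy.kind'='metastore,success-file') */ "
        + "SELECT * FROM source_table");
{code}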



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wuchong merged pull request #20469: [FLINK-28773][hive] Fix Hive sink not writing a success file after finishing writing in batch mode

2022-08-15 Thread GitBox


wuchong merged PR #20469:
URL: https://github.com/apache/flink/pull/20469


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] curcur commented on a diff in pull request #20585: [WIP][FLINK-28976][state/changelog] Don't add extra delay to the 1st materialization

2022-08-15 Thread GitBox


curcur commented on code in PR #20585:
URL: https://github.com/apache/flink/pull/20585#discussion_r946315050


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##
@@ -118,7 +118,7 @@ public void start() {
 
 LOG.info("Task {} starts periodic materialization", subtaskName);
 
-scheduleNextMaterialization(periodicMaterializeDelay + 
initialDelay);
+scheduleNextMaterialization(initialDelay);

Review Comment:
   Let's introduce a way to disable materialization (a flag).
   
   I am not a fan of setting `periodicMaterializationDelay` to a very long value to 
disable materialization. It is implicit and may lead to errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] curcur commented on a diff in pull request #20585: [WIP][FLINK-28976][state/changelog] Don't add extra delay to the 1st materialization

2022-08-15 Thread GitBox


curcur commented on code in PR #20585:
URL: https://github.com/apache/flink/pull/20585#discussion_r946315050


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##
@@ -118,7 +118,7 @@ public void start() {
 
 LOG.info("Task {} starts periodic materialization", subtaskName);
 
-scheduleNextMaterialization(periodicMaterializeDelay + 
initialDelay);
+scheduleNextMaterialization(initialDelay);

Review Comment:
   I think we should have some way to disable materialization (a flag).
   
   I am not a fan of setting `periodicMaterializationDelay` to a very long value to 
disable materialization. It is implicit and may lead to errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28130) FLIP-224: Blocklist Mechanism

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-28130.
---
Resolution: Done

> FLIP-224: Blocklist Mechanism
> -
>
> Key: FLINK-28130
> URL: https://issues.apache.org/jira/browse/FLINK-28130
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
> Fix For: 1.16.0
>
>
> In order to support speculative execution for batch 
> jobs([FLIP-168|https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job]),
>  we need a mechanism to block resources on nodes where the slow tasks are 
> located. We propose to introduce a blocklist mechanism as follows:  Once a 
> node is marked as blocked, future slots should not be allocated from the 
> blocked node, but the slots that are already allocated will not be affected.
> More details see 
> [FLIP-224|https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism]
> This is the umbrella ticket to track all the changes of this feature.
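A minimal sketch of the allocation rule stated above (illustrative only; the accessor names are hypothetical):

{code:java}
// New slot requests consult the blocklist, while slots already allocated
// on a newly blocked node keep running.
Set<String> blockedNodes = blocklistHandler.getBlockedNodes();
List<TaskManagerLocation> candidates =
    allTaskManagers.stream()
        .filter(tm -> !blockedNodes.contains(tm.getHostname()))
        .collect(Collectors.toList());
{code}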



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28587) FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-28587.
---
Fix Version/s: 1.16.0
   Resolution: Done

> FLIP-249: Flink Web UI Enhancement for Speculative Execution
> 
>
> Key: FLINK-28587
> URL: https://issues.apache.org/jira/browse/FLINK-28587
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / REST, Runtime / Web Frontend
>Affects Versions: 1.16.0
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Major
> Fix For: 1.16.0
>
>
> As a follow-up step of FLIP-168 and FLIP-224, the Flink Web UI needs to be 
> enhanced to display the related information if the speculative execution 
> mechanism is enabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28397) [FLIP-245] Source Supports Speculative Execution For Batch Job

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-28397.
---
Resolution: Done

> [FLIP-245] Source Supports Speculative Execution For Batch Job
> --
>
> Key: FLINK-28397
> URL: https://issues.apache.org/jira/browse/FLINK-28397
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
> Fix For: 1.16.0
>
>
> This is the umbrella ticket of 
> [FLIP-245|https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job].
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] SteNicholas commented on pull request #266: [FLINK-28967] Fix Hive multi-version support for Table Store

2022-08-15 Thread GitBox


SteNicholas commented on PR #266:
URL: 
https://github.com/apache/flink-table-store/pull/266#issuecomment-1216113060

   @tsreaper @JingsongLi , does the NOTICE file of this module need to be 
updated?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution

2022-08-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28981:
---

 Summary: Release Testing: Verify FLIP-245 sources speculative 
execution
 Key: FLINK-28981
 URL: https://issues.apache.org/jira/browse/FLINK-28981
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common, Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write Flink jobs in which some {{source}} subtasks run much slower than 
others. Three kinds of sources should be verified, including
   - [Source 
functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
   - [InputFormat 
sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
   - [FLIP-27 new 
sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
 - Modify Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than others (e.g. 
sleep indefinitely if it runs on a certain host, the hostname can be retrieved 
via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + 
attemptNumber) % 2 == 0)
 - Modify Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than others (e.g. 
sleep indefinitely if it runs on a certain host, the hostname can be retrieved 
via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + 
attemptNumber) % 2 == 0)
 - Modify Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims for verifying FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than others 
> (e.g. sleep indefinitely if it runs on a certain host, the hostname can be 
> retrieved via InetAddress.getLocalHost().getHostName(), or if its 
> (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.
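For instance, a deliberately slow subtask can be written as in the sketch below, using the (subtaskIndex + attemptNumber) % 2 == 0 predicate suggested in the description; the class name is illustrative.

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;

// Sketch of an artificially slow subtask for testing speculative execution.
public class MaybeSlowMapper extends RichMapFunction<Long, Long> {
    @Override
    public Long map(Long value) throws Exception {
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        int attemptNumber = getRuntimeContext().getAttemptNumber();
        if ((subtaskIndex + attemptNumber) % 2 == 0) {
            // This attempt is artificially slow; with speculative execution
            // enabled, a mirror attempt on another node should finish first.
            Thread.sleep(60_000L);
        }
        return value;
    }
}
{code}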



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window

2022-08-15 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580026#comment-17580026
 ] 

Huang Xingbo commented on FLINK-28920:
--

Thanks [~ana4] for the test. I have assigned it to you. If you encountered any 
problem, feel free to contact me.

> Release Testing: Verify Python DataStream Window
> 
>
> Key: FLINK-28920
> URL: https://issues.apache.org/jira/browse/FLINK-28920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream window job  in thread mode. For details of 
> Window, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, 
> TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
> class SecondColumnTimestampAssigner(TimestampAssigner):
> def extract_timestamp(self, value, record_timestamp) -> int:
> return int(value[1])
> def main():
> config = Configuration()
> # thread mode
> config.set_string("python.execution-mode", "thread")
> # process mode
> # config.set_string("python.execution-mode", "process")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> data_stream = env.from_collection([
> ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 
> 15)],
> type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: 
> DataStream
> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> class MyAggregateFunction(AggregateFunction):
> def create_accumulator(self) -> Tuple[int, str]:
> return 0, ''
> def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) 
> -> Tuple[int, str]:
> return value[1] + accumulator[0], value[0]
> def get_result(self, accumulator: Tuple[int, str]):
> return accumulator[1], accumulator[0]
> def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
> return acc_a[0] + acc_b[0], acc_a[1]
> ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
> .key_by(lambda x: x[0], key_type=Types.STRING()) \
> .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
> .aggregate(MyAggregateFunction(),
>accumulator_type=Types.TUPLE([Types.INT(), 
> Types.STRING()]),
>output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
> ds.print()
> env.execute('test_window_aggregate_accumulator_type')
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream window job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API

2022-08-15 Thread Huang Xingbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580027#comment-17580027
 ] 

Huang Xingbo commented on FLINK-28918:
--

Thanks [~ana4] for the test. I have assigned it to you. If you encountered any 
problem, feel free to contact me.

> Release Testing: Verify FLIP-206 in Python DataStream API
> -
>
> Key: FLINK-28918
> URL: https://issues.apache.org/jira/browse/FLINK-28918
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream job  in thread mode
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> def main():
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> ds = env.from_collection(
> [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
> type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>   [Types.INT(), Types.STRING(), Types.INT()]))
> def flat_map_func1(data):
> for i in data:
> yield int(i), 1
> def flat_map_func2(data):
> for i in data:
> yield i
> ds = ds.key_by(lambda x: x[0]) \
> .min_by("v2") \
> .map(lambda x: (x[0], x[1], x[2]),
> output_type=Types.TUPLE([Types.INT(), Types.STRING(), 
> Types.INT()])) \
> .key_by(lambda x: x[2]) \
> .max_by(0) \
> .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), 
> Types.INT()])) \
> .key_by(lambda x: x[1]) \
> .min_by() \
> .flat_map(flat_map_func2, output_type=Types.INT()) \
> .key_by(lambda x: x) \
> .max_by()
> ds.print()
> env.execute("key_by_min_by_max_by_test_batch")
> if __name__ == '__main__':
> main()
> {code}
> * run the python datastream job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28920) Release Testing: Verify Python DataStream Window

2022-08-15 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-28920:


Assignee: Luning Wang

> Release Testing: Verify Python DataStream Window
> 
>
> Key: FLINK-28920
> URL: https://issues.apache.org/jira/browse/FLINK-28920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python DataStream window job in thread mode. For details of 
> windows, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
> 
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
> 
> 
> class SecondColumnTimestampAssigner(TimestampAssigner):
> 
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[1])
> 
> 
> def main():
>     config = Configuration()
>     # thread mode
>     config.set_string("python.execution-mode", "thread")
>     # process mode
>     # config.set_string("python.execution-mode", "process")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     data_stream = env.from_collection([
>         ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
>         type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 
>     class MyAggregateFunction(AggregateFunction):
> 
>         def create_accumulator(self) -> Tuple[int, str]:
>             return 0, ''
> 
>         def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
>             return value[1] + accumulator[0], value[0]
> 
>         def get_result(self, accumulator: Tuple[int, str]):
>             return accumulator[1], accumulator[0]
> 
>         def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
>             return acc_a[0] + acc_b[0], acc_a[1]
> 
>     ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: x[0], key_type=Types.STRING()) \
>         .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
>         .aggregate(MyAggregateFunction(),
>                    accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
>                    output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds.print()
>     env.execute('test_window_aggregate_accumulator_type')
> 
> 
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream window job and watch the result
> {code:bash}
> $ python demo.py
> {code}
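
Beyond session windows, the other assigners in pyflink.datastream.window can be 
exercised with the same pipeline. A minimal variant (a sketch; it assumes 
TumblingEventTimeWindows is exposed in PyFlink 1.16 with the same factory style 
as the Java API, and it reuses data_stream, watermark_strategy and 
MyAggregateFunction from the job above):

{code:python}
from pyflink.common.time import Time
from pyflink.datastream.window import TumblingEventTimeWindows

# Same keyed stream as above; only the window assigner changes.
ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
    .key_by(lambda x: x[0], key_type=Types.STRING()) \
    .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
    .aggregate(MyAggregateFunction(),
               accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
               output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
ds.print()
{code}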



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API

2022-08-15 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-28918:


Assignee: Luning Wang

> Release Testing: Verify FLIP-206 in Python DataStream API
> -
>
> Key: FLINK-28918
> URL: https://issues.apache.org/jira/browse/FLINK-28918
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Luning Wang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python DataStream job in thread mode
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> 
> 
> def main():
>     config = Configuration()
>     config.set_string("python.execution-mode", "thread")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     ds = env.from_collection(
>         [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
>         type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>                                   [Types.INT(), Types.STRING(), Types.INT()]))
> 
>     def flat_map_func1(data):
>         for i in data:
>             yield int(i), 1
> 
>     def flat_map_func2(data):
>         for i in data:
>             yield i
> 
>     ds = ds.key_by(lambda x: x[0]) \
>         .min_by("v2") \
>         .map(lambda x: (x[0], x[1], x[2]),
>              output_type=Types.TUPLE([Types.INT(), Types.STRING(), Types.INT()])) \
>         .key_by(lambda x: x[2]) \
>         .max_by(0) \
>         .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), Types.INT()])) \
>         .key_by(lambda x: x[1]) \
>         .min_by() \
>         .flat_map(flat_map_func2, output_type=Types.INT()) \
>         .key_by(lambda x: x) \
>         .max_by()
>     ds.print()
>     env.execute("key_by_min_by_max_by_test_batch")
> 
> 
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution
This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than the others 
> (e.g. one that sleeps indefinitely if it runs on a certain host, where the 
> hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if 
> its (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify the Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.
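
For the first bullet above, a minimal sketch of such a job in the DataStream API 
(illustrative only: the class name and sequence bounds are made up, and the slow 
path simply mirrors the (subtaskIndex + attemptNumber) % 2 == 0 hint from the 
ticket):

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlowSubtaskJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Speculative execution targets batch jobs.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromSequence(0, 1_000_000)
                .map(
                        new RichMapFunction<Long, Long>() {
                            @Override
                            public Long map(Long value) throws Exception {
                                int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                                int attemptNumber = getRuntimeContext().getAttemptNumber();
                                // Make some attempts pathologically slow so the slow task
                                // detector kicks in and schedules speculative attempts.
                                if ((subtaskIndex + attemptNumber) % 2 == 0) {
                                    Thread.sleep(Long.MAX_VALUE);
                                }
                                return value;
                            }
                        })
                .print();
        env.execute("speculative-execution-verification");
    }
}
{code}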



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution
This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes.
This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than the others 
> (e.g. one that sleeps indefinitely if it runs on a certain host, where the 
> hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if 
> its (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify the Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes.
This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes.
> This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> More details about this feature and how to use it can be found in this 
> documentation [PR|https://github.com/apache/flink/pull/20507].
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than the others 
> (e.g. one that sleeps indefinitely if it runs on a certain host, where the 
> hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if 
> its (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify the Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28920) Release Testing: Verify Python DataStream Window

2022-08-15 Thread Luning Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580025#comment-17580025
 ] 

Luning Wang commented on FLINK-28920:
-

I will test this on an M1 chip.

> Release Testing: Verify Python DataStream Window
> 
>
> Key: FLINK-28920
> URL: https://issues.apache.org/jira/browse/FLINK-28920
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python DataStream window job in thread mode. For details of 
> windows, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
> 
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
> 
> 
> class SecondColumnTimestampAssigner(TimestampAssigner):
> 
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[1])
> 
> 
> def main():
>     config = Configuration()
>     # thread mode
>     config.set_string("python.execution-mode", "thread")
>     # process mode
>     # config.set_string("python.execution-mode", "process")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     data_stream = env.from_collection([
>         ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
>         type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 
>     class MyAggregateFunction(AggregateFunction):
> 
>         def create_accumulator(self) -> Tuple[int, str]:
>             return 0, ''
> 
>         def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
>             return value[1] + accumulator[0], value[0]
> 
>         def get_result(self, accumulator: Tuple[int, str]):
>             return accumulator[1], accumulator[0]
> 
>         def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
>             return acc_a[0] + acc_b[0], acc_a[1]
> 
>     ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: x[0], key_type=Types.STRING()) \
>         .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
>         .aggregate(MyAggregateFunction(),
>                    accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
>                    output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds.print()
>     env.execute('test_window_aggregate_accumulator_type')
> 
> 
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream window job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28918) Release Testing: Verify FLIP-206 in Python DataStream API

2022-08-15 Thread Luning Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580024#comment-17580024
 ] 

Luning Wang commented on FLINK-28918:
-

I will test this on an M1 chip.

> Release Testing: Verify FLIP-206 in Python DataStream API
> -
>
> Key: FLINK-28918
> URL: https://issues.apache.org/jira/browse/FLINK-28918
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python DataStream job in thread mode
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> 
> 
> def main():
>     config = Configuration()
>     config.set_string("python.execution-mode", "thread")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     ds = env.from_collection(
>         [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
>         type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>                                   [Types.INT(), Types.STRING(), Types.INT()]))
> 
>     def flat_map_func1(data):
>         for i in data:
>             yield int(i), 1
> 
>     def flat_map_func2(data):
>         for i in data:
>             yield i
> 
>     ds = ds.key_by(lambda x: x[0]) \
>         .min_by("v2") \
>         .map(lambda x: (x[0], x[1], x[2]),
>              output_type=Types.TUPLE([Types.INT(), Types.STRING(), Types.INT()])) \
>         .key_by(lambda x: x[2]) \
>         .max_by(0) \
>         .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), Types.INT()])) \
>         .key_by(lambda x: x[1]) \
>         .min_by() \
>         .flat_map(flat_map_func2, output_type=Types.INT()) \
>         .key_by(lambda x: x) \
>         .max_by()
>     ds.print()
>     env.execute("key_by_min_by_max_by_test_batch")
> 
> 
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.


To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. More details about this feature can be found in 
> this documentation [PR|https://github.com/apache/flink/pull/20507].
> This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than the others 
> (e.g. one that sleeps indefinitely if it runs on a certain host, where the 
> hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if 
> its (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify the Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.


To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. More details about this feature can be found in 
> this documentation [PR|https://github.com/apache/flink/pull/20507].
> This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than the others 
> (e.g. one that sleeps indefinitely if it runs on a certain host, where the 
> hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if 
> its (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify the Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-28980:

Description: 
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.

To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.

  was:
Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.


> Release Testing: Verify FLIP-168 speculative execution
> --
>
> Key: FLINK-28980
> URL: https://issues.apache.org/jira/browse/FLINK-28980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
> tasks caused by slow nodes. More details about this feature can be found in 
> this documentation [PR|https://github.com/apache/flink/pull/20507].
> This feature currently consists of 4 FLIPs:
>  - FLIP-168: Speculative Execution core part
>  - FLIP-224: Blocklist Mechanism
>  - FLIP-245: Source Supports Speculative Execution
>  - FLIP-249: Flink Web UI Enhancement for Speculative Execution
> This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
> To do the verification, the process can be:
>  - Write a Flink job which has a subtask running much slower than the others 
> (e.g. one that sleeps indefinitely if it runs on a certain host, where the 
> hostname can be retrieved via InetAddress.getLocalHost().getHostName(), or if 
> its (subtaskIndex + attemptNumber) % 2 == 0)
>  - Modify the Flink configuration file to enable speculative execution and tune 
> the configuration as you like
>  - Submit the job. Check the web UI, logs, metrics and the produced result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28980:
---

 Summary: Release Testing: Verify FLIP-168 speculative execution
 Key: FLINK-28980
 URL: https://issues.apache.org/jira/browse/FLINK-28980
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than the others (e.g. 
one that sleeps indefinitely if it runs on a certain host, where the hostname can 
be retrieved via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex 
+ attemptNumber) % 2 == 0)
 - Modify the Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.
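
For the configuration step, the relevant options look roughly like the following 
(a sketch based on the documentation PR above; double-check the exact keys and 
defaults against the final 1.16 docs):

{code:yaml}
# Speculative execution is built on top of the adaptive batch scheduler.
jobmanager.scheduler: AdaptiveBatch
execution.batch.speculative.enabled: true
# How often the slow task detector checks for slow tasks.
slow-task-detector.check-interval: 1 s
# Tasks are never classified as slow before this baseline has elapsed.
slow-task-detector.execution-time.baseline-lower-bound: 1 min
# Ratio of finished executions used to establish the execution-time baseline.
slow-task-detector.execution-time.baseline-ratio: 0.75
{code}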



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27535) Optimize the unit test execution time

2022-08-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27535:
---
Labels: pull-request-available  (was: )

> Optimize the unit test execution time
> -
>
> Key: FLINK-27535
> URL: https://issues.apache.org/jira/browse/FLINK-27535
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Yunfeng Zhou
>Priority: Critical
>  Labels: pull-request-available
>
> Currently `mvn package` takes 10 minutes to complete in GitHub Actions. A lot 
> of time is spent in running unit tests for algorithms. For example, 
> LogisticRegressionTest takes 82 seconds and KMeansTest takes 43 seconds in 
> [1]. 
> This is longer than expected, and it will considerably reduce developer 
> velocity if a developer needs to wait for hours to get test results once we 
> have 100+ algorithms in Flink ML.
> We should understand why it takes 82 seconds to run e.g. 
> LogisticRegressionTest and see if there is a way to optimize the test 
> execution time.
> [1] https://github.com/apache/flink-ml/runs/6319402103?check_suite_focus=true.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] yunfengzhou-hub opened a new pull request, #143: [FLINK-27535] Optimize the unit test execution time

2022-08-15 Thread GitBox


yunfengzhou-hub opened a new pull request, #143:
URL: https://github.com/apache/flink-ml/pull/143

   ## What is the purpose of the change
   
   This PR reduces the total execution time of Flink ML CIs so as to improve 
the efficiency of Flink ML development.
   
   This PR mainly focuses on optimizing the execution time of Java tests. 
Python tests might be further optimized in separate PRs.
   
   ## Brief change log
   - Make sure all flink-ml-lib tests create a static mini-cluster shared by all 
test cases in the class, reducing the overhead of starting and shutting down 
clusters (see the sketch at the end of this message)
   - Add cache for maven and pip dependencies
   - Adjust algorithm parameters, like learning rate, to make the algorithms 
converge and finish faster in test cases
   - Disable unaligned checkpoints in test cases
   
   ## Does this pull request potentially affect one of the following parts:
   - Dependencies (does it add or upgrade a dependency): (yes)
   - Dependencies are added to `flink-ml-examples` to support running each 
module individually.
   - The public API, i.e., is any changed class annotated with 
@public(Evolving): (no)
   - Does this pull request introduce a new feature? (no)
   - If yes, how is the feature documented? (N/A)
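   
   For the first item in the change log, the usual pattern is a class-level 
shared mini-cluster; a minimal sketch (illustrative: the class name and the task 
manager / slot counts are placeholders, `MiniClusterWithClientResource` is 
Flink's standard test harness):
   
   ```java
   import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
   import org.apache.flink.test.util.MiniClusterWithClientResource;
   
   import org.junit.ClassRule;
   
   public class SharedMiniClusterTestBase {
       // One mini-cluster per test class instead of one per test method,
       // so the cluster is started and shut down only once.
       @ClassRule
       public static final MiniClusterWithClientResource MINI_CLUSTER =
               new MiniClusterWithClientResource(
                       new MiniClusterResourceConfiguration.Builder()
                               .setNumberTaskManagers(1)
                               .setNumberSlotsPerTaskManager(4)
                               .build());
   }
   ```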


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-28926) Release Testing: Verify flip-235 hybrid shuffle mode

2022-08-15 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-28926:


Assignee: Yu Chen

> Release Testing: Verify flip-235 hybrid shuffle mode
> 
>
> Key: FLINK-28926
> URL: https://issues.apache.org/jira/browse/FLINK-28926
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Weijie Guo
>Assignee: Yu Chen
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Please refer to the release note of FLINK-27862 for a list of changes that 
> need to be verified.
>  * Please refer to our documentation for more details: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/]
>  * Hybrid shuffle has some known limitations: no support for slot sharing, the 
> adaptive batch scheduler, or speculative execution. Please make sure you do 
> not use these features in testing.
>  * The changes should be verified only in batch execution mode.
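
To put a batch job onto hybrid shuffle for this verification, setting the batch 
shuffle mode is enough (a sketch; the option values are the two hybrid variants 
added by FLIP-235, see the linked document for their exact semantics):

{code:yaml}
execution.runtime-mode: BATCH
# Full spilling variant of hybrid shuffle:
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
# Or the selective spilling variant:
# execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_SELECTIVE
{code}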



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28926) Release Testing: Verify flip-235 hybrid shuffle mode

2022-08-15 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580019#comment-17580019
 ] 

Weijie Guo commented on FLINK-28926:


[~Yu Chen] Thank you for your interest. Yes, you can take over the test. It 
should be noted that there are still some known problems that have not been 
completely fixed. For example, compression is not supported at present. If you 
encounter problems in the test, feel free to contact me~

> Release Testing: Verify flip-235 hybrid shuffle mode
> 
>
> Key: FLINK-28926
> URL: https://issues.apache.org/jira/browse/FLINK-28926
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Weijie Guo
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> * Please refer to the release note of FLINK-27862 for a list of changes that 
> need to be verified.
>  * Please refer to our documentation for more details: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/]
>  * Hybrid shuffle has some known limitations: no support for slot sharing, the 
> adaptive batch scheduler, or speculative execution. Please make sure you do 
> not use these features in testing.
>  * The changes should be verified only in batch execution mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28915) Flink Native k8s mode jar location support s3 scheme

2022-08-15 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580018#comment-17580018
 ] 

Yang Wang commented on FLINK-28915:
---

Thanks [~aitozi] and [~hjw] for volunteering. I will assign this ticket to 
[~hjw] and I believe we can work together.

> Flink Native k8s mode jar location support s3 scheme 
> --
>
> Key: FLINK-28915
> URL: https://issues.apache.org/jira/browse/FLINK-28915
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, flink-contrib
>Affects Versions: 1.15.0, 1.15.1
>Reporter: hjw
>Priority: Major
>
> As the Flink documentation shows, local is the only supported scheme in native 
> k8s deployment.
> Is there a plan to support the s3 filesystem? Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28915) Flink Native k8s mode jar location support s3 scheme

2022-08-15 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang reassigned FLINK-28915:
-

Assignee: hjw

> Flink Native k8s mode jar location support s3 scheme 
> --
>
> Key: FLINK-28915
> URL: https://issues.apache.org/jira/browse/FLINK-28915
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, flink-contrib
>Affects Versions: 1.15.0, 1.15.1
>Reporter: hjw
>Assignee: hjw
>Priority: Major
>
> As the Flink documentation shows, local is the only supported scheme in native 
> k8s deployment.
> Is there a plan to support the s3 filesystem? Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #20589: [FLINK-28972][python][connector/pulsar] Align Start/StopCursor method to java

2022-08-15 Thread GitBox


flinkbot commented on PR #20589:
URL: https://github.com/apache/flink/pull/20589#issuecomment-1216086074

   
   ## CI report:
   
   * c053812eb15ef64282bc24a8d735fe0a693bac00 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #139: [FLINK-28803] Add Transformer and Estimator for KBinsDiscretizer

2022-08-15 Thread GitBox


yunfengzhou-hub commented on code in PR #139:
URL: https://github.com/apache/flink-ml/pull/139#discussion_r946290131


##
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java:
##
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizer;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerModel;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerModelData;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerParams;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link KBinsDiscretizer} and {@link KBinsDiscretizerModel}. */
+public class KBinsDiscretizerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainTable;
+    private Table testTable;
+
+    // Column0 for normal cases, column1 for constant cases, column2 for numDistinct < numBins
+    // cases.
+    private static final List<Row> TRAIN_INPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(1, 10, 0)),
+                    Row.of(Vectors.dense(1, 10, 0)),
+                    Row.of(Vectors.dense(1, 10, 0)),
+                    Row.of(Vectors.dense(4, 10, 0)),
+                    Row.of(Vectors.dense(5, 10, 0)),
+                    Row.of(Vectors.dense(6, 10, 0)),
+                    Row.of(Vectors.dense(7, 10, 0)),
+                    Row.of(Vectors.dense(10, 10, 0)),
+                    Row.of(Vectors.dense(13, 10, 3)));
+
+    private static final List<Row> TEST_INPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(-1, 0, 0)),
+                    Row.of(Vectors.dense(1, 1, 1)),
+                    Row.of(Vectors.dense(1.5, 1, 2)),
+                    Row.of(Vectors.dense(5, 2, 3)),
+                    Row.of(Vectors.dense(7.25, 3, 4)),
+                    Row.of(Vectors.dense(13, 4, 5)),
+                    Row.of(Vectors.dense(15, 4, 6)));
+
+    private static final double[][] UNIFORM_MODEL_DATA =
+            new double[][] {
+                new double[] {1, 5, 9, 13},
+                new double[] {Double.MIN_VALUE, Double.MAX_VALUE},
+                new double[] {0, 1, 2, 3}
+            };
+
+    private static final List<Row> UNIFORM_OUTPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(0, 0, 0)),
+                    Row.of(Vectors.dense(0, 0, 1)),
+                    Row.of(Vectors.dense(0, 0, 2)),
+                    Row.of(Vectors.dense(1, 0, 2)),
+                    Row.of(Vectors.dense(1, 0, 2)),
+                    Row.of(Vectors.dense(2, 0, 2)),
+                    Row.of(Vectors.dense(2, 0, 2)));
+
+    private static final List<Row> QUANTILE_OUTPUT =
+            Arrays.asList(
+                    Row.of(Vectors.dense(0, 0, 0)),
+                    Row.of(Vectors.dense(0, 0, 0)),
+                    R

[jira] [Updated] (FLINK-28979) Add another owner into the JM deployment's owner references

2022-08-15 Thread Xin Hao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Hao updated FLINK-28979:

Description: 
This action will link the resources into the FlinkDeployment

The JM deployment will look like
{code:java}
ownerReferences:  
- apiVersion: apps/v1
  blockOwnerDeletion: true
  controller: true
  kind: Deployment
  name: xxx
- apiVersion: flink.apache.org/v1beta1   
  blockOwnerDeletion: true 
  controller: false 
  kind: FlinkDeployment 
  name: xxx{code}

  was:
This action will link the resources into the FlinkDeployment

Will look like
{code:java}
ownerReferences:  
- apiVersion: apps/v1
  blockOwnerDeletion: true
  controller: true
  kind: Deployment
  name: xxx
- apiVersion: flink.apache.org/v1beta1   
  blockOwnerDeletion: true 
  controller: false 
  kind: FlinkDeployment 
  name: xxx{code}


> Add another owner into the JM deployment's owner references
> ---
>
> Key: FLINK-28979
> URL: https://issues.apache.org/jira/browse/FLINK-28979
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Xin Hao
>Priority: Minor
>
> This action will link the resources into the FlinkDeployment
> The JM deployment will look like
> {code:java}
> ownerReferences:  
> - apiVersion: apps/v1
>   blockOwnerDeletion: true
>   controller: true
>   kind: Deployment
>   name: xxx
> - apiVersion: flink.apache.org/v1beta1   
>   blockOwnerDeletion: true 
>   controller: false 
>   kind: FlinkDeployment 
>   name: xxx{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28979) Add another owner into the JM deployment's owner references

2022-08-15 Thread Xin Hao (Jira)
Xin Hao created FLINK-28979:
---

 Summary: Add another owner into the JM deployment's owner 
references
 Key: FLINK-28979
 URL: https://issues.apache.org/jira/browse/FLINK-28979
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Xin Hao


This action will link the resources into the FlinkDeployment

Will look like
{code:java}
ownerReferences:  
- apiVersion: apps/v1
  blockOwnerDeletion: true
  controller: true
  kind: Deployment
  name: xxx
- apiVersion: flink.apache.org/v1beta1   
  blockOwnerDeletion: true 
  controller: false 
  kind: FlinkDeployment 
  name: xxx{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28972) Add methods of StartCursor and StopCursor to align the Java

2022-08-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28972:
---
Labels: pull-request-available  (was: )

> Add methods of StartCursor and StopCursor to align the Java
> ---
>
> Key: FLINK-28972
> URL: https://issues.apache.org/jira/browse/FLINK-28972
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Connectors / Pulsar
>Affects Versions: 1.14.5, 1.15.1, 1.16.0
>Reporter: Luning Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2, 1.14.6
>
>
> Add fromPublishTime in the StartCursor class
> Add afterEventTime and afterPublishTime in the StopCursor class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] deadwind4 opened a new pull request, #20589: [FLINK-28972][python][connector/pulsar] Align Start/StopCursor method to java

2022-08-15 Thread GitBox


deadwind4 opened a new pull request, #20589:
URL: https://github.com/apache/flink/pull/20589

   ## What is the purpose of the change
   
   Add fromPublishTime in the StartCursor class
   Add afterEventTime and afterPublishTime in the StopCursor class
   
   ## Brief change log
   
 - *Add from_publish_time in the StartCursor class*
 - *Add after_event_time and after_publish_time in the StopCursor class*
 - *Add EN and CN docs*
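   
   Usage of the new cursor methods from Python would look roughly like this (a 
sketch: the snake_case method names come from this PR's change log, the 
surrounding builder calls follow the existing PyFlink Pulsar connector API with 
the 1.16 module layout, and the URLs, topic, subscription name, and timestamps 
are placeholders):
   
   ```python
   from pyflink.common.serialization import SimpleStringSchema
   from pyflink.datastream.connectors.pulsar import (
       PulsarDeserializationSchema, PulsarSource, StartCursor, StopCursor)
   
   source = PulsarSource.builder() \
       .set_service_url('pulsar://localhost:6650') \
       .set_admin_url('http://localhost:8080') \
       .set_topics('my-topic') \
       .set_subscription_name('release-testing') \
       .set_deserialization_schema(
           PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
       .set_start_cursor(StartCursor.from_publish_time(1660521600000)) \
       .set_unbounded_stop_cursor(StopCursor.after_publish_time(1660525200000)) \
       .build()
   ```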
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added StopCursor event_time and publish_time test cases*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


