[jira] [Commented] (FLINK-22593) SavepointITCase.testShouldAddEntropyToSavepointPath unstable

2021-06-16 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364699#comment-17364699
 ] 

Robert Metzger commented on FLINK-22593:


https://dev.azure.com/rmetzger/Flink/_build/results?buildId=9136=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=634cd701-c189-5dff-24cb-606ed884db87
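
For context, the failure quoted below fires because the savepoint is triggered 
while some subtasks are still deploying ("Not all required tasks are currently 
running"). A minimal sketch of a guard against that race, using hypothetical 
helper names rather than the actual SavepointITCase code:

{code:java}
import java.util.concurrent.CompletableFuture;

// Sketch only: retry triggering the savepoint until all subtasks have reached
// RUNNING, instead of failing on the first CheckpointException. The JobClient
// interface below is a hypothetical stand-in for the cluster client.
final class SavepointRetrySketch {

    static String takeSavepointWhenReady(JobClient client, String targetDirectory)
            throws Exception {
        int attempts = 0;
        while (true) {
            try {
                // May fail while some subtasks are still deploying.
                return client.triggerSavepoint(targetDirectory).get();
            } catch (Exception e) {
                if (++attempts >= 30) {
                    throw e; // give up after a bounded number of retries
                }
                Thread.sleep(100); // back off and let the remaining subtasks start
            }
        }
    }

    // Minimal stand-in for the cluster client used above.
    interface JobClient {
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }
}
{code}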

> SavepointITCase.testShouldAddEntropyToSavepointPath unstable
> 
>
> Key: FLINK-22593
> URL: https://issues.apache.org/jira/browse/FLINK-22593
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0
>Reporter: Robert Metzger
>Assignee: Anton Kalashnikov
>Priority: Blocker
>  Labels: pull-request-available, stale-blocker, stale-critical, 
> test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=9072=logs=cc649950-03e9-5fae-8326-2f1ad744b536=51cab6ca-669f-5dc0-221d-1e4f7dc4fc85
> {code}
> 2021-05-07T10:56:20.9429367Z May 07 10:56:20 [ERROR] Tests run: 13, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 33.441 s <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.SavepointITCase
> 2021-05-07T10:56:20.9445862Z May 07 10:56:20 [ERROR] 
> testShouldAddEntropyToSavepointPath(org.apache.flink.test.checkpointing.SavepointITCase)
>   Time elapsed: 2.083 s  <<< ERROR!
> 2021-05-07T10:56:20.9447106Z May 07 10:56:20 
> java.util.concurrent.ExecutionException: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Sink: Unnamed (3/4) of job 4e155a20f0a7895043661a6446caf1cb 
> has not being executed at the moment. Aborting checkpoint. Failure reason: 
> Not all required tasks are currently running.
> 2021-05-07T10:56:20.9448194Z May 07 10:56:20  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2021-05-07T10:56:20.9448797Z May 07 10:56:20  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2021-05-07T10:56:20.9449428Z May 07 10:56:20  at 
> org.apache.flink.test.checkpointing.SavepointITCase.submitJobAndTakeSavepoint(SavepointITCase.java:305)
> 2021-05-07T10:56:20.9450160Z May 07 10:56:20  at 
> org.apache.flink.test.checkpointing.SavepointITCase.testShouldAddEntropyToSavepointPath(SavepointITCase.java:273)
> 2021-05-07T10:56:20.9450785Z May 07 10:56:20  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-05-07T10:56:20.9451331Z May 07 10:56:20  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-05-07T10:56:20.9451940Z May 07 10:56:20  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-05-07T10:56:20.9452498Z May 07 10:56:20  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-05-07T10:56:20.9453247Z May 07 10:56:20  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-05-07T10:56:20.9454007Z May 07 10:56:20  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-05-07T10:56:20.9454687Z May 07 10:56:20  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-05-07T10:56:20.9455302Z May 07 10:56:20  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-05-07T10:56:20.9455909Z May 07 10:56:20  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-05-07T10:56:20.9456493Z May 07 10:56:20  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-05-07T10:56:20.9457074Z May 07 10:56:20  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-05-07T10:56:20.9457636Z May 07 10:56:20  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2021-05-07T10:56:20.9458157Z May 07 10:56:20  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-05-07T10:56:20.9458678Z May 07 10:56:20  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-05-07T10:56:20.9459252Z May 07 10:56:20  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-05-07T10:56:20.9459865Z May 07 10:56:20  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-05-07T10:56:20.9460433Z May 07 10:56:20  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-05-07T10:56:20.9461058Z May 07 10:56:20  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-05-07T10:56:20.9461607Z May 07 10:56:20  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-05-07T10:56:20.9462159Z May 07 10:56:20  at 
> 

[jira] [Comment Edited] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364691#comment-17364691
 ] 

lynn1.zhang edited comment on FLINK-22994 at 6/17/21, 4:42 AM:
---

 
{code:java}
public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend with MapMapConverter "
                    + testWithMapMapConverter(totalTestCount)
                    + " ms.");
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend without Converter "
                    + testWithOutMapMapConverter(totalTestCount)
                    + " ms.");
}
{code}
Amazing code, thank you for your advice.

I think creating another expression class that extends GeneratedExpression is a 
good idea.


was (Author: zicat):
public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println("Total test count:" + totalTestCount
            + ", The spend with MapMapConverter " + testWithMapMapConverter(totalTestCount)
            + " ms.");
    System.out.println("Total test count:" + totalTestCount
            + ", The spend without Converter " + testWithOutMapMapConverter(totalTestCount)
            + " ms.");
}
Amazing code, thank you for your advice.

I think creating another expression class that extends GeneratedExpression is a 
good idea.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364691#comment-17364691
 ] 

lynn1.zhang commented on FLINK-22994:
-

public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println("Total test count:" + totalTestCount
            + ", The spend with MapMapConverter " + testWithMapMapConverter(totalTestCount)
            + " ms.");
    System.out.println("Total test count:" + totalTestCount
            + ", The spend without Converter " + testWithOutMapMapConverter(totalTestCount)
            + " ms.");
}
Amazing code, thank you for your advice.

I think creating another expression class that extends GeneratedExpression is a 
good idea.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364676#comment-17364676
 ] 

Jingsong Lee edited comment on FLINK-22994 at 6/17/21, 4:27 AM:


[~zicat] I didn't say the conversion has no cost; its cost is very low. 
(Compared with other costs the conversion is cheap, but the other costs in your 
test are too low.)

Your test [^StringConverterTest.java] is inaccurate. I just modified it like 
this:
{code:java}
public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend with MapMapConverter "
                    + testWithMapMapConverter(totalTestCount)
                    + " ms.");
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend without Converter "
                    + testWithOutMapMapConverter(totalTestCount)
                    + " ms.");
}
{code}
The result is different.

The modified version loads BinaryStringData and its related classes into the 
classloader before the timed sections run. This is very important.
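
This is the classic micro-benchmark pitfall: whichever branch runs first also 
pays the one-time class-loading and JIT warm-up cost, so the comparison is 
skewed unless the classes are loaded and the code warmed up beforehand. A 
minimal illustration of the pattern, with hypothetical workload names rather 
than the attached test:

{code:java}
// Sketch of the warm-up pattern: load classes and trigger JIT compilation
// before measuring, and keep results observable so the work is not optimized
// away. The workload() method is a stand-in for the converter call measured.
public final class WarmupSketch {

    private static long sink;

    public static void main(String[] args) {
        for (int i = 0; i < 10_000; i++) {
            sink += workload(i); // warm-up pass, not timed
        }
        long start = System.nanoTime();
        for (int i = 0; i < 1_000_000; i++) {
            sink += workload(i); // timed section
        }
        System.out.println(
                "Spend " + (System.nanoTime() - start) / 1_000_000 + " ms, sink=" + sink);
    }

    private static long workload(int i) {
        return Integer.toString(i).hashCode();
    }
}
{code}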


was (Author: lzljs3620320):
[~zicat] I didn't say the conversion has no cost; its cost is very low.

Your test [^StringConverterTest.java] is inaccurate. I just modified it like 
this:
{code:java}
public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend with MapMapConverter "
                    + testWithMapMapConverter(totalTestCount)
                    + " ms.");
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend without Converter "
                    + testWithOutMapMapConverter(totalTestCount)
                    + " ms.");
}
{code}
The result is different.

The modified version loads BinaryStringData and its related classes into the 
classloader before the timed sections run. This is very important.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364676#comment-17364676
 ] 

Jingsong Lee edited comment on FLINK-22994 at 6/17/21, 4:24 AM:


[~zicat] I didn't say the conversion has no cost; its cost is very low.

Your test [^StringConverterTest.java] is inaccurate. I just modified it like 
this:
{code:java}
public static void main(String[] args) {
    final int totalTestCount = 100 * 1;
    System.out.println(BinaryStringData.fromString(""));
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend with MapMapConverter "
                    + testWithMapMapConverter(totalTestCount)
                    + " ms.");
    System.out.println(
            "Total test count:"
                    + totalTestCount
                    + ", The spend without Converter "
                    + testWithOutMapMapConverter(totalTestCount)
                    + " ms.");
}
{code}
The result is different.

The modified version loads BinaryStringData and its related classes into the 
classloader before the timed sections run. This is very important.


was (Author: lzljs3620320):
[~zicat] I didn't say the conversion has no cost; its cost is very low. Your 
test [^StringConverterTest.java] is meaningless; its other costs are too low.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364676#comment-17364676
 ] 

Jingsong Lee commented on FLINK-22994:
--

[~zicat] I didn't say the conversion has no cost; its cost is very low. Your 
test [^StringConverterTest.java] is meaningless; its other costs are too low.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364674#comment-17364674
 ] 

Jingsong Lee commented on FLINK-22994:
--

BTW, please update the description, which is misleading.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-22994:


Assignee: lynn1.zhang

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Assignee: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364673#comment-17364673
 ] 

Jingsong Lee commented on FLINK-22994:
--

[~zicat] I think this is a good improvement, but we should make a clean 
adjustment to the code.

First, a type may have multiple external classes, so the external class should 
be consistent with DataType.conversionClass; if you provide an external result 
term, the expression should provide a DataType instead of a LogicalType.

Second, we should not modify GeneratedExpression directly; that would make the 
code messier. For example, we can introduce a new class:
{code:java}
case class ExternalGeneratedExpression(
    dataType: DataType,
    override val nullTerm: String,
    internalTerm: String,
    externalTerm: String,
    internalCode: String,
    externalCode: String,
    internalLiteral: Option[Any] = None,
    externalLiteral: Option[Any] = None)
  extends GeneratedExpression(
    internalTerm,
    nullTerm,
    internalCode,
    dataType.getLogicalType,
    internalLiteral)
{code}
A UDF can return an ExternalGeneratedExpression; a nested UDF can then see that 
its argument is an ExternalGeneratedExpression which provides the same external 
class (in the DataType) as the one it wants.

What do you think?
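
To make the shape of this concrete, here is a minimal, self-contained sketch 
in plain Java (hypothetical class and field names, not Flink's actual code 
generation classes) of how a nested call could reuse an argument's external 
term and skip the conversion round trip:

{code:java}
// Hypothetical sketch: an expression that also carries an "external" result
// term, so a nested UDF call can reuse it instead of converting again.
class GeneratedExpr {
    final String resultTerm; // term holding the internal (binary) value
    final String code;       // code that computes the internal value

    GeneratedExpr(String resultTerm, String code) {
        this.resultTerm = resultTerm;
        this.code = code;
    }
}

class ExternalGeneratedExpr extends GeneratedExpr {
    final String externalTerm;   // term holding the external value, e.g. a java.lang.String
    final Class<?> externalClass;

    ExternalGeneratedExpr(
            String resultTerm, String code, String externalTerm, Class<?> externalClass) {
        super(resultTerm, code);
        this.externalTerm = externalTerm;
        this.externalClass = externalClass;
    }
}

final class CallGenSketch {
    // If the argument already exposes the external class this UDF expects,
    // reuse its external term directly; otherwise fall back to a conversion.
    static String argumentTerm(GeneratedExpr arg, Class<?> expectedClass) {
        if (arg instanceof ExternalGeneratedExpr
                && ((ExternalGeneratedExpr) arg).externalClass == expectedClass) {
            return ((ExternalGeneratedExpr) arg).externalTerm;
        }
        return "toExternal(" + arg.resultTerm + ")"; // hypothetical converter call
    }
}
{code}

The instanceof check is where the nested call decides whether the round trip 
can be skipped.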

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22980) FileExecutionGraphInfoStoreTest hangs on azure

2021-06-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364672#comment-17364672
 ] 

Xintong Song commented on FLINK-22980:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19037=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7030a106-e977-5851-a05e-535de648c9c9=9407

> FileExecutionGraphInfoStoreTest hangs on azure
> --
>
> Key: FLINK-22980
> URL: https://issues.apache.org/jira/browse/FLINK-22980
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Dawid Wysakowicz
>Assignee: Fabian Paul
>Priority: Blocker
>  Labels: test-stability
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Description: 
h1. Background

Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.

Below is the generated code:

!image-2021-06-15-15-18-12-619.png!   This issue improves it as below:

!image-2021-06-15-15-19-01-103.png!
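
For readers without the screenshots, a rough approximation of the shape of the 
change, with hypothetical stand-ins for the converter and the UDFs (this is 
not the actual generated code):

{code:java}
import java.util.Map;

// Sketch of the before/after shape described above; Converter and Udf are
// hypothetical stand-ins, not Flink's actual interfaces.
final class NestedUdfSketch {

    // Before: the inner UDF's external result is converted to the internal
    // binary form and immediately back to external for the outer UDF.
    static String withRoundTrip(Map<String, String> ipInfo, Converter conv, Udf outer) {
        Object internal = conv.toInternalOrNull(ipInfo);
        Map<String, String> external = conv.toExternal(internal);
        return outer.eval(external);
    }

    // After this issue: the external result is handed to the nested UDF directly.
    static String direct(Map<String, String> ipInfo, Udf outer) {
        return outer.eval(ipInfo);
    }

    interface Converter {
        Object toInternalOrNull(Map<String, String> external);
        Map<String, String> toExternal(Object internal);
    }

    interface Udf {
        String eval(Map<String, String> ipInfo);
    }
}
{code}
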
h1. Performance Comparison

Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF introduction:
 * ipip: input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input: map ip_info, output: string country.
 * ip_2_region: input: map ip_info, output: string region.
 * ip_2_isp_domain: input: map ip_info, output: string isp.
 * ip_2_timezone: input: map ip_info, output: string timezone.

h5. Throughput without UDF invocation: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. Throughput with UDF invocation: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. Throughput with nested UDF invocation: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. Throughput with nested UDF invocation after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 

  was:
h1. Background

Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
of int_ip_2_string(ip) is serialized and deserialized.

Below is the generated code:

!image-2021-06-15-15-18-12-619.png!   This issue improves it as below:

!image-2021-06-15-15-19-01-103.png!
h1. Performance Comparison

Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz
 UDF introduction:
 * ipip: input: int ip, output: map ip_info, map size = 14.
 * ip_2_country: input: map ip_info, output: string country.
 * ip_2_region: input: map ip_info, output: string region.
 * ip_2_isp_domain: input: map ip_info, output: string isp.
 * ip_2_timezone: input: map ip_info, output: string timezone.

h5. Throughput without UDF invocation: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. Throughput with UDF invocation: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. Throughput with nested UDF invocation: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. Throughput with nested UDF invocation after this issue: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) goes through toInternalOrNull and toExternal.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22662) YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail

2021-06-16 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-22662:
-
Labels: test-stability  (was: auto-deprioritized-critical test-stability)

> YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail
> 
>
> Key: FLINK-22662
> URL: https://issues.apache.org/jira/browse/FLINK-22662
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.13.0
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> 2021-05-14T00:24:57.8487649Z May 14 00:24:57 [ERROR] 
> testKillYarnSessionClusterEntrypoint(org.apache.flink.yarn.YARNHighAvailabilityITCase)
>   Time elapsed: 34.667 s  <<< ERROR!
> 2021-05-14T00:24:57.8488567Z May 14 00:24:57 
> java.util.concurrent.ExecutionException: 
> 2021-05-14T00:24:57.8489301Z May 14 00:24:57 
> org.apache.flink.runtime.rest.util.RestClientException: 
> [org.apache.flink.runtime.rest.handler.RestHandlerException: 
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (610ed4b159ece04c8ee2ec40e7d0c143)
> 2021-05-14T00:24:57.8493142Z May 14 00:24:57  at 
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:94)
> 2021-05-14T00:24:57.8495823Z May 14 00:24:57  at 
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:84)
> 2021-05-14T00:24:57.8496733Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> 2021-05-14T00:24:57.8497640Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> 2021-05-14T00:24:57.8498491Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8499222Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.853Z May 14 00:24:57  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
> 2021-05-14T00:24:57.8500872Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-05-14T00:24:57.8501702Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-05-14T00:24:57.8502662Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8503472Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.8504269Z May 14 00:24:57  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
> 2021-05-14T00:24:57.8504892Z May 14 00:24:57  at 
> akka.dispatch.OnComplete.internal(Future.scala:263)
> 2021-05-14T00:24:57.8505565Z May 14 00:24:57  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-05-14T00:24:57.8506062Z May 14 00:24:57  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-05-14T00:24:57.8506819Z May 14 00:24:57  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-05-14T00:24:57.8507418Z May 14 00:24:57  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-05-14T00:24:57.8508373Z May 14 00:24:57  at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-05-14T00:24:57.8509144Z May 14 00:24:57  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> 2021-05-14T00:24:57.8509972Z May 14 00:24:57  at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> 2021-05-14T00:24:57.8510675Z May 14 00:24:57  at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> 2021-05-14T00:24:57.8511376Z May 14 00:24:57  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> 2021-05-14T00:24:57.851Z May 14 00:24:57  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-05-14T00:24:57.8513090Z May 14 00:24:57  at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 2021-05-14T00:24:57.8513835Z May 14 00:24:57  at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 2021-05-14T00:24:57.8514576Z May 14 00:24:57  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-05-14T00:24:57.8515344Z May 14 00:24:57  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 

[jira] [Commented] (FLINK-22662) YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail

2021-06-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364671#comment-17364671
 ] 

Xintong Song commented on FLINK-22662:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19037=logs=245e1f2e-ba5b-5570-d689-25ae21e5302f=e7f339b2-a7c3-57d9-00af-3712d4b15354=29524

> YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint fail
> 
>
> Key: FLINK-22662
> URL: https://issues.apache.org/jira/browse/FLINK-22662
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.13.0
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> 2021-05-14T00:24:57.8487649Z May 14 00:24:57 [ERROR] 
> testKillYarnSessionClusterEntrypoint(org.apache.flink.yarn.YARNHighAvailabilityITCase)
>   Time elapsed: 34.667 s  <<< ERROR!
> 2021-05-14T00:24:57.8488567Z May 14 00:24:57 
> java.util.concurrent.ExecutionException: 
> 2021-05-14T00:24:57.8489301Z May 14 00:24:57 
> org.apache.flink.runtime.rest.util.RestClientException: 
> [org.apache.flink.runtime.rest.handler.RestHandlerException: 
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (610ed4b159ece04c8ee2ec40e7d0c143)
> 2021-05-14T00:24:57.8493142Z May 14 00:24:57  at 
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:94)
> 2021-05-14T00:24:57.8495823Z May 14 00:24:57  at 
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:84)
> 2021-05-14T00:24:57.8496733Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> 2021-05-14T00:24:57.8497640Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> 2021-05-14T00:24:57.8498491Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8499222Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.853Z May 14 00:24:57  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
> 2021-05-14T00:24:57.8500872Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-05-14T00:24:57.8501702Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-05-14T00:24:57.8502662Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-05-14T00:24:57.8503472Z May 14 00:24:57  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> 2021-05-14T00:24:57.8504269Z May 14 00:24:57  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
> 2021-05-14T00:24:57.8504892Z May 14 00:24:57  at 
> akka.dispatch.OnComplete.internal(Future.scala:263)
> 2021-05-14T00:24:57.8505565Z May 14 00:24:57  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-05-14T00:24:57.8506062Z May 14 00:24:57  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-05-14T00:24:57.8506819Z May 14 00:24:57  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-05-14T00:24:57.8507418Z May 14 00:24:57  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2021-05-14T00:24:57.8508373Z May 14 00:24:57  at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-05-14T00:24:57.8509144Z May 14 00:24:57  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> 2021-05-14T00:24:57.8509972Z May 14 00:24:57  at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> 2021-05-14T00:24:57.8510675Z May 14 00:24:57  at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> 2021-05-14T00:24:57.8511376Z May 14 00:24:57  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> 2021-05-14T00:24:57.851Z May 14 00:24:57  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-05-14T00:24:57.8513090Z May 14 00:24:57  at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 2021-05-14T00:24:57.8513835Z May 14 00:24:57  at 
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 2021-05-14T00:24:57.8514576Z May 14 00:24:57  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 

[jira] [Created] (FLINK-23015) Implement streaming window Deduplicate operator

2021-06-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23015:
--

 Summary: Implement streaming window Deduplicate operator
 Key: FLINK-23015
 URL: https://issues.apache.org/jira/browse/FLINK-23015
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: JING ZHANG






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23014) Support streaming window Deduplicate in planner

2021-06-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23014:
--

 Summary: Support streaming window Deduplicate in planner
 Key: FLINK-23014
 URL: https://issues.apache.org/jira/browse/FLINK-23014
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364668#comment-17364668
 ] 

Jingsong Lee commented on FLINK-22994:
--

[~zicat] OK, I see... you gave a wrong example in the description...

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking a nested UDF such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) is serialized and deserialized.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue improves it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka, Schema = PB with snappy; Flink slots = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU 
> @ 2.30GHz
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure

2021-06-16 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-22891:
-
Affects Version/s: 1.13.1

> FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
> 
>
> Key: FLINK-22891
> URL: https://issues.apache.org/jira/browse/FLINK-22891
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0=8660
> {code}
> Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 6.24 s <<< FAILURE! - in 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
> Jun 05 21:16:00 [ERROR] 
> testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)
>   Time elapsed: 5.015 s  <<< ERROR!
> Jun 05 21:16:00 java.util.concurrent.TimeoutException
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.(AbstractFineGrainedSlotManagerITCase.java:310)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
> Jun 05 21:16:00   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jun 05 21:16:00   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jun 05 21:16:00   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jun 05 21:16:00   at java.lang.reflect.Method.invoke(Method.java:498)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jun 05 21:16:00   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jun 05 21:16:00   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00   at 
> 

[jira] [Reopened] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure

2021-06-16 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reopened FLINK-22891:
--

> FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
> 
>
> Key: FLINK-22891
> URL: https://issues.apache.org/jira/browse/FLINK-22891
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0=8660
> {code}
> Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 6.24 s <<< FAILURE! - in 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
> Jun 05 21:16:00 [ERROR] 
> testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)
>   Time elapsed: 5.015 s  <<< ERROR!
> Jun 05 21:16:00 java.util.concurrent.TimeoutException
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.(AbstractFineGrainedSlotManagerITCase.java:310)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
> Jun 05 21:16:00   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jun 05 21:16:00   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jun 05 21:16:00   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jun 05 21:16:00   at java.lang.reflect.Method.invoke(Method.java:498)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jun 05 21:16:00   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jun 05 21:16:00   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00   at 
> 

[jira] [Commented] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure

2021-06-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364667#comment-17364667
 ] 

Xintong Song commented on FLINK-22891:
--

New instance on the 1.13 branch.

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19037&view=logs&j=d8d26c26-7ec2-5ed2-772e-7a1a1eb8317c&t=be5fb08e-1ad7-563c-4f1a-a97ad4ce4865&l=6837

> FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
> 
>
> Key: FLINK-22891
> URL: https://issues.apache.org/jira/browse/FLINK-22891
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=05b74a19-4ee4-5036-c46f-ada307df6cf0&l=8660
> {code}
> Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 6.24 s <<< FAILURE! - in 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
> Jun 05 21:16:00 [ERROR] 
> testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)
>   Time elapsed: 5.015 s  <<< ERROR!
> Jun 05 21:16:00 java.util.concurrent.TimeoutException
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> Jun 05 21:16:00   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.<init>(AbstractFineGrainedSlotManagerITCase.java:310)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
> Jun 05 21:16:00   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
> Jun 05 21:16:00   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jun 05 21:16:00   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jun 05 21:16:00   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jun 05 21:16:00   at java.lang.reflect.Method.invoke(Method.java:498)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jun 05 21:16:00   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jun 05 21:16:00   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jun 05 21:16:00   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jun 05 21:16:00   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Jun 05 21:16:00   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Jun 05 21:16:00   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Jun 05 21:16:00 

[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364666#comment-17364666
 ] 

lynn1.zhang commented on FLINK-22994:
-

[^StringConverterTest.java] I also tested the StringStringConverter; the 
performance is not good either.

Total test count: 100. Time spent with MapMapConverter: 7243 ms.
Total test count: 100. Time spent without converter: 72 ms.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking nested UDFs such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) is serialized and then deserialized again.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU @ 
> 2.30GHz (Linux)
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with a single UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  
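For readers unfamiliar with the setup described above, here is a minimal, 
hypothetical sketch of how such nested scalar UDFs are registered and invoked. 
The function names useless and int_ip_2_string come from the ticket; the 
bodies and registration code are illustrative placeholders, not the reporter's 
actual implementations, and API shapes follow recent Flink versions:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class NestedUdfExample {

    /** Placeholder for the ticket's int_ip_2_string UDF. */
    public static class IntIp2String extends ScalarFunction {
        public String eval(Integer ip) {
            return Integer.toUnsignedString(ip); // stand-in conversion
        }
    }

    /** Placeholder for the ticket's useless UDF. */
    public static class Useless extends ScalarFunction {
        public String eval(String s) {
            return s; // identity, just to force a nested invocation
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.createTemporarySystemFunction("int_ip_2_string", IntIp2String.class);
        tEnv.createTemporarySystemFunction("useless", Useless.class);
        // The inner result is materialized as BinaryStringData and, before this
        // issue, converted again before being passed to the outer function.
        tEnv.executeSql("SELECT useless(int_ip_2_string(ip)) "
                + "FROM (VALUES (1), (2)) AS t(ip)").print();
    }
}
{code}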



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Attachment: StringConverterTest.java

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking nested UDFs such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) is serialized and then deserialized again.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU @ 
> 2.30GHz (Linux)
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with a single UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22702) KafkaSourceITCase.testRedundantParallelism failed

2021-06-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364665#comment-17364665
 ] 

Xintong Song commented on FLINK-22702:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19037&view=logs&j=72d4811f-9f0d-5fd0-014a-0bc26b72b642&t=c1d93a6a-ba91-515d-3196-2ee8019fbda7&l=6563

> KafkaSourceITCase.testRedundantParallelism failed
> -
>
> Key: FLINK-22702
> URL: https://issues.apache.org/jira/browse/FLINK-22702
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.12.3
>Reporter: Guowei Ma
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18107&view=logs&j=1fc6e7bf-633c-5081-c32a-9dea24b05730&t=80a658d1-f7f6-5d93-2758-53ac19fd5b19&l=6847
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   ... 1 more
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any 
> topics or assigned any partitions
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>   at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
>   ... 6 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23013) Introduce faker source connector

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364660#comment-17364660
 ] 

Jingsong Lee commented on FLINK-23013:
--

[~jark] Thanks for the information.

@[~knaufk]

Do you think we can include it in the Flink project?

> Introduce faker source connector
> 
>
> Key: FLINK-23013
> URL: https://issues.apache.org/jira/browse/FLINK-23013
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>
> We already have the datagen connector.
> But sometimes we need a more realistic data generator that can produce more 
> natural-looking random records.
> We can integrate [https://github.com/DiUS/java-faker] and introduce a 
> built-in faker connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23013) Introduce faker source connector

2021-06-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364659#comment-17364659
 ] 

Jark Wu commented on FLINK-23013:
-

[~knaufk] has already open-sourced a Flink faker connector: 
https://github.com/knaufk/flink-faker


> Introduce faker source connector
> 
>
> Key: FLINK-23013
> URL: https://issues.apache.org/jira/browse/FLINK-23013
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Priority: Major
>
> We already have the datagen connector.
> But sometimes we need a more realistic data generator that can produce more 
> natural-looking random records.
> We can integrate [https://github.com/DiUS/java-faker] and introduce a 
> built-in faker connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364658#comment-17364658
 ] 

lynn1.zhang commented on FLINK-22994:
-

Hi [~lzljs3620320], ipip returns a map type, so the converter is the 
MapMapConverter, not the StringStringConverter.

[^Test.java] This is my benchmark case, which shows the cost difference 
between running with and without the converter.


Total test count: 100. Time spent with MapMapConverter: 12160 ms.
Total test count: 100. Time spent without converter: 52 ms.
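The attached Test.java is not reproduced here; as a rough stand-in, a 
micro-benchmark of the map-converter round-trip could look like the following. 
This is only a sketch assuming Flink's internal DataStructureConverters API; 
the loop count and map size are illustrative, not the attachment's values:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;

public class ConverterBenchmarkSketch {

    public static void main(String[] args) {
        DataStructureConverter<Object, Object> converter =
                DataStructureConverters.getConverter(
                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
        converter.open(Thread.currentThread().getContextClassLoader());

        // A 14-entry map, matching the ipip UDF output described in the ticket.
        Map<String, String> external = new HashMap<>();
        for (int i = 0; i < 14; i++) {
            external.put("key" + i, "value" + i);
        }

        long start = System.currentTimeMillis();
        for (int i = 0; i < 1_000_000; i++) {
            // Round-trip: external Map -> internal MapData -> external Map.
            converter.toExternal(converter.toInternal(external));
        }
        System.out.println("with converter: "
                + (System.currentTimeMillis() - start) + " ms");
    }
}
{code}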

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: Test.java, image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking nested UDFs such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) is serialized and then deserialized again.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU @ 
> 2.30GHz (Linux)
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with a single UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread lynn1.zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:

Attachment: Test.java

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: Test.java, image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking nested UDFs such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) is serialized and then deserialized again.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU @ 
> 2.30GHz (Linux)
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with a single UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23013) Introduce faker source connector

2021-06-16 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-23013:


 Summary: Introduce faker source connector
 Key: FLINK-23013
 URL: https://issues.apache.org/jira/browse/FLINK-23013
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Jingsong Lee


We already have the datagen connector.

But sometimes we need a more realistic data generator that can produce more 
natural-looking random records.

We can integrate [https://github.com/DiUS/java-faker] and introduce a built-in 
faker connector.
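For a sense of what java-faker offers, a minimal usage sketch of the library 
itself (not a connector; the Faker calls below are standard java-faker API):

{code:java}
import com.github.javafaker.Faker;

public class FakerSketch {
    public static void main(String[] args) {
        Faker faker = new Faker();
        // java-faker produces realistic-looking values per domain:
        System.out.println(faker.name().fullName());         // e.g. "Jane Doe"
        System.out.println(faker.internet().ipV4Address());  // e.g. "10.4.32.17"
        System.out.println(faker.address().city());
        System.out.println(faker.number().numberBetween(0, 100));
    }
}
{code}

A built-in connector would essentially wrap calls like these behind the table 
source interface, much as the existing datagen connector wraps plain random 
generators.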



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description

2021-06-16 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364645#comment-17364645
 ] 

Wei-Che Wei commented on FLINK-22970:
-

Hi [~Leonard Xu]
Could you assign this Jira to me and help merge the PR if it looks good to 
you? Thank you.

> The documentation for `TO_TIMESTAMP` UDF has an incorrect description
> -
>
> Key: FLINK-22970
> URL: https://issues.apache.org/jira/browse/FLINK-22970
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Reporter: Wei-Che Wei
>Priority: Minor
>  Labels: pull-request-available
>
> According to this ML discussion 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]
> The description of the `TO_TIMESTAMP` UDF is not correct: it uses the UTC+0 
> time zone instead of the session time zone. We should fix the documentation.
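To illustrate the behavior in question, a minimal sketch using the Table API 
(a sketch, not part of the ticket; the point is that the session time zone 
does not shift what TO_TIMESTAMP parses):

{code:java}
import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ToTimestampSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Set a non-UTC session time zone ...
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        // ... TO_TIMESTAMP still parses the literal as a plain (UTC+0-style)
        // timestamp; the session time zone does not affect the result.
        tEnv.executeSql("SELECT TO_TIMESTAMP('1970-01-01 00:00:00')").print();
    }
}
{code}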



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23012) Add v1.13 docs link in "Pick Docs Version" for master branch

2021-06-16 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364644#comment-17364644
 ] 

Wei-Che Wei edited comment on FLINK-23012 at 6/17/21, 2:46 AM:
---

Hi [~jark]
I have created PR [1], which also addresses this issue. Would you mind 
reviewing it? Thank you.

[1] https://github.com/apache/flink/pull/16162


was (Author: tonywei):
Hi [~jark]

> Add v1.13 docs link in "Pick Docs Version" for master branch
> 
>
> Key: FLINK-23012
> URL: https://issues.apache.org/jira/browse/FLINK-23012
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Jark Wu
>Priority: Major
> Attachments: image-2021-06-17-10-26-29-125.png
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/
>  !image-2021-06-17-10-26-29-125.png|thumbnail! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23012) Add v1.13 docs link in "Pick Docs Version" for master branch

2021-06-16 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364644#comment-17364644
 ] 

Wei-Che Wei commented on FLINK-23012:
-

Hi [~jark]

> Add v1.13 docs link in "Pick Docs Version" for master branch
> 
>
> Key: FLINK-23012
> URL: https://issues.apache.org/jira/browse/FLINK-23012
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Jark Wu
>Priority: Major
> Attachments: image-2021-06-17-10-26-29-125.png
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/
>  !image-2021-06-17-10-26-29-125.png|thumbnail! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22899) ValuesUpsertSinkFunction needs to use global upsert

2021-06-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-22899.

Resolution: Fixed

Fixed via:

master: 3a4036129865fcddb809335b3a55a949a75dc8ca

> ValuesUpsertSinkFunction needs to use global upsert
> ---
>
> Key: FLINK-22899
> URL: https://issues.apache.org/jira/browse/FLINK-22899
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> At present, each task does its own upsert. We need to simulate an external 
> connector and use a global map to perform the upsert.
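A minimal sketch of the intended direction: a test sink that merges results 
into a JVM-global map instead of per-subtask state. Names here are 
illustrative assumptions, not the actual ValuesUpsertSinkFunction:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Illustrative only: upserts from all parallel subtasks land in one shared map. */
public class GlobalUpsertSinkSketch<K, V> {

    // Shared across all subtasks in the same JVM, mimicking an external system.
    private static final Map<Object, Object> GLOBAL_RESULT = new ConcurrentHashMap<>();

    private final Function<V, K> keyExtractor;

    public GlobalUpsertSinkSketch(Function<V, K> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    public void invoke(V value) {
        // Upsert: the latest value per key wins, regardless of which subtask wrote it.
        GLOBAL_RESULT.put(keyExtractor.apply(value), value);
    }

    public static Map<Object, Object> result() {
        return GLOBAL_RESULT;
    }
}
{code}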



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364640#comment-17364640
 ] 

Jingsong Lee commented on FLINK-22994:
--

[~zicat] I mean that the cost of the converter itself is very low. You can 
take a look at StringStringConverter.

I understand the code difference, but what I want is a benchmark case that can 
show the cost difference between running with and without the converter.

> Improve the performance of invoking nesting udf
> ---
>
> Key: FLINK-22994
> URL: https://issues.apache.org/jira/browse/FLINK-22994
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.4
> Environment: h5.  
>Reporter: lynn1.zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2021-06-15-15-18-12-619.png, 
> image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
> image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
> image-2021-06-15-15-30-14-775.png, image-2021-06-15-15-42-08-065.png, 
> new_projection_code, old_projection_code, test.sql
>
>
> h1. Background
> Flink maintains UDF results as binary data, e.g. BinaryStringData. When 
> invoking nested UDFs such as select useless(int_ip_2_string(ip)), the result 
> of int_ip_2_string(ip) is serialized and then deserialized again.
> Below is the generated code:
> !image-2021-06-15-15-18-12-619.png!   This issue will improve it as below:
> !image-2021-06-15-15-19-01-103.png!
> h1. Performance Comparison
> Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; CPU = Intel(R) Xeon(R) Gold 5218 CPU @ 
> 2.30GHz (Linux)
>  UDF introduction:
>  * ipip: input: int ip, output: map ip_info, map size = 14.
>  * ip_2_country: input: map ip_info, output: string country.
>  * ip_2_region: input: map ip_info, output: string region.
>  * ip_2_isp_domain: input: map ip_info, output: string isp.
>  * ip_2_timezone: input: map ip_info, output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with a single UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this issue: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23012) Add v1.13 docs link in "Pick Docs Version" for master branch

2021-06-16 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-23012:

Description: 
https://ci.apache.org/projects/flink/flink-docs-master/

 !image-2021-06-17-10-26-29-125.png|thumbnail! 

  was: !image-2021-06-17-10-26-29-125.png|thumbnail! 


> Add v1.13 docs link in "Pick Docs Version" for master branch
> 
>
> Key: FLINK-23012
> URL: https://issues.apache.org/jira/browse/FLINK-23012
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Jark Wu
>Priority: Major
> Attachments: image-2021-06-17-10-26-29-125.png
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/
>  !image-2021-06-17-10-26-29-125.png|thumbnail! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23012) Add v1.13 docs link in "Pick Docs Version" for master branch

2021-06-16 Thread Jark Wu (Jira)
Jark Wu created FLINK-23012:
---

 Summary: Add v1.13 docs link in "Pick Docs Version" for master 
branch
 Key: FLINK-23012
 URL: https://issues.apache.org/jira/browse/FLINK-23012
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.14.0
Reporter: Jark Wu
 Attachments: image-2021-06-17-10-26-29-125.png

 !image-2021-06-17-10-26-29-125.png|thumbnail! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22983) Support lazy evaluation of Python UDF in Flink SQL

2021-06-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-22983:
-
Component/s: (was: Table SQL / Runtime)
 API / Python

> Support lazy evaluation of Python UDF in Flink SQL
> --
>
> Key: FLINK-22983
> URL: https://issues.apache.org/jira/browse/FLINK-22983
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.1
>Reporter: Maciej Bryński
>Priority: Major
>
> Edited:
> Lazy evaluation of Python UDFs in logical conditions is not working. 
> See the description in the second comment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22983) Support lazy evaluation of Python UDF in Flink SQL

2021-06-16 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364638#comment-17364638
 ] 

Jingsong Lee commented on FLINK-22983:
--

[~maver1ck] Thanks for the explanation.

> Support lazy evaluation of Python UDF in Flink SQL
> --
>
> Key: FLINK-22983
> URL: https://issues.apache.org/jira/browse/FLINK-22983
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.1
>Reporter: Maciej Bryński
>Priority: Major
>
> Edited:
> Lazy evaluation of Python UDFs in logical conditions is not working. 
> See the description in the second comment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15455) Enable TCP connection reuse across multiple jobs.

2021-06-16 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao updated FLINK-15455:

Labels:   (was: stale-major)

> Enable TCP connection reuse across multiple jobs.
> -
>
> Key: FLINK-15455
> URL: https://issues.apache.org/jira/browse/FLINK-15455
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, TCP connections can only be reused by tasks residing in the same 
> TaskManager that consume the same IntermediateResult. After a job finishes or 
> fails over, the TCP connections are closed and new connections must be set up 
> later.
> As an improvement, we can make TCP connections a cluster-level resource that 
> can be reused by multiple jobs. The advantages are as follows:
>  # Reduce the number of TCP connections, saving some resources.
>  # Reduce the overhead of connection setup and teardown, so that jobs 
> restarted after failover and later jobs submitted to the same session cluster 
> can reuse the previous connections.
> We use a Flink session cluster as a service for ad-hoc queries, and users 
> can produce statistics or create statements and reports at any time. Most 
> queries finish in 2s, and we find that TCP connection reuse helps a lot in 
> reducing the average execution time, which means more queries can be 
> processed with the same resources and time, with an even better user 
> experience.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22980) FileExecutionGraphInfoStoreTest hangs on azure

2021-06-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364632#comment-17364632
 ] 

Xintong Song commented on FLINK-22980:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19035&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=7030a106-e977-5851-a05e-535de648c9c9&l=9049

> FileExecutionGraphInfoStoreTest hangs on azure
> --
>
> Key: FLINK-22980
> URL: https://issues.apache.org/jira/browse/FLINK-22980
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Dawid Wysakowicz
>Assignee: Fabian Paul
>Priority: Blocker
>  Labels: test-stability
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership

2021-06-16 Thread Eduardo Winpenny Tejedor (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364607#comment-17364607
 ] 

Eduardo Winpenny Tejedor commented on FLINK-22483:
--

OK [~trohrmann], I see what you're saying. That can be made to work for 
{{JobMaster}}'s invocation of {{CompletedCheckpointStore::recover}}, but what 
about all the other invocations? That method gets called from various other 
places; is the idea to change those too? If so, where is the 
{{CompletedCheckpointStore::recover}} call meant to be moved for those?
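For illustration, a minimal sketch of offloading the blocking recover() call 
from the main thread to an I/O executor. The names (ioExecutor, the 
placeholder method) are assumptions for the sketch, not the agreed design:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncRecoverSketch {
    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        // Run the potentially blocking recovery off the main/RPC thread ...
        CompletableFuture<Void> recovered =
                CompletableFuture.runAsync(AsyncRecoverSketch::recoverCompletedCheckpoints, ioExecutor);
        // ... and continue scheduling only once it completes.
        recovered.thenRun(() -> System.out.println("checkpoints recovered"));
        recovered.join();
        ioExecutor.shutdown();
    }

    private static void recoverCompletedCheckpoints() {
        // Placeholder for CompletedCheckpointStore.recover(), which may block
        // on a slow or unavailable file system.
    }
}
{code}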

> Recover checkpoints when JobMaster gains leadership
> ---
>
> Key: FLINK-22483
> URL: https://issues.apache.org/jira/browse/FLINK-22483
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially 
> long-lasting/blocking operation, for example if the file system 
> implementation keeps retrying to connect to an unavailable storage backend.
> Currently, we call the CompletedCheckpointStore.recover() method from 
> the main thread of the JobManager, making it unresponsive to any RPC call 
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job XXX 
> switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to 
> minio.minio.svc:9000 [minio.minio.svc/] failed: Connection refused 
> (Connection refused)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
>  ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) 
> ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) 
> ~[?:?]
>   at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) 
> ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
>  ~[?:?]
>   at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
>  ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
>  ~[?:?]
>   at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>   at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
>  ~[?:?]
>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
> ~[?:1.8.0_282]
>   at XXX.recover(KubernetesHaCheckpointStore.java:69) 
> ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
>  

[jira] [Updated] (FLINK-17987) KafkaITCase.testStartFromGroupOffsets fails with SchemaException: Error reading field

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-17987:
---
Labels: stale-critical test-stability  (was: test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Critical but is unassigned, and neither it nor its Sub-Tasks have been 
updated for 7 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label, or the issue will be deprioritized in 7 
days.


> KafkaITCase.testStartFromGroupOffsets fails with SchemaException: Error 
> reading field
> -
>
> Key: FLINK-17987
> URL: https://issues.apache.org/jira/browse/FLINK-17987
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.12.0, 1.14.0
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: stale-critical, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2276&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20
> {code}
> 2020-05-27T13:05:24.5355101Z Test 
> testStartFromGroupOffsets(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
>  failed with:
> 2020-05-27T13:05:24.5355935Z 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'api_versions': Error reading array of size 131084, only 12 bytes available
> 2020-05-27T13:05:24.5356501Z  at 
> org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
> 2020-05-27T13:05:24.5356911Z  at 
> org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:308)
> 2020-05-27T13:05:24.5357350Z  at 
> org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:152)
> 2020-05-27T13:05:24.5357838Z  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:687)
> 2020-05-27T13:05:24.5358333Z  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:811)
> 2020-05-27T13:05:24.5358840Z  at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
> 2020-05-27T13:05:24.5359297Z  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> 2020-05-27T13:05:24.5359832Z  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> 2020-05-27T13:05:24.5360659Z  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
> 2020-05-27T13:05:24.5361292Z  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
> 2020-05-27T13:05:24.5361885Z  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)
> 2020-05-27T13:05:24.5362454Z  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:657)
> 2020-05-27T13:05:24.5363089Z  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1425)
> 2020-05-27T13:05:24.5363558Z  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1384)
> 2020-05-27T13:05:24.5364130Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl$KafkaOffsetHandlerImpl.setCommittedOffset(KafkaTestEnvironmentImpl.java:444)
> 2020-05-27T13:05:24.5365027Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runStartFromGroupOffsets(KafkaConsumerTestBase.java:554)
> 2020-05-27T13:05:24.5365596Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testStartFromGroupOffsets(KafkaITCase.java:158)
> 2020-05-27T13:05:24.5366035Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-05-27T13:05:24.5366425Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-05-27T13:05:24.5366871Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-05-27T13:05:24.5367285Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-05-27T13:05:24.5367675Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-05-27T13:05:24.5368142Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-05-27T13:05:24.5368655Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-05-27T13:05:24.5369103Z  at 
> 

[jira] [Updated] (FLINK-12046) Increase records per invocation for serializerAvro benchmark to stabilize it

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12046:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates since, so it is being deprioritized. If this ticket is actually Major, 
please raise the priority and ask a committer to assign you the issue, or 
revive the public discussion.


> Increase records per invocation for serializerAvro benchmark to stabilize it
> 
>
> Key: FLINK-12046
> URL: https://issues.apache.org/jira/browse/FLINK-12046
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Tests
>Reporter: Yu Li
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> We have observed fluctuating results for this benchmark, and it turns out to 
> be caused by the test data being too small. To stabilize the case, we need to 
> increase the records per invocation for the benchmark. For more details, 
> please see this [email thread|https://s.apache.org/E1i7].
> To be specific, this change is for the 
> [flink-benchmark|https://github.com/dataArtisans/flink-benchmarks] project.
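flink-benchmarks is JMH-based, and the knob in question is JMH's 
per-invocation operations accounting. A generic sketch (not the actual 
serializerAvro benchmark) of how a larger batch per invocation stabilizes the 
reported score:

{code:java}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.infra.Blackhole;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class SerializerBenchmarkSketch {

    // More records per invocation amortizes per-call noise in the measurement.
    private static final int RECORDS_PER_INVOCATION = 10_000;

    @Benchmark
    @OperationsPerInvocation(RECORDS_PER_INVOCATION)
    public void serializeMany(Blackhole bh) {
        for (int i = 0; i < RECORDS_PER_INVOCATION; i++) {
            bh.consume(Integer.toString(i)); // stand-in for serializing one record
        }
    }
}
{code}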



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12242) Support partition related operations in GenericHiveMetastoreCatalog

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12242:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates since, so it is being deprioritized. If this ticket is actually Major, 
please raise the priority and ask a committer to assign you the issue, or 
revive the public discussion.


> Support partition related operations in GenericHiveMetastoreCatalog
> ---
>
> Key: FLINK-12242
> URL: https://issues.apache.org/jira/browse/FLINK-12242
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Support partition-related operations in GenericHiveMetastoreCatalog, which 
> implements the ReadableWritableCatalog API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22443) can not be execute an extreme long sql under batch mode

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22443:
---

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as a 
Blocker but is unassigned, and neither it nor its Sub-Tasks have been 
updated for 1 day. I have gone ahead and marked it "stale-blocker". If this 
ticket is a Blocker, please either assign yourself or give an update. 
Afterwards, please remove the label, or the issue will be deprioritized in 7 
days.


> can not be execute an extreme long sql under batch mode
> ---
>
> Key: FLINK-22443
> URL: https://issues.apache.org/jira/browse/FLINK-22443
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.2
> Environment: execute command
>  
> {code:java}
> bin/sql-client.sh embedded -d conf/sql-client-batch.yaml 
> {code}
> content of conf/sql-client-batch.yaml
>  
> {code:java}
> catalogs:
> - name: bnpmphive
>   type: hive
>   hive-conf-dir: /home/gum/hive/conf
>   hive-version: 3.1.2
> execution:
>   planner: blink
>   type: batch
>   #type: streaming
>   result-mode: table
>   parallelism: 4
>   max-parallelism: 2000
>   current-catalog: bnpmphive
>   #current-database: snmpprobe 
> #configuration:
> #  table.sql-dialect: hive
> modules:
>   - name: core
>     type: core
>   - name: myhive
>     type: hive
> deployment:
>   # general cluster communication timeout in ms
>   response-timeout: 5000
>   # (optional) address from cluster to gateway
>   gateway-address: ""
>   # (optional) port from cluster to gateway
>   gateway-port: 0
> {code}
>  
>Reporter: macdoor615
>Priority: Blocker
>  Labels: stale-blocker, stale-critical
> Attachments: flink-gum-taskexecutor-8-hb3-prod-hadoop-002.log.4.zip, 
> raw_p_restapi_hcd.csv.zip
>
>
> 1. Execute an extremely long SQL statement in batch mode
>  
> {code:java}
> select
> 'CD' product_name,
> r.code business_platform,
> 5 statisticperiod,
> cast('2021-03-24 00:00:00' as timestamp) coltime,
> cast(r1.indicatorvalue as double) as YWPT_ZHQI_CD_038_GZ_2,
> cast(r2.indicatorvalue as double) as YWPT_ZHQI_CD_038_YW_7,
> cast(r3.indicatorvalue as double) as YWPT_ZHQI_CD_038_YW_5,
> cast(r4.indicatorvalue as double) as YWPT_ZHQI_CD_038_YW_6,
> cast(r5.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00029,
> cast(r6.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00028,
> cast(r7.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00015,
> cast(r8.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00014,
> cast(r9.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00011,
> cast(r10.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00010,
> cast(r11.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00013,
> cast(r12.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00012,
> cast(r13.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00027,
> cast(r14.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00026,
> cast(r15.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00046,
> cast(r16.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00047,
> cast(r17.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00049,
> cast(r18.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00048,
> cast(r19.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00024,
> cast(r20.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00025,
> cast(r21.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00022,
> cast(r22.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00023,
> cast(r23.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00054,
> cast(r24.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00055,
> cast(r25.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00033,
> cast(r26.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00032,
> cast(r27.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00053,
> cast(r28.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00052,
> cast(r29.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00051,
> cast(r30.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00050,
> cast(r31.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00043,
> cast(r32.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00042,
> cast(r33.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00017,
> cast(r34.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00016,
> cast(r35.indicatorvalue as double) as YWPT_ZHQI_CD_038_GZ_3,
> cast(r36.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00045,
> cast(r37.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00044,
> cast(r38.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00038,
> cast(r39.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00039,
> cast(r40.indicatorvalue as double) as YWPT_ZHQI_CD_038_XT_00037,
> 

[jira] [Updated] (FLINK-22194) KafkaSourceReaderTest.testCommitOffsetsWithoutAliveFetchers fail due to commit timeout

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22194:
---
  Labels: auto-deprioritized-critical stale-major test-stability  (was: 
stale-critical stale-major test-stability)
Priority: Major  (was: Critical)

This issue was labeled "stale-critical" 7 days ago and has not received any 
updates since, so it is being deprioritized. If this ticket is actually 
Critical, please raise the priority and ask a committer to assign you the 
issue, or revive the public discussion.


> KafkaSourceReaderTest.testCommitOffsetsWithoutAliveFetchers fail due to 
> commit timeout
> --
>
> Key: FLINK-22194
> URL: https://issues.apache.org/jira/browse/FLINK-22194
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.12.4
>Reporter: Guowei Ma
>Priority: Major
>  Labels: auto-deprioritized-critical, stale-major, test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16308&view=logs&j=b0097207-033c-5d9a-b48c-6d4796fbe60d&t=e8fcc430-213e-5cce-59d4-6942acf09121&l=6535
> {code:java}
> [ERROR] 
> testCommitOffsetsWithoutAliveFetchers(org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest)
>   Time elapsed: 60.123 s  <<< ERROR!
> java.util.concurrent.TimeoutException: The offset commit did not finish 
> before timeout.
>   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
>   at 
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest.pollUntil(KafkaSourceReaderTest.java:285)
>   at 
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest.testCommitOffsetsWithoutAliveFetchers(KafkaSourceReaderTest.java:129)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4642) Remove unnecessary Guava dependency from flink-streaming-java

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4642:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added the "stale-major" label to the issue. 
If this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or the issue will be deprioritized in 7 
days.


> Remove unnecessary Guava dependency from flink-streaming-java
> -
>
> Key: FLINK-4642
> URL: https://issues.apache.org/jira/browse/FLINK-4642
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: auto-unassigned, stale-major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4808) Allow skipping failed checkpoints

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4808:
--
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Allow skipping failed checkpoints
> -
>
> Key: FLINK-4808
> URL: https://issues.apache.org/jira/browse/FLINK-4808
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Affects Versions: 1.1.2, 1.1.3
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: stale-major
>
> Currently, if Flink cannot complete a checkpoint, it results in a failure and 
> recovery.
> To make the impact of less stable storage infrastructure on the performance 
> of Flink less severe, Flink should be able to tolerate a certain number of 
> failed checkpoints and simply keep executing.
> This should be controllable via a parameter, for example:
> {code}
> env.getCheckpointConfig().setAllowedFailedCheckpoints(3);
> {code}
> A value of {{-1}} could indicate an infinite number of checkpoint failures 
> tolerated by Flink.
> The default value should still be {{0}}, to keep compatibility with the 
> existing behavior.
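> A sketch of the failure-counting semantics such a parameter implies (the 
> method names below are illustrative, not an existing Flink API):
> {code}
> int allowedFailedCheckpoints = 3; // from setAllowedFailedCheckpoints(3)
> int consecutiveFailures = 0;
> 
> void onCheckpointFailed() {
>     consecutiveFailures++;
>     if (allowedFailedCheckpoints >= 0
>             && consecutiveFailures > allowedFailedCheckpoints) {
>         failJob(); // existing behavior: failure and recovery
>     }
>     // otherwise: skip this checkpoint and simply keep executing
> }
> 
> void onCheckpointCompleted() {
>     consecutiveFailures = 0; // a successful checkpoint resets the counter
> }
> {code}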



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4194) Implement isEndOfStream() for KinesisDeserializationSchema

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4194:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Implement isEndOfStream() for KinesisDeserializationSchema
> --
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Robert Metzger
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> **Original JIRA title: KinesisDeserializationSchema.isEndOfStream() is never 
> called. The corresponding part of the code has been commented out with 
> reference to this JIRA.**
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source based on the 
> input data.
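> A minimal sketch of how a user-defined schema could use the method if the 
> connector honored it (the sentinel value is illustrative; the signatures 
> follow the flink-connector-kinesis KinesisDeserializationSchema interface):
> {code}
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
> 
> public class StoppableStringSchema implements KinesisDeserializationSchema<String> {
> 
>     @Override
>     public String deserialize(
>             byte[] recordValue,
>             String partitionKey,
>             String seqNum,
>             long approxArrivalTimestamp,
>             String stream,
>             String shardId) throws IOException {
>         return new String(recordValue, StandardCharsets.UTF_8);
>     }
> 
>     // The method that is currently never called by the connector:
>     public boolean isEndOfStream(String nextElement) {
>         return "END-OF-STREAM".equals(nextElement);
>     }
> 
>     @Override
>     public TypeInformation<String> getProducedType() {
>         return BasicTypeInfo.STRING_TYPE_INFO;
>     }
> }
> {code}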



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4942) Improve processing performance of HeapInternalTimerService with key groups

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4942:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Improve processing performance of HeapInternalTimerService with key groups
> --
>
> Key: FLINK-4942
> URL: https://issues.apache.org/jira/browse/FLINK-4942
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> Currently, key-group awareness in `HeapInternalTimerService` is basically 
> implemented as a (hash) map of (hash) sets. The purpose of this is to group 
> timers by key group in a way that allows easy serialization per key group.
> However, this data layout comes with a significant performance decrease, in 
> particular when the number of key groups is high.
> I suggest keeping all timers in one set at runtime again, thus being as fast 
> as in previous versions without key groups.
> Instead, we can perform a very fast online partitioning into key groups 
> before a snapshot.
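> A minimal sketch of the suggested snapshot-time partitioning, using Flink's 
> existing KeyGroupRangeAssignment (the method and bucket layout are 
> illustrative, not the actual HeapInternalTimerService internals):
> {code}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Set;
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
> 
> // Buckets the single runtime timer set into per-key-group lists,
> // computed only when a snapshot is taken.
> static <K> List<List<K>> partitionByKeyGroup(Set<K> timerKeys, int maxParallelism) {
>     List<List<K>> buckets = new ArrayList<>(maxParallelism);
>     for (int i = 0; i < maxParallelism; i++) {
>         buckets.add(new ArrayList<>());
>     }
>     for (K key : timerKeys) {
>         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
>         buckets.get(keyGroup).add(key);
>     }
>     return buckets;
> }
> {code}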



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4498) Better Cassandra sink documentation

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4498:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Better Cassandra sink documentation
> ---
>
> Key: FLINK-4498
> URL: https://issues.apache.org/jira/browse/FLINK-4498
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Cassandra, Documentation
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> The Cassandra sink documentation is somewhat muddled and could be improved. 
> For instance, the fact that it only supports tuples and POJOs that use 
> DataStax Mapper annotations is only mentioned in passing, and it is not clear 
> that the reference to tuples only applies to Flink Java tuples and not Scala 
> tuples.
> The documentation also does not mention that setQuery() is only necessary for 
> tuple streams. 
> The explanation of the write ahead log could use some cleaning up to clarify 
> when it is appropriate to use, ideally with an example.  Maybe this would be 
> best as a blog post to expand on the type of non-deterministic streams this 
> applies to.
> It would also be useful to mention that tuple elements will be mapped to 
> Cassandra columns using the Datastax Java driver's default encoders, which 
> are somewhat limited (e.g. to write to a blob column the type in the tuple 
> must be a java.nio.ByteBuffer and not just a byte[]).
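> For illustration, a short sketch of the two modes described above (keyspace, 
> table, and element types are made up for the example):
> {code}
> // Tuple streams need an explicit INSERT query via setQuery():
> CassandraSink.addSink(tupleStream) // DataStream<Tuple2<String, Integer>>
>     .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
>     .setHost("127.0.0.1")
>     .build();
> 
> // POJO streams rely on DataStax Mapper annotations such as
> // @Table(keyspace = "example", name = "wordcount") instead,
> // so no setQuery() is required:
> CassandraSink.addSink(pojoStream) // DataStream<WordCount>
>     .setHost("127.0.0.1")
>     .build();
> {code}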



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4855) Add partitionedKeyBy to DataStream

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4855:
--
Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Add partitionedKeyBy to DataStream
> --
>
> Key: FLINK-4855
> URL: https://issues.apache.org/jira/browse/FLINK-4855
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Xiaowei Jiang
>Priority: Major
>  Labels: auto-unassigned, stale-major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> After we do any interesting operation (e.g. reduce) on a KeyedStream, the 
> result becomes a DataStream. In a lot of cases, the output still has the same 
> or compatible keys as the KeyedStream (logically). But to do further 
> operations on these keys, we are forced to use keyBy again. This works 
> semantically, but is costly in two aspects. First, it destroys the 
> possibility of chaining, which is one of the most important optimization 
> techniques. Second, keyBy will greatly expand the connected components of 
> tasks, which has implications for failover optimization.
> To address this shortcoming, we propose a new operator, partitionedKeyBy:
> {code}
> DataStream<T> {
>     public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
> }
> {code}
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to 
> DataStream.keyBy(partitionedKey) where partitionedKey is key plus the task id 
> as an extra field. This guarantees that records from different tasks will 
> never produce the same keys.
> With this, it's possible to do
> {code}
> ds.keyBy(key1).reduce(func1)
>   .partitionedKeyBy(key1).reduce(func2)
>   .partitionedKeyBy(key2).reduce(func3);
> {code}
> Most importantly, in certain cases we will be able to chain these into a 
> single vertex.
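> For illustration, a sketch of the described equivalence using today's API 
> (not the proposed operator itself; the String element type is arbitrary):
> {code}
> // Tag each record with the producing subtask index, then key on the
> // (taskId, key) pair: records from different tasks can never produce
> // the same composite key.
> DataStream<Tuple2<Integer, String>> tagged =
>     ds.map(new RichMapFunction<String, Tuple2<Integer, String>>() {
>         @Override
>         public Tuple2<Integer, String> map(String value) {
>             return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
>         }
>     });
> 
> KeyedStream<Tuple2<Integer, String>, Tuple2<Integer, String>> keyed =
>     tagged.keyBy(new KeySelector<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
>         @Override
>         public Tuple2<Integer, String> getKey(Tuple2<Integer, String> record) {
>             return record; // the (taskId, key) composite
>         }
>     });
> {code}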



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4854) Efficient Batch Operator in Streaming

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4854:
--
Labels: auto-unassigned features stale-major  (was: auto-unassigned 
features)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Efficient Batch Operator in Streaming
> -
>
> Key: FLINK-4854
> URL: https://issues.apache.org/jira/browse/FLINK-4854
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Xiaowei Jiang
>Priority: Major
>  Labels: auto-unassigned, features, stale-major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Very often, it's more efficient to process a batch of records at once instead 
> of processing them one by one. We can use a window to achieve this 
> functionality, but a window stores all records in state, which can be costly. 
> It's desirable to have an efficient implementation of a batch operator. 
> The batch operator works per task and behaves similarly to aligned windows. 
> Here is an example of what the interface looks like to a user:
> {code}
> interface BatchFunction<T> {
>     // add the record to the buffer;
>     // returns whether the batch is ready to be flushed
>     boolean addRecord(T record);
> 
>     // process all pending records in the buffer
>     void flush(Collector<T> collector);
> }
> 
> DataStream<T> ds = ...
> BatchFunction<T> func = ...
> ds.batch(func);
> {code}
> The operator calls addRecord for each record. The batch function saves the 
> record in its own buffer. addRecord returns whether the pending buffer should 
> be flushed; if it should, the operator invokes flush.
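> For illustration, a minimal BatchFunction that flushes every N records, 
> assuming the proposed interface above:
> {code}
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.util.Collector;
> 
> public class FixedSizeBatchFunction implements BatchFunction<String> {
> 
>     private final int batchSize;
>     private final List<String> buffer = new ArrayList<>();
> 
>     public FixedSizeBatchFunction(int batchSize) {
>         this.batchSize = batchSize;
>     }
> 
>     @Override
>     public boolean addRecord(String record) {
>         buffer.add(record);
>         return buffer.size() >= batchSize; // true: operator should flush now
>     }
> 
>     @Override
>     public void flush(Collector<String> collector) {
>         for (String record : buffer) {
>             collector.collect(record);
>         }
>         buffer.clear();
>     }
> }
> {code}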



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22932) RocksDBStateBackendWindowITCase fails with savepoint timeout

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22932:
---
Labels: stale-critical test-stability  (was: test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Critical but is unassigned and neither itself nor its Sub-Tasks have been 
updated for 7 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be 
deprioritized.


> RocksDBStateBackendWindowITCase fails with savepoint timeout
> 
>
> Key: FLINK-22932
> URL: https://issues.apache.org/jira/browse/FLINK-22932
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1
>Reporter: Roman Khachatryan
>Priority: Critical
>  Labels: stale-critical, test-stability
> Fix For: 1.13.2
>
>
> Initially 
> [reported|https://issues.apache.org/jira/browse/FLINK-22067?focusedCommentId=17358306=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17358306]
>  in FLINK-22067
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18709=logs=a8bc9173-2af6-5ba8-775c-12063b4f1d54=46a16c18-c679-5905-432b-9be5d8e27bc6=10183
> The savepoint is triggered but is not completed in time.
> {noformat}
> 2021-06-06T22:27:46.4845045Z Jun 06 22:27:46 java.lang.RuntimeException: 
> Failed to take savepoint
> 2021-06-06T22:27:46.4846088Z Jun 06 22:27:46  at 
> org.apache.flink.state.api.utils.SavepointTestBase.takeSavepoint(SavepointTestBase.java:71)
> 2021-06-06T22:27:46.4847049Z Jun 06 22:27:46  at 
> org.apache.flink.state.api.utils.SavepointTestBase.takeSavepoint(SavepointTestBase.java:46)
> 2021-06-06T22:27:46.4848262Z Jun 06 22:27:46  at 
> org.apache.flink.state.api.SavepointWindowReaderITCase.testApplyEvictorWindowStateReader(SavepointWindowReaderITCase.java:350)
> 2021-06-06T22:27:46.4854133Z Jun 06 22:27:46  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-06-06T22:27:46.4855430Z Jun 06 22:27:46  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-06-06T22:27:46.4856528Z Jun 06 22:27:46  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-06-06T22:27:46.4857487Z Jun 06 22:27:46  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-06-06T22:27:46.4858685Z Jun 06 22:27:46  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-06-06T22:27:46.4859773Z Jun 06 22:27:46  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-06-06T22:27:46.4860964Z Jun 06 22:27:46  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-06-06T22:27:46.4862306Z Jun 06 22:27:46  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-06-06T22:27:46.4863756Z Jun 06 22:27:46  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-06-06T22:27:46.4864993Z Jun 06 22:27:46  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-06-06T22:27:46.4866179Z Jun 06 22:27:46  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2021-06-06T22:27:46.4867272Z Jun 06 22:27:46  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-06-06T22:27:46.4868255Z Jun 06 22:27:46  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-06-06T22:27:46.4869045Z Jun 06 22:27:46  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-06-06T22:27:46.4869902Z Jun 06 22:27:46  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-06-06T22:27:46.4871038Z Jun 06 22:27:46  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-06-06T22:27:46.4871756Z Jun 06 22:27:46  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-06-06T22:27:46.4872502Z Jun 06 22:27:46  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-06-06T22:27:46.4873389Z Jun 06 22:27:46  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-06-06T22:27:46.4874150Z Jun 06 22:27:46  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-06-06T22:27:46.4874914Z Jun 06 22:27:46  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-06-06T22:27:46.4875661Z Jun 06 22:27:46  at 
> 

[jira] [Updated] (FLINK-22533) Allow creating custom metrics

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22533:
---
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Allow creating custom metrics
> -
>
> Key: FLINK-22533
> URL: https://issues.apache.org/jira/browse/FLINK-22533
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Reporter: Igal Shilman
>Priority: Major
>  Labels: stale-major
> Fix For: statefun-3.1.0
>
>
> Currently it is not possible to create custom metrics in StateFun.
> Let us consider supporting these. 
>  
> Mailing list thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-metrics-in-Stateful-Functions-td43282.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4815) Automatic fallback to earlier checkpoints when checkpoint restore fails

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4815:
--
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Automatic fallback to earlier checkpoints when checkpoint restore fails
> ---
>
> Key: FLINK-4815
> URL: https://issues.apache.org/jira/browse/FLINK-4815
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: stale-major
>
> Flink should keep multiple completed checkpoints.
> When the restore of one completed checkpoint fails a certain number of 
> times, the CheckpointCoordinator should fall back to an earlier checkpoint to 
> restore.
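> A hypothetical sketch of the proposed fallback (names like restoreState are 
> illustrative, not the actual CheckpointCoordinator API): walk the retained 
> checkpoints from the latest to the earliest, giving each a bounded number of 
> restore attempts:
> {code}
> CompletedCheckpoint restoreWithFallback(
>         List<CompletedCheckpoint> retained,
>         int maxAttemptsPerCheckpoint) throws Exception {
>     for (int i = retained.size() - 1; i >= 0; i--) {
>         CompletedCheckpoint candidate = retained.get(i);
>         for (int attempt = 0; attempt < maxAttemptsPerCheckpoint; attempt++) {
>             try {
>                 restoreState(candidate); // hypothetical restore call
>                 return candidate;
>             } catch (Exception e) {
>                 // log and retry; after maxAttemptsPerCheckpoint failures,
>                 // fall back to the next older checkpoint
>             }
>         }
>     }
>     throw new Exception("No retained checkpoint could be restored");
> }
> {code}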



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4785:
--
Labels: csv stale-major  (was: csv)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>Priority: Major
>  Labels: csv, stale-major
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/Csv2RowExample.java
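> For reference, the escaping convention at issue (RFC 4180 represents a double 
> quote inside a quoted field by doubling it):
> {noformat}
> CSV field:     "he said ""hi"""
> parsed value:  he said "hi"
> {noformat}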



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4965) AllPairsShortestPaths

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4965:
--
Labels: algorithm auto-unassigned stale-major  (was: algorithm 
auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> AllPairsShortestPaths
> -
>
> Key: FLINK-4965
> URL: https://issues.apache.org/jira/browse/FLINK-4965
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Graph Processing (Gelly)
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Priority: Major
>  Labels: algorithm, auto-unassigned, stale-major
>
> Add a Gelly implementation of {{AllPairsShortestPaths}} to complement the 
> existing {{SingleSourceShortestPaths}}.
> Flink algorithms excel at processing big, sparse data. APSP is big, really 
> big, but not at all sparse. Considering only undirected graphs, each 
> component of size {{n}} will have {{n choose 2}} shortest paths (1,000 
> vertices => ~million paths, 1,000,000 vertices => ~trillion shortest paths).
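> For reference, the arithmetic behind those estimates:
> {noformat}
> n choose 2 = n * (n - 1) / 2
> n = 1,000      ->         499,500  shortest paths (~half a million)
> n = 1,000,000  -> 499,999,500,000  shortest paths (~half a trillion)
> {noformat}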
> Considerations are directed vs undirected and weighted vs unweighted graphs. 
> The actual shortest path (not merely the distance) is required for follow-on 
> algorithms such as betweenness centrality.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22584) Use protobuf-shaded in StateFun core.

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22584:
---
Labels: developer-experience stale-major  (was: developer-experience)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Use protobuf-shaded in StateFun core.
> -
>
> Key: FLINK-22584
> URL: https://issues.apache.org/jira/browse/FLINK-22584
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Reporter: Igal Shilman
>Priority: Major
>  Labels: developer-experience, stale-major
> Fix For: statefun-3.1.0
>
>
> We have a *statefun-protobuf-shaded* module that was introduced for the remote 
> Java SDK. We can use it to shade Protobuf internally, to reduce the dependency 
> surface.
> The major hurdle we need to overcome is that, in embedded functions, we have 
> to be able to accept instances of Protobuf-generated messages from the user.
> For example:
> {code:java}
> UserProfile userProfile = UserProfile.newBuilder().build();
> context.send(..., userProfile);
> {code}
> If we simply use the shaded Protobuf version, we will immediately get a 
> ClassCastException.
> One way to overcome this is to use reflection: find the well-known methods 
> on the generated classes and call toBytes() / parseFrom() reflectively.
> However, this would cause a significant slowdown, even when using 
> MethodHandles.
> A small experiment that I've previously done with ByteBuddy mitigates this by 
> generating accessors in pre-flight:
> {code:java}
> package org.apache.flink.statefun.flink.common.protobuf.serde;
> 
> import static net.bytebuddy.matcher.ElementMatchers.named;
> 
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import net.bytebuddy.ByteBuddy;
> import net.bytebuddy.dynamic.DynamicType;
> import net.bytebuddy.implementation.FixedValue;
> import net.bytebuddy.implementation.MethodCall;
> import net.bytebuddy.implementation.bytecode.assign.Assigner;
> 
> final class ReflectiveProtobufSerde {
> 
>   @SuppressWarnings({"unchecked", "rawtypes"})
>   static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
>     try {
>       DynamicType.Unloaded unloaded = configureByteBuddy(type);
>       Class writer = unloaded.load(type.getClassLoader()).getLoaded();
>       return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
>     } catch (Throwable e) {
>       throw new IllegalArgumentException();
>     }
>   }
> 
>   @SuppressWarnings("rawtypes")
>   private static DynamicType.Unloaded configureByteBuddy(Class type)
>       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
>     Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
>     Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
>     Method getSerializedSizeMethod = type.getMethod("getSerializedSize");
>     // get the message full name
>     Method getDescriptorMethod = type.getMethod("getDescriptor");
>     Object descriptor = getDescriptorMethod.invoke(null);
>     Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
>     String messageFullName = (String) getFullNameMethod.invoke(descriptor);
> 
>     return new ByteBuddy()
>         .subclass(ProtobufSerde.class)
>         .typeVariable("M", type)
>         .method(named("writeTo"))
>         .intercept(
>             MethodCall.invoke(writeToMethod)
>                 .onArgument(0)
>                 .withArgument(1)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("parseFrom"))
>         .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
>         .method(named("getSerializedSize"))
>         .intercept(
>             MethodCall.invoke(getSerializedSizeMethod)
>                 .onArgument(0)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("getMessageFullName"))
>         .intercept(FixedValue.value(messageFullName))
>         .make();
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-4883:
--
Labels: auto-unassigned pull-request-available stale-major  (was: 
auto-unassigned pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala
>Reporter: Stefan Richter
>Priority: Major
>  Labels: auto-unassigned, pull-request-available, stale-major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, which leads to job failures. We 
> should detect and prevent the usage of singleton UDFs.
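> For illustration, a hypothetical detection sketch (not existing ClosureCleaner 
> code): a Scala singleton object compiles to a class whose name ends in "$" and 
> that exposes a static MODULE$ field, which can be checked reflectively:
> {code}
> static boolean isScalaSingletonObject(Class<?> clazz) {
>     if (!clazz.getName().endsWith("$")) {
>         return false;
>     }
>     try {
>         java.lang.reflect.Field module = clazz.getDeclaredField("MODULE$");
>         return java.lang.reflect.Modifier.isStatic(module.getModifiers());
>     } catch (NoSuchFieldException e) {
>         return false;
>     }
> }
> {code}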



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELING/FAILING

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22945:
---
Labels: stale-critical  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Critical but is unassigned and neither itself nor its Sub-Tasks have been 
updated for 7 days. I have gone ahead and marked it "stale-critical". If this 
ticket is critical, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be 
deprioritized.


> StackOverflowException can happen when a large scale job is CANCELING/FAILING
> -
>
> Key: FLINK-22945
> URL: https://issues.apache.org/jira/browse/FLINK-22945
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.1, 1.12.4
>Reporter: Zhu Zhu
>Priority: Critical
>  Labels: stale-critical
>
> The pending requests in ExecutionSlotAllocator are not cleared when a job 
> transitions to CANCELING or FAILING, even though all vertices will be 
> canceled and their assigned slots returned. A returned slot can be used to 
> fulfill the pending request of a CANCELED vertex; that assignment fails 
> immediately, so the slot is returned again and used to fulfill yet another 
> vertex, recursively. With many vertices, this recursion can cause a 
> StackOverflow, and the resulting fatal error will crash the JM. A sample 
> call stack is attached below.
> To fix this problem, we should clear the pending requests in 
> ExecutionSlotAllocator when a job is CANCELING or FAILING. Besides that, I 
> think it's better to also improve the call stack of slot assignment to avoid 
> similar StackOverflowExceptions.
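> A generic sketch of the call-stack flattening suggested above (names are 
> illustrative, not the actual slot pool API): instead of fulfilling the next 
> pending request recursively from inside the release path, queue freed slots 
> and drain the queue in a loop, so the stack depth stays constant:
> {code}
> final class SlotRecycler {
>     private final java.util.ArrayDeque<Object> freedSlots = new java.util.ArrayDeque<>();
>     private boolean draining;
> 
>     void onSlotFreed(Object slot) {
>         freedSlots.addLast(slot);
>         if (draining) {
>             return; // re-entered from the drain loop below: just enqueue
>         }
>         draining = true;
>         try {
>             while (!freedSlots.isEmpty()) {
>                 // may free the slot again (e.g. the target vertex is already
>                 // CANCELED); that re-entry enqueues instead of recursing
>                 tryFulfillPendingRequest(freedSlots.pollFirst());
>             }
>         } finally {
>             draining = false;
>         }
>     }
> 
>     private void tryFulfillPendingRequest(Object slot) {
>         // placeholder for the actual assignment logic
>     }
> }
> {code}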
> ...
>   at 
> org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_102]
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> ~[?:1.8.0_102]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
>  ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
>   at 
> 

[jira] [Updated] (FLINK-15455) Enable TCP connection reuse across multiple jobs.

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-15455:
---
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Enable TCP connection reuse across multiple jobs.
> -
>
> Key: FLINK-15455
> URL: https://issues.apache.org/jira/browse/FLINK-15455
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>  Labels: stale-major
>
> Currently, TCP connections can only be reused by tasks residing in the same 
> TaskManager and consuming the same IntermediateResult. After a job finishes 
> or fails over, the TCP connections are closed and new connections must be set 
> up later.
> As an improvement, we can make TCP connections a cluster-level resource that 
> can be reused by multiple jobs. The advantages are as follows:
>  # Reduce the number of TCP connections, so we can save some resources.
>  # Reduce the overhead of connection setup and teardown, so that jobs 
> restarted after failover, and later jobs submitted to the same session 
> cluster, can reuse the previous connections.
> We use a Flink session cluster as a service for ad-hoc queries, and users can 
> produce statistics or create statements and reports at any time. Most of the 
> queries finish in 2s, and we find that TCP connection reuse helps a lot to 
> reduce the average execution time, which means more queries can be processed 
> using the same resources and time, with an even better user experience.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22643:
---
Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Too many TCP connections among TaskManagers for large scale jobs
> 
>
> Key: FLINK-22643
> URL: https://issues.apache.org/jira/browse/FLINK-22643
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Priority: Major
>  Labels: stale-major
> Fix For: 1.14.0
>
>
> For large-scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each with a parallelism of 500, we 
> divide the vertices into 5 slot sharing groups. Each TaskManager has 5 slots, 
> so there will be 400 TaskManagers in this job. Let's assume the job runs on a 
> cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the number of TCP connections may exceed the Linux maximum of 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
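> One way to read those numbers (the arithmetic only, not new measurements; 400 
> TaskManagers over 20 machines means 20 TaskManagers per machine, each talking 
> to the 399 other TaskManagers):
> {noformat}
> per-machine connections = job edges * local TMs * remote peers * directions
>                         = 19 * 20 * 399 * 2
>                         = 303,240
> with one shared connection per TaskManager pair (the POC below):
>                         = 20 * 399 * 2
>                         = 15,960
> {noformat}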
> As we run our production jobs on a K8s cluster, jobs often fail over due to 
> network-related exceptions, such as {{Sending the partition request to 'null' 
> failed}}, etc.
> We think we can decrease the number of connections by letting tasks reuse the 
> same connection. We implemented a POC that makes all tasks on the same 
> TaskManager reuse one TCP connection. For the example job mentioned above, 
> the number of connections decreases from 303,240 to 15,960. With the POC, the 
> frequency of network-related exceptions in our production jobs drops 
> significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12022) Enable StreamWriter to update file length on sync flush

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12022:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Enable StreamWriter to update file length on sync flush
> ---
>
> Key: FLINK-12022
> URL: https://issues.apache.org/jira/browse/FLINK-12022
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.6.4, 1.7.2
>Reporter: Paul Lin
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently, users of file systems that do not support truncating have to 
> struggle with BucketingSink and use its valid-length file to indicate the 
> checkpointed data position. The problem is that, by default, the file length 
> is only updated when a block is full or the file is closed; when the job 
> crashes and the file is not closed properly, the reported file length lags 
> behind both its actual value and the checkpointed length. When the job 
> restarts, this looks like data loss, because the valid length is bigger than 
> the reported file length. The situation lasts until the namenode notices the 
> change in the file's block size, which can take half an hour or more.
> So I propose to add an option to StreamWriterBase to update the file length 
> on each flush. This can be expensive because it involves the namenode, so it 
> should only be used when strong consistency is needed.
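> A minimal sketch of such a flush for HDFS (the option wiring around it is 
> omitted; SyncFlag.UPDATE_LENGTH is the existing Hadoop API that makes the 
> namenode update the visible file length):
> {code}
> import java.io.IOException;
> import java.util.EnumSet;
> import org.apache.hadoop.fs.FSDataOutputStream;
> import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
> 
> static void flushUpdatingLength(FSDataOutputStream out) throws IOException {
>     if (out instanceof HdfsDataOutputStream) {
>         // persists the current length on the namenode; this is the
>         // expensive part mentioned above
>         ((HdfsDataOutputStream) out)
>             .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>     } else {
>         out.hsync();
>     }
> }
> {code}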



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12008) Support read a whole directory or multiple input data files for read apis of HadoopInputs

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12008:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Support read a whole directory or multiple input data files for read apis of 
> HadoopInputs
> -
>
> Key: FLINK-12008
> URL: https://issues.apache.org/jira/browse/FLINK-12008
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hadoop Compatibility
>Reporter: vinoyang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently, the read APIs provided by {{HadoopInputs}} can only read one path. 
> I don't think that is flexible enough; we should support reading a whole 
> directory or multiple input files.
> Hadoop provides {{org.apache.hadoop.mapred.FileInputFormat.setInputPaths()}} 
> to support this requirement. 
> Spark's {{sequenceFile}} API calls this 
> API ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1049]).
> Flink calls {{org.apache.hadoop.mapred.FileInputFormat.addInputPath}}, which 
> only supports one path.
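> A short sketch of the underlying Hadoop call (the paths are illustrative); 
> the resulting JobConf could then be handed to 
> HadoopInputs.createHadoopInput(...):
> {code}
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.JobConf;
> 
> JobConf jobConf = new JobConf();
> // setInputPaths (plural) accepts several files or whole directories,
> // unlike addInputPath with its single path
> FileInputFormat.setInputPaths(
>     jobConf, new Path("hdfs:///data/day1"), new Path("hdfs:///data/day2"));
> {code}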



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12061) Add more window operator contract tests to table runtime

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12061:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Add more window operator contract tests to table runtime
> 
>
> Key: FLINK-12061
> URL: https://issues.apache.org/jira/browse/FLINK-12061
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime, Tests
>Reporter: Kurt Young
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> [FLINK-11959|https://issues.apache.org/jira/browse/FLINK-11959] introduced a 
> window operator for table runtime. But the tests are not enough comparing to 
> DataStream's window operator. We can borrow more tests from it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12138) Limit input split count of each source task for better failover experience

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12138:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Limit input split count of each source task for better failover experience
> --
>
> Key: FLINK-12138
> URL: https://issues.apache.org/jira/browse/FLINK-12138
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Flink currently uses an InputSplitAssigner to dynamically assign input splits 
> to source tasks. A task requests a new split after it finishes processing the 
> previous one, which achieves better load balance.
> However, in cases where there are fewer slots than source tasks, only the 
> first launched source tasks can request splits, and this lasts until all the 
> splits are consumed. This is not failover friendly, as users sometimes 
> intentionally set a larger parallelism to reduce the failover impact.
> For example, a job runs in a 10-slot session and has a source vertex with 
> parallelism 1000 consuming 10,000 splits; no vertices are connected to 
> others. Currently, 10 of the 1000 source tasks will be launched and will only 
> finish after all the input splits are consumed. If a task fails, at most 
> ~1000 splits need to be re-processed, while if 1000 tasks could run at once, 
> only ~10 splits would need to be re-processed.
>  
> We propose to add a cap on the input split count that each source task may 
> process. Once the cap is reached, the task cannot get any more splits from 
> the InputSplitAssigner and then finishes, making slot space available for 
> other source tasks.
> Theoretically, it would be proper to set the cap to max(input split 
> size)/avg(input split size).
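> A sketch of the proposed cap (the class and field names are illustrative, not 
> an existing Flink class): wrap the real assigner and stop handing out splits 
> to a task once it reaches the cap, so the task finishes and frees its slot:
> {code}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.flink.core.io.InputSplit;
> import org.apache.flink.core.io.InputSplitAssigner;
> 
> final class CappedSplitRequests {
>     private final InputSplitAssigner delegate;
>     private final int maxSplitsPerTask;
>     private final Map<Integer, Integer> assignedCounts = new HashMap<>();
> 
>     CappedSplitRequests(InputSplitAssigner delegate, int maxSplitsPerTask) {
>         this.delegate = delegate;
>         this.maxSplitsPerTask = maxSplitsPerTask;
>     }
> 
>     InputSplit getNextInputSplit(String host, int taskId) {
>         int count = assignedCounts.getOrDefault(taskId, 0);
>         if (count >= maxSplitsPerTask) {
>             return null; // cap reached: the task sees "no more splits" and finishes
>         }
>         InputSplit split = delegate.getNextInputSplit(host, taskId);
>         if (split != null) {
>             assignedCounts.put(taskId, count + 1);
>         }
>         return split;
>     }
> }
> {code}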



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12045) Support force migration between incompatible state schemas/serializers

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12045:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Support force migration between incompatible state schemas/serializers
> --
>
> Key: FLINK-12045
> URL: https://issues.apache.org/jira/browse/FLINK-12045
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System, Runtime / State Backends
>Reporter: Yu Li
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Under certain circumstances users would choose to force a state schema 
> migration. For example, since we suggest avoiding Kryo for state 
> serialization, a user might want to use POJO/Avro serializers instead and 
> wish Flink to automatically perform a forced migration when required.
> This JIRA targets supporting such forced-migration requirements.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12113:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator<Object>, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> The code piece above throws an exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> My workaround is wrapping clean() around the iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22611) add documentation about the semantics of timestamp for time window output

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22611:
---
  Labels: auto-deprioritized-major  (was: stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> add documentation about the semantics of timestamp for time window output
> -
>
> Key: FLINK-22611
> URL: https://issues.apache.org/jira/browse/FLINK-22611
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhenhao Li
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> It seems that all windowed aggregation outputs receive timestamp metadata 
> when event time is used for processing.
> However, I couldn't find anything on the documentation pages about what 
> timestamp value each output gets. Intuitively, it makes sense to assume it is 
> the maximal timestamp from the input records of the window.
> After digging into the source code, I got the impression that the timestamp 
> of the output is actually 1 millisecond before the window end time.
> See 
> https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L375
> and
> https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java#L84
> Correct me if I'm wrong. If I'm right, I think we should add documentation 
> about it: it is not uncommon to chain multiple windowed processes, and 
> understanding the timestamp semantics is critical to the correctness of such 
> code.
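> A worked example of that reading, based on TimeWindow#maxTimestamp() (the 
> window bounds are illustrative):
> {code}
> // a tumbling event-time window covering [00:00:00.000, 00:00:10.000)
> long windowStart = 0L;
> long windowEnd = 10_000L;
> // the emitted result carries maxTimestamp() = end - 1:
> long outputTimestamp = windowEnd - 1; // 9,999 ms, i.e. 00:00:09.999
> {code}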



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22547) OperatorCoordinatorHolderTest. verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate fail

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22547:
---
  Labels: auto-deprioritized-major test-stability  (was: stale-major 
test-stability)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> OperatorCoordinatorHolderTest. 
> verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate fail
> ---
>
> Key: FLINK-22547
> URL: https://issues.apache.org/jira/browse/FLINK-22547
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Guowei Ma
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17499=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0=7502
> {noformat}
> 2021-05-02T22:45:49.7343556Z May 02 22:45:49 java.lang.AssertionError
> 2021-05-02T22:45:49.7344688Z May 02 22:45:49  at 
> org.junit.Assert.fail(Assert.java:86)
> 2021-05-02T22:45:49.7345646Z May 02 22:45:49  at 
> org.junit.Assert.assertTrue(Assert.java:41)
> 2021-05-02T22:45:49.7346698Z May 02 22:45:49  at 
> org.junit.Assert.assertTrue(Assert.java:52)
> 2021-05-02T22:45:49.7353570Z May 02 22:45:49  at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolderTest.checkpointEventValueAtomicity(OperatorCoordinatorHolderTest.java:363)
> 2021-05-02T22:45:49.7355384Z May 02 22:45:49  at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolderTest.verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate(OperatorCoordinatorHolderTest.java:331)
> 2021-05-02T22:45:49.7356826Z May 02 22:45:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-05-02T22:45:49.7904883Z May 02 22:45:49  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-05-02T22:45:49.7905443Z May 02 22:45:49  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-05-02T22:45:49.7905918Z May 02 22:45:49  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-05-02T22:45:49.7906402Z May 02 22:45:49  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-05-02T22:45:49.7907018Z May 02 22:45:49  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-05-02T22:45:49.7907555Z May 02 22:45:49  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-05-02T22:45:49.7909318Z May 02 22:45:49  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-05-02T22:45:49.7910078Z May 02 22:45:49  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-05-02T22:45:49.7910869Z May 02 22:45:49  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-05-02T22:45:49.7911597Z May 02 22:45:49  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2021-05-02T22:45:49.7912383Z May 02 22:45:49  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-05-02T22:45:49.7914058Z May 02 22:45:49  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-05-02T22:45:49.7915214Z May 02 22:45:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-05-02T22:45:49.7916058Z May 02 22:45:49  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-05-02T22:45:49.7916852Z May 02 22:45:49  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-05-02T22:45:49.7917550Z May 02 22:45:49  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-05-02T22:45:49.7919076Z May 02 22:45:49  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-05-02T22:45:49.7920292Z May 02 22:45:49  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-05-02T22:45:49.7921041Z May 02 22:45:49  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-05-02T22:45:49.7921788Z May 02 22:45:49  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2021-05-02T22:45:49.7922652Z May 02 22:45:49  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2021-05-02T22:45:49.7923564Z May 02 22:45:49  at 
> 

[jira] [Updated] (FLINK-12182) AggregateProjectMergeRule can not handle LogicalWindowAggregate

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12182:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> AggregateProjectMergeRule can not handle LogicalWindowAggregate
> ---
>
> Key: FLINK-12182
> URL: https://issues.apache.org/jira/browse/FLINK-12182
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently, we add AggregateProjectMergeRule.INSTANCE to the logical rule sets 
> and use it to remove projects for aggregates. However, there are bugs when 
> this rule is applied to a LogicalWindowAggregate.
> When the project is removed, the input field names change, so the rule has to 
> update all fields in the Aggregate. However, the field names in the 
> LogicalWindow of a LogicalWindowAggregate are not taken into consideration by 
> AggregateProjectMergeRule, since it is a Calcite rule.
> The same problem exists in other Aggregate rules, such as 
> FilterAggregateTransposeRule and AggregateProjectPullUpConstantsRule.
> As a quick fix, I think we can change
> {code:java}
> AggregateProjectMergeRule.INSTANCE,
> {code}
> to
> {code:java}
> new AggregateProjectMergeRule(
>   classOf[LogicalAggregate], classOf[Project], 
> RelFactories.LOGICAL_BUILDER),
> {code}
> Of course, we need a complete solution for LogicalWindowAggregate (using 
> LogicalAggregate to express LogicalWindowAggregate), but that is out of scope 
> for this Jira.
> Any suggestions are welcome!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12149) Support Proto for Streaming File Sink

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12149:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Support Proto for Streaming File Sink 
> --
>
> Key: FLINK-12149
> URL: https://issues.apache.org/jira/browse/FLINK-12149
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Currently Flink supports ParquetAvroWriters, see 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.html].
> Supporting a ProtoParquetWriter within Flink would be a good addition as 
> well.
> Parquet already has support for ProtoParquetWriter, see 
> [https://github.com/apache/parquet-mr/blob/master/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java].
> We can extend the classes from there and expose them in the appropriate 
> format to support writing ProtoParquet.
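
To illustrate the proposal, here is a minimal sketch of a factory mirroring ParquetAvroWriters. The class ProtoParquetWriters is hypothetical, and the exact ProtoParquetWriter builder methods depend on the parquet-protobuf version:

{code:java}
import com.google.protobuf.Message;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.proto.ProtoParquetWriter;

// Hypothetical factory (sketch only): wraps ProtoParquetWriter in a
// serializable ParquetBuilder so it can be handed to a bulk-format sink.
public final class ProtoParquetWriters {

    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> protoClass) {
        ParquetBuilder<T> builder =
                out -> ProtoParquetWriter.<T>builder(out)
                        .withMessage(protoClass)
                        .build();
        return new ParquetWriterFactory<>(builder);
    }

    private ProtoParquetWriters() {}
}
{code}

Usage would then be analogous to the Avro case, e.g. {{StreamingFileSink.forBulkFormat(outputPath, ProtoParquetWriters.forType(MyProto.class))}}.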



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12252) Support not escaped table name for External Catalogs in INSERT SQL statement

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12252:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Support not escaped table name for External Catalogs in INSERT SQL statement
> 
>
> Key: FLINK-12252
> URL: https://issues.apache.org/jira/browse/FLINK-12252
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Artsem Semianenka
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If I want to write a streaming SQL query that inserts data into a sink 
> described in an external catalog, I have to escape the full table path in 
> quotes. Example:
> INSERT INTO +`test.db3.tb3`+ SELECT a,b,c,d FROM test.db2.tb2
> I see a discrepancy in the query semantics, because the SELECT statement is 
> written without quotes.
> I propose to add support for unquoted table names in the INSERT statement, 
> like:
> INSERT INTO test.db3.tb3 SELECT a,b,c,d FROM test.db2.tb2
> A pull request is available.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12256) Implement Confluent Schema Registry Catalog

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12256:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Implement Confluent Schema Registry Catalog
> ---
>
> Key: FLINK-12256
> URL: https://issues.apache.org/jira/browse/FLINK-12256
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Table SQL / Ecosystem
>Reporter: Artsem Semianenka
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
> Attachments: Xnip2020-04-24_17-25-39.jpg
>
>
> KafkaReadableCatalog is a special implementation of the ReadableCatalog 
> interface (introduced in 
> [FLIP-30|https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs])
> to retrieve meta information, such as topic names and topic schemas, from 
> Apache Kafka and the Confluent Schema Registry.
> The new ReadableCatalog allows a user to run SQL queries like:
> {code:java}
> Select * from kafka.topic_name
> {code}
> without the need to manually define the table schema.
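
A hedged wiring sketch of what this could look like from the user's side. KafkaReadableCatalog and its constructor arguments are the proposed, not-yet-existing pieces of this ticket; only the catalog registration call follows the FLIP-30 API direction:

{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class SchemaRegistryCatalogExample {

    // KafkaReadableCatalog is hypothetical: it is the class this ticket proposes.
    public static Table query(TableEnvironment tableEnv) {
        tableEnv.registerCatalog(
                "kafka",
                new KafkaReadableCatalog("broker:9092", "http://schema-registry:8081"));
        // The topic becomes queryable as a table without manual schema DDL.
        return tableEnv.sqlQuery("SELECT * FROM kafka.topic_name");
    }
}
{code}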



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12210) Fix a bug in AbstractPagedInputView.readLine

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12210:
---
  Labels: auto-deprioritized-major auto-unassigned pull-request-available  
(was: auto-unassigned pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Fix a bug in AbstractPagedInputView.readLine
> 
>
> Key: FLINK-12210
> URL: https://issues.apache.org/jira/browse/FLINK-12210
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: Liya Fan
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In AbstractPagedInputView.readLine, the character '\r' is removed in two 
> places, which is redundant. In addition, only a trailing '\r' should be 
> removed.
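
For clarity, a minimal sketch of the intended semantics (not the actual Flink implementation): drop the terminating '\n', and at most one '\r', only when it directly precedes that '\n':

{code:java}
public class LineEndingSketch {

    // Hypothetical helper: removes a trailing '\n' and at most one '\r'
    // immediately before it; any '\r' embedded in the line is preserved.
    static String stripLineEnding(String rawLine) {
        int end = rawLine.length();
        if (end > 0 && rawLine.charAt(end - 1) == '\n') {
            end--;
        }
        if (end > 0 && rawLine.charAt(end - 1) == '\r') {
            end--;
        }
        return rawLine.substring(0, end);
    }

    public static void main(String[] args) {
        // Only the trailing '\r' is stripped; the embedded one stays.
        String stripped = stripLineEnding("a\rb\r\n");
        System.out.println(stripped.equals("a\rb")); // true
    }
}
{code}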



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22857) Add possibility to call built-in functions in SpecializedFunction

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22857:
---
Labels: stale-assigned  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 14 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it. If the "stale-assigned" label is not removed in 7 days, the 
issue will be automatically unassigned.


> Add possibility to call built-in functions in SpecializedFunction
> -
>
> Key: FLINK-22857
> URL: https://issues.apache.org/jira/browse/FLINK-22857
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: stale-assigned
>
> This is the last missing piece to avoid code generation when developing 
> built-in functions. Core operations such as CAST, EQUALS, etc. will still use 
> code generation but other built-in functions should be able to use these core 
> operations without the need for generating code. It should be possible to 
> call other built-in functions via a SpecializedFunction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22844) Add doc to introduce ExplainDetails for EXPLAIN syntax

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22844:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 14 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it. If the "stale-assigned" label is not removed in 7 days, the 
issue will be automatically unassigned.


> Add doc to introduce ExplainDetails for EXPLAIN syntax 
> ---
>
> Key: FLINK-22844
> URL: https://issues.apache.org/jira/browse/FLINK-22844
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.14.0
>Reporter: WeiNan Zhao
>Assignee: WeiNan Zhao
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.14.0
>
>
> Linked to FLINK-20562; add documentation to introduce this new syntax.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16952) Parquet file system format support filter pushdown

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-16952:
---
Labels: auto-unassigned stale-assigned  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 14 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it. If the "stale-assigned" label is not removed in 7 days, the 
issue will be automatically unassigned.


> Parquet file system format support filter pushdown
> ---
>
> Key: FLINK-16952
> URL: https://issues.apache.org/jira/browse/FLINK-16952
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Jingsong Lee
>Assignee: luoyuxia
>Priority: Major
>  Labels: auto-unassigned, stale-assigned
> Fix For: 1.14.0
>
>
> We can create a conversion between the Flink Expression (NOTE: this should 
> be the new Expression instead of PlannerExpression) and the Parquet 
> FilterPredicate, and apply it to the Parquet file system format.
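
To make the target of the conversion concrete, a small sketch; the converter itself is hypothetical, but FilterApi and FilterPredicate are the actual Parquet classes a converted expression would map to:

{code:java}
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class ParquetFilterSketch {

    // Hypothetical converter fragment: maps an equality on an INT field to a
    // Parquet FilterPredicate. A real converter would walk the expression tree,
    // handle all supported types and operators, and bail out otherwise.
    static FilterPredicate intEquals(String fieldName, int literal) {
        return FilterApi.eq(FilterApi.intColumn(fieldName), literal);
    }

    public static void main(String[] args) {
        FilterPredicate pushedDown = intEquals("user_id", 42);
        System.out.println(pushedDown); // eq(user_id, 42)
    }
}
{code}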



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-23003) Resource leak in RocksIncrementalSnapshotStrategy

2021-06-16 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan reassigned FLINK-23003:
-

Assignee: Yanfei Lei

> Resource leak in RocksIncrementalSnapshotStrategy
> -
>
> Key: FLINK-23003
> URL: https://issues.apache.org/jira/browse/FLINK-23003
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: Flink: 1.14-SNAPSHOT
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> We found that `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` is 
> not closed correctly after being used. It would lead to a resource leak.
> `RocksDBStateUploader` inherits `RocksDBStateDataTransfer`, and 
> `RocksDBStateDataTransfer` holds an `ExecutorService`. `RocksDBStateUploader` 
> uses the `ExecutorService` to upload files to DFS asynchronously.
> When `RocksDBKeyedStateBackend` is cleaned up, all resources held by the 
> backend should be closed, but currently `RocksIncrementalSnapshotStrategy` 
> lacks a close() function.
> We encountered an example of this problem: when we benchmarked the 
> performance of incremental rescaling, we observed that the forked JMH VM 
> could not exit normally.
> {code:java}
> [INFO]
> [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ benchmark ---
> # JMH version: 1.19
> # VM version: JDK 1.8.0_281, VM 25.281-b09
> # VM invoker: /home/leiyanfei.lyf/jdk1.8.0_281/jre/bin/java
> # VM options: -Djava.rmi.server.hostname=127.0.0.1 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
> # Warmup: 10 iterations, 1 s each
> # Measurement: 10 iterations, 1 s each
> # Timeout: 10 min per iteration
> # Threads: 1 thread, will synchronize iterations
> # Benchmark mode: Average time, time/op
> # Benchmark: 
> org.apache.flink.state.RocksIncrementalCheckpointScaleUpBenchmark.ScalingUp
> # Parameters: (numberElements = 100, parallelism1 = 2, parallelism2 = 3)
> # Run progress: 0.00% complete, ETA 00:02:00
> # Fork: 1 of 3
> # Warmup Iteration   1: 244.717 ms/op
> # Warmup Iteration   2: 104.749 ms/op
> # Warmup Iteration   3: 104.182 ms/op
> ...
> Iteration   1: 96.600 ms/op
> Iteration   2: 108.463 ms/op
> Iteration   3: 93.657 ms/op
> ... threads? Waiting 24 seconds more...>
> Non-finished threads:
> ...
> Thread[pool-15-thread-4,5,main]
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>   at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The root cause of this example is that the {{RocksDBStateUploader}} in 
> {{RocksIncrementalSnapshotStrategy}} is not closed normally when 
> {{RocksDBKeyedStateBackend}} is disposed.
>  
> The solution to this problem is quite straightforward: the 
> {{RocksDBStateUploader}} in {{RocksIncrementalSnapshotStrategy}} can be 
> closed when cleaning up {{RocksDBKeyedStateBackend}}.
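
A minimal sketch of that fix, under the assumption that the strategy owns the uploader and its executor (names simplified; this is not the actual Flink code):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: make the snapshot strategy closeable and have the backend's
// dispose/cleanup path call close(), so the upload threads are released.
class IncrementalSnapshotStrategySketch implements AutoCloseable {

    private final ExecutorService uploadExecutor = Executors.newFixedThreadPool(4);

    @Override
    public void close() {
        // Shutting down the executor is what prevents the thread leak.
        uploadExecutor.shutdownNow();
    }
}
{code}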



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22886) Thread leak in RocksDBStateUploader

2021-06-16 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364376#comment-17364376
 ] 

Yue Ma commented on FLINK-22886:


[~roman_khachatryan]  Following your suggestion, I have reworked the resulting 
PR into two commits: [https://github.com/apache/flink/pull/16171]
Would you please review it?
 

> Thread leak in RocksDBStateUploader
> ---
>
> Key: FLINK-22886
> URL: https://issues.apache.org/jira/browse/FLINK-22886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.3, 1.13.1, 1.12.4
>Reporter: Jiayi Liao
>Assignee: Yue Ma
>Priority: Major
>  Labels: critical, pull-request-available
> Attachments: image-2021-06-06-13-46-34-604.png
>
>
> {{ExecutorService}} in {{RocksDBStateUploader}} is not shut down, which may 
> leak threads when tasks fail.
> BTW, we should name the threads created by the {{ExecutorService}}; otherwise 
> what we see in stack dumps is a lot of threads named pool-m-thread-n, like 
> this:
>  
> !image-2021-06-06-13-46-34-604.png!
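
A minimal sketch of the naming part: pass a ThreadFactory that prefixes thread names so they are identifiable in thread dumps (the prefix "rocksdb-state-upload" is just an example):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

    private final AtomicInteger counter = new AtomicInteger();
    private final String prefix;

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Threads show up as e.g. "rocksdb-state-upload-1" instead of "pool-m-thread-n".
        return new Thread(r, prefix + "-" + counter.incrementAndGet());
    }

    public static void main(String[] args) {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, new NamedThreadFactory("rocksdb-state-upload"));
        pool.submit(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
{code}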



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23011) FLIP-27 sources are generating non-deterministic results when using event time

2021-06-16 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-23011:
---
Issue Type: Bug  (was: New Feature)

> FLIP-27 sources are generating non-deterministic results when using event time
> --
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: 
>Reporter: Piotr Nowojski
>Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting first {{Watermark}}. Until this 
> happens, downstream operators are ignoring {{IDLE}} inputs from calculating 
> the input (min) watermark. 
> An extreme example to what problem this leads to, are completely bogus 
> results if for example one FLIP-27 source subtask is slower than others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
> 10));
> DataStream<Long> eventStream =
> env.fromSource(
> new NumberSequenceSource(0, Long.MAX_VALUE),
> WatermarkStrategy.forMonotonousTimestamps()
> .withTimestampAssigner(new 
> LongTimestampAssigner()),
> "NumberSequenceSource")
> .map(
> new RichMapFunction<Long, Long>() {
> @Override
> public Long map(Long value) throws Exception {
> if 
> (getRuntimeContext().getIndexOfThisSubtask() == 0) {
> Thread.sleep(1);
> }
> return 1L;
> }
> });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements 
> SerializableTimestampAssigner<Long> {
> private long counter = 0;
> @Override
> public long extractTimestamp(Long record, long recordTimeStamp) {
> return counter++;
> }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The 
> other source subtask (subTaskId == 0) emits very low watermarks. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, 
> while the other input channel is still idle, it will take those high 
> watermarks as the combined input watermark for the whole {{WindowOperator}}. 
> When the input channel from the throttled source subtask finally receives 
> its {{ACTIVE}} status and a much lower watermark, it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting into 
> every 1-second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22946) Network buffer deadlock introduced by unaligned checkpoint

2021-06-16 Thread Guokuai Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guokuai Huang updated FLINK-22946:
--
Labels: pull-request-available  (was: stale-blocker)

> Network buffer deadlock introduced by unaligned checkpoint
> --
>
> Key: FLINK-22946
> URL: https://issues.apache.org/jira/browse/FLINK-22946
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.13.0, 1.13.1
>Reporter: Guokuai Huang
>Assignee: Guokuai Huang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
> Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot 
> 2021-06-09 at 7.02.04 PM.png
>
>
> We recently encountered a deadlock when using unaligned checkpoints. Below 
> are the two thread stacks that cause the deadlock:
> {code:java}
> "Channel state writer Join(xx) (34/256)#1": at 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>  - waiting to lock <0x0007296dfa90> (a 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
>  at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>  at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>  at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  at 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
>  at 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
>  - locked <0x0007296bc450> (a 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>  at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>  at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>  at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>  at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown
>  Source) at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown
>  Source) at 
> org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
>  at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown
>  Source) at java.lang.Thread.run(Thread.java:745){code}
> {code:java}
> "Join(xx) (34/256)#1": at 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>  - waiting to lock <0x0007296bc450> (a 
> 

[jira] [Commented] (FLINK-13703) AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)

2021-06-16 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364270#comment-17364270
 ] 

Robert Metzger commented on FLINK-13703:


It seems that we really can relax the check here to cover this case. I don't 
have time to work on this right now, but I'm happy to review a PR (as long as 
it has good testing ;) ).

> AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)
> 
>
> Key: FLINK-13703
> URL: https://issues.apache.org/jira/browse/FLINK-13703
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Alexander Fedulov
>Priority: Minor
>
> There exists an option to generate Avro sources that represent immutable 
> objects (the `createSetters` option set to false) 
> [\[1\]|https://github.com/commercehub-oss/gradle-avro-plugin] , 
> [\[2\]|https://avro.apache.org/docs/current/api/java/org/apache/avro/mojo/AbstractAvroMojo.html].
> Those objects still have full-arguments constructors and are handled 
> correctly by Avro.
> `AvroTypeInfo` in Flink performs a check to verify that a class complies 
> with the strict POJO requirements (including setters) and throws an 
> IllegalStateException("Expecting type to be a PojoTypeInfo") otherwise. Can 
> this check be relaxed to provide better immutability support?
> +Steps to reproduce:+
> 1) Generate Avro sources from a schema using the `createSetters` option.
> 2) Use the generated class in 
> `ConfluentRegistryAvroDeserializationSchema.forSpecific(GeneratedClass.class, 
> schemaRegistryUrl)`



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13703) AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)

2021-06-16 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-13703:
--

Assignee: Robert Metzger

> AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)
> 
>
> Key: FLINK-13703
> URL: https://issues.apache.org/jira/browse/FLINK-13703
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Alexander Fedulov
>Assignee: Robert Metzger
>Priority: Minor
>
> There exists an option to generate Avro sources that represent immutable 
> objects (the `createSetters` option set to false) 
> [\[1\]|https://github.com/commercehub-oss/gradle-avro-plugin] , 
> [\[2\]|https://avro.apache.org/docs/current/api/java/org/apache/avro/mojo/AbstractAvroMojo.html].
> Those objects still have full-arguments constructors and are handled 
> correctly by Avro.
> `AvroTypeInfo` in Flink performs a check to verify that a class complies 
> with the strict POJO requirements (including setters) and throws an 
> IllegalStateException("Expecting type to be a PojoTypeInfo") otherwise. Can 
> this check be relaxed to provide better immutability support?
> +Steps to reproduce:+
> 1) Generate Avro sources from a schema using the `createSetters` option.
> 2) Use the generated class in 
> `ConfluentRegistryAvroDeserializationSchema.forSpecific(GeneratedClass.class, 
> schemaRegistryUrl)`



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13703) AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)

2021-06-16 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-13703:
--

Assignee: (was: Robert Metzger)

> AvroTypeInfo requires objects to be strict POJOs (mutable, with setters)
> 
>
> Key: FLINK-13703
> URL: https://issues.apache.org/jira/browse/FLINK-13703
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Alexander Fedulov
>Priority: Minor
>
> There exists an option to generate Avro sources that represent immutable 
> objects (the `createSetters` option set to false) 
> [\[1\]|https://github.com/commercehub-oss/gradle-avro-plugin] , 
> [\[2\]|https://avro.apache.org/docs/current/api/java/org/apache/avro/mojo/AbstractAvroMojo.html].
> Those objects still have full-arguments constructors and are handled 
> correctly by Avro.
> `AvroTypeInfo` in Flink performs a check to verify that a class complies 
> with the strict POJO requirements (including setters) and throws an 
> IllegalStateException("Expecting type to be a PojoTypeInfo") otherwise. Can 
> this check be relaxed to provide better immutability support?
> +Steps to reproduce:+
> 1) Generate Avro sources from a schema using the `createSetters` option.
> 2) Use the generated class in 
> `ConfluentRegistryAvroDeserializationSchema.forSpecific(GeneratedClass.class, 
> schemaRegistryUrl)`



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22968) Improve exception message when using toAppendStream[String]

2021-06-16 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17363608#comment-17363608
 ] 

Nicholas Jiang edited comment on FLINK-22968 at 6/16/21, 11:59 AM:
---

[~jark], as 'toAppendStream' is soft-deprecated, there is no need to improve 
the message for this method, and this issue could be closed.

[~twalthr], what do you think?


was (Author: nicholasjiang):
[~jark], If the 'toAppendStream' is deprecated, there is no need to improve the 
message for this method and this issue could be closed.

[~twalthr], what do you think about?

> Improve exception message when using toAppendStream[String]
> ---
>
> Key: FLINK-22968
> URL: https://issues.apache.org/jira/browse/FLINK-22968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.3, 1.13.1
> Environment: {color:#FF}*Flink-1.13.1 and Flink-1.12.1*{color}
>Reporter: DaChun
>Assignee: Nicholas Jiang
>Priority: Major
> Attachments: test.scala, this_error.txt
>
>
> {code:scala}
> package com.bytedance.one
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> object test {
>   def main(args: Array[String]): Unit = {
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>   .createLocalEnvironmentWithWebUI()
> val stream: DataStream[String] = env.readTextFile("data/wc.txt")
> val tableEnvironment: StreamTableEnvironment = 
> StreamTableEnvironment.create(env)
> val table: Table = tableEnvironment.fromDataStream(stream)
> tableEnvironment.createTemporaryView("wc", table)
> val res: Table = tableEnvironment.sqlQuery("select * from wc")
> tableEnvironment.toAppendStream[String](res).print()
> env.execute("test")
>   }
> }
> {code}
> When I run the program, the error is as follows. The specific error is in 
> this_error.txt, but the error prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> It's very simple: read a stream and convert it into a table, with String as 
> the generic type. Then an error is reported. The code is in test.scala and 
> the error is in this_error.txt.
> But if I create an entity class by myself and use that entity class as the 
> generic type, then it passes normally, as does the Row type. Do you have to 
> create your own entity class instead of using the String type?
> h3. Summary
> We should improve the exception message a bit: we can state that the given 
> type (String) is not allowed in {{toAppendStream}}.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22968) Improve exception message when using toAppendStream[String]

2021-06-16 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17363608#comment-17363608
 ] 

Nicholas Jiang edited comment on FLINK-22968 at 6/16/21, 11:55 AM:
---

[~jark], if 'toAppendStream' is deprecated, there is no need to improve the 
message for this method, and this issue could be closed.

[~twalthr], what do you think?


was (Author: nicholasjiang):
[~jark], If the 'toAppendStream' is deprecated, there is no need to improve the 
message for this method and this issue could be closed.
[~twalthr], what do you think about?

> Improve exception message when using toAppendStream[String]
> ---
>
> Key: FLINK-22968
> URL: https://issues.apache.org/jira/browse/FLINK-22968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.3, 1.13.1
> Environment: {color:#FF}*Flink-1.13.1 and Flink-1.12.1*{color}
>Reporter: DaChun
>Assignee: Nicholas Jiang
>Priority: Major
> Attachments: test.scala, this_error.txt
>
>
> {code:scala}
> package com.bytedance.one
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> object test {
>   def main(args: Array[String]): Unit = {
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>   .createLocalEnvironmentWithWebUI()
> val stream: DataStream[String] = env.readTextFile("data/wc.txt")
> val tableEnvironment: StreamTableEnvironment = 
> StreamTableEnvironment.create(env)
> val table: Table = tableEnvironment.fromDataStream(stream)
> tableEnvironment.createTemporaryView("wc", table)
> val res: Table = tableEnvironment.sqlQuery("select * from wc")
> tableEnvironment.toAppendStream[String](res).print()
> env.execute("test")
>   }
> }
> {code}
> When I run the program, the error is as follows. The specific error is in 
> this_error.txt, but the error prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> It's very simple: read a stream and convert it into a table, with String as 
> the generic type. Then an error is reported. The code is in test.scala and 
> the error is in this_error.txt.
> But if I create an entity class by myself and use that entity class as the 
> generic type, then it passes normally, as does the Row type. Do you have to 
> create your own entity class instead of using the String type?
> h3. Summary
> We should improve the exception message a bit: we can state that the given 
> type (String) is not allowed in {{toAppendStream}}.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22968) Improve exception message when using toAppendStream[String]

2021-06-16 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17363608#comment-17363608
 ] 

Nicholas Jiang edited comment on FLINK-22968 at 6/16/21, 11:51 AM:
---

[~jark], if 'toAppendStream' is deprecated, there is no need to improve the 
message for this method, and this issue could be closed.
[~twalthr], what do you think?


was (Author: nicholasjiang):
[~twalthr], as you mentioned, does this need to improve exception messages for 
toAppendStream?

> Improve exception message when using toAppendStream[String]
> ---
>
> Key: FLINK-22968
> URL: https://issues.apache.org/jira/browse/FLINK-22968
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.3, 1.13.1
> Environment: {color:#FF}*Flink-1.13.1 and Flink-1.12.1*{color}
>Reporter: DaChun
>Assignee: Nicholas Jiang
>Priority: Major
> Attachments: test.scala, this_error.txt
>
>
> {code:scala}
> package com.bytedance.one
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> object test {
>   def main(args: Array[String]): Unit = {
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment
>   .createLocalEnvironmentWithWebUI()
> val stream: DataStream[String] = env.readTextFile("data/wc.txt")
> val tableEnvironment: StreamTableEnvironment = 
> StreamTableEnvironment.create(env)
> val table: Table = tableEnvironment.fromDataStream(stream)
> tableEnvironment.createTemporaryView("wc", table)
> val res: Table = tableEnvironment.sqlQuery("select * from wc")
> tableEnvironment.toAppendStream[String](res).print()
> env.execute("test")
>   }
> }
> {code}
> When I run the program, the error is as follows. The specific error is in 
> this_error.txt, but the error prompts me to file an issue:
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> It's very simple: read a stream and convert it into a table, with String as 
> the generic type. Then an error is reported. The code is in test.scala and 
> the error is in this_error.txt.
> But if I create an entity class by myself and use that entity class as the 
> generic type, then it passes normally, as does the Row type. Do you have to 
> create your own entity class instead of using the String type?
> h3. Summary
> We should improve the exception message a bit: we can state that the given 
> type (String) is not allowed in {{toAppendStream}}.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23011) FLIP-27 sources are generating non-deterministic results when using event time

2021-06-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364252#comment-17364252
 ] 

Piotr Nowojski edited comment on FLINK-23011 at 6/16/21, 11:36 AM:
---

[~AHeise] suggested that as a hotfix we could block emitting 
watermarks/switching to {{IDLE}} until all splits are assigned 
({{SourceReader#notifyNoMoreSplits}}). This would be a very special case of the 
more general solution that I was proposing with {{SplitEnumerator}} capping the 
watermarks. This hotfix should work well in cases where the number of splits 
is determined/known from the beginning.


was (Author: pnowojski):
[~AHeise] suggested that as a hotfix we could block emitting 
watermarks/switching to {{IDLE}} until all splits are assigned 
({{SourceReader#notifyNoMoreSplits}}). This would be a very special case of the 
more general solution that I was proposing with {{SplitEnumerator}} capping the 
watermarks, that should work well in the cases where number of splits is 
determined/known from the beginning.

> FLIP-27 sources are generating non-deterministic results when using event time
> --
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: 
>Reporter: Piotr Nowojski
>Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting first {{Watermark}}. Until this 
> happens, downstream operators are ignoring {{IDLE}} inputs from calculating 
> the input (min) watermark. 
> An extreme example to what problem this leads to, are completely bogus 
> results if for example one FLIP-27 source subtask is slower than others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
> 10));
> DataStream<Long> eventStream =
> env.fromSource(
> new NumberSequenceSource(0, Long.MAX_VALUE),
> WatermarkStrategy.forMonotonousTimestamps()
> .withTimestampAssigner(new 
> LongTimestampAssigner()),
> "NumberSequenceSource")
> .map(
> new RichMapFunction<Long, Long>() {
> @Override
> public Long map(Long value) throws Exception {
> if 
> (getRuntimeContext().getIndexOfThisSubtask() == 0) {
> Thread.sleep(1);
> }
> return 1L;
> }
> });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements 
> SerializableTimestampAssigner<Long> {
> private long counter = 0;
> @Override
> public long extractTimestamp(Long record, long recordTimeStamp) {
> return counter++;
> }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The 
> other source subtask (subTaskId == 0) emits very low watermarks. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, 
> while the other input channel is still idle, it will take those high 
> watermarks as the combined input watermark for the whole {{WindowOperator}}. 
> When the input channel from the throttled source subtask finally receives 
> its {{ACTIVE}} status and a much lower watermark, it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting into 
> every 1-second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22984) UnsupportedOperationException when using Python UDF to generate watermark

2021-06-16 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364253#comment-17364253
 ] 

Dian Fu commented on FLINK-22984:
-

Thanks [~maver1ck] for reporting this issue. This is a good catch and we should 
support it.

> UnsupportedOperationException when using Python UDF to generate watermark
> -
>
> Key: FLINK-22984
> URL: https://issues.apache.org/jira/browse/FLINK-22984
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.0, 1.13.1
>Reporter: Maciej Bryński
>Priority: Critical
>
> Hi,
> I'm trying to use the output of a Python UDF (parse_data) to set the 
> watermark for the table:
> {code:java}
> CREATE TABLE test (
> data BYTES,
> ts as parse_data(data).ts,
> WATERMARK for ts as ts
> ) WITH (
>'connector' = 'kafka',
>'topic' = 'test',
>'properties.bootstrap.servers' = 'localhost:9092',
>'properties.group.id' = 'flink',
>'scan.startup.mode' = 'earliest-offset',
>'format' = 'raw'
> ){code}
> Then running a SELECT on this table gives me an exception:
> {code:java}
> Py4JJavaError: An error occurred while calling o311.hasNext.
> : java.lang.RuntimeException: Failed to fetch next result
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.io.IOException: Failed to fetch job execution result
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>   ... 13 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
>   ... 15 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>   at 
> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
>   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
>   at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
>   ... 15 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>   at 
> 

[jira] [Commented] (FLINK-23011) FLIP-27 sources are generating non-deterministic results when using event time

2021-06-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364252#comment-17364252
 ] 

Piotr Nowojski commented on FLINK-23011:


[~AHeise] suggested that as a hotfix we could block emitting 
watermarks/switching to {{IDLE}} until all splits are assigned 
({{SourceReader#notifyNoMoreSplits}}). This would be a very special case of the 
more general solution that I was proposing with {{SplitEnumerator}} capping the 
watermarks, which should work well in cases where the number of splits is 
determined/known from the beginning.

> FLIP-27 sources are generating non-deterministic results when using event time
> --
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: 
>Reporter: Piotr Nowojski
>Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting first {{Watermark}}. Until this 
> happens, downstream operators are ignoring {{IDLE}} inputs from calculating 
> the input (min) watermark. 
> An extreme example to what problem this leads to, are completely bogus 
> results if for example one FLIP-27 source subtask is slower than others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
> 10));
> DataStream<Long> eventStream =
> env.fromSource(
> new NumberSequenceSource(0, Long.MAX_VALUE),
> WatermarkStrategy.forMonotonousTimestamps()
> .withTimestampAssigner(new 
> LongTimestampAssigner()),
> "NumberSequenceSource")
> .map(
> new RichMapFunction<Long, Long>() {
> @Override
> public Long map(Long value) throws Exception {
> if 
> (getRuntimeContext().getIndexOfThisSubtask() == 0) {
> Thread.sleep(1);
> }
> return 1L;
> }
> });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements 
> SerializableTimestampAssigner<Long> {
> private long counter = 0;
> @Override
> public long extractTimestamp(Long record, long recordTimeStamp) {
> return counter++;
> }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The 
> other source subtask (subTaskId == 0) emits very low watermarks. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, 
> while the other input channel is still idle, it will take those high 
> watermarks as the combined input watermark for the whole {{WindowOperator}}. 
> When the input channel from the throttled source subtask finally receives 
> its {{ACTIVE}} status and a much lower watermark, it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting into 
> every 1-second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23011) FLIP-27 sources are generating non-deterministic results when using event time

2021-06-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364191#comment-17364191
 ] 

Piotr Nowojski edited comment on FLINK-23011 at 6/16/21, 11:27 AM:
---

Even after fixing FLINK-22926 the problem will persist, as there would still be 
a race condition between registering splits/switching to ACTIVE and emitting 
watermarks between different subtasks.

A hotfix might be to force FLIP-27 sources to start ACTIVE (as the legacy 
sources do). However, this doesn't work if more than one split is assigned to a 
single {{SourceReader}}, and there can be a delay between assigning the first 
and the second split.

The problem is that if a source doesn't know about some splits, because they 
haven't been assigned (or even discovered) yet, it cannot know what watermarks 
would result from those unknown splits. This becomes more visible if you think 
about a source where discovering splits takes a long time, for example a 
{{FileSource}}.

I think the proper solution should be something like the {{SplitEnumerator}} 
emitting its own watermarks, which would be capping/combined with the 
watermarks emitted from the sources.


was (Author: pnowojski):
Even after fixing FLINK-22926 the problem will persist, as there still would be 
a race condition between registering splits/switching to ACTIVE and emitting 
watermarks between different subtasks.

Hotfix might be to force FLIP-27 source to start ACTIVE (as the legacy sources 
are doing). However this doesn't work if there will be more then one split 
assigned to single {{SourceReader}}, and there can be a delay between assigning 
first and second split.

The problem is that if source doesn't know about some splits, because they 
haven't been yet assigned (or even discovered), it can not know what watermarks 
would result from those unknown splits. This gets more visible if you think 
about some source, where discovering splits takes long time, for example some 
{{FileSource}}.

I think the proper solution should be something like {{SplitEnumerator}} 
emitting it's own watermarks, that would be combined with the watermarks 
emitted from the sources.

> FLIP-27 sources are generating non-deterministic results when using event time
> --
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: 
>Reporter: Piotr Nowojski
>Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting first {{Watermark}}. Until this 
> happens, downstream operators are ignoring {{IDLE}} inputs from calculating 
> the input (min) watermark. 
> An extreme example to what problem this leads to, are completely bogus 
> results if for example one FLIP-27 source subtask is slower than others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
> 10));
> DataStream<Long> eventStream =
> env.fromSource(
> new NumberSequenceSource(0, Long.MAX_VALUE),
> WatermarkStrategy.forMonotonousTimestamps()
> .withTimestampAssigner(new 
> LongTimestampAssigner()),
> "NumberSequenceSource")
> .map(
> new RichMapFunction<Long, Long>() {
> @Override
> public Long map(Long value) throws Exception {
> if 
> (getRuntimeContext().getIndexOfThisSubtask() == 0) {
> Thread.sleep(1);
> }
> return 1L;
> }
> });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements 
> SerializableTimestampAssigner<Long> {
> private long counter = 0;
> @Override
> public long extractTimestamp(Long record, long recordTimeStamp) {
> return counter++;
> }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The 
> other source subtask (subTaskId == 0) emits very low watermarks. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, 
> while the other input channel is still idle, it will take those high 
> watermarks as the combined input watermark for the whole 

[jira] [Comment Edited] (FLINK-23011) FLIP-27 sources are generating non-deterministic results when using event time

2021-06-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364191#comment-17364191
 ] 

Piotr Nowojski edited comment on FLINK-23011 at 6/16/21, 11:24 AM:
---

Even after fixing FLINK-22926 the problem will persist, as there would still 
be a race condition between registering splits/switching to ACTIVE and 
emitting watermarks across different subtasks.

A hotfix might be to force FLIP-27 sources to start ACTIVE (as the legacy 
sources do). However, this doesn't work if more than one split is assigned to 
a single {{SourceReader}}, as there can be a delay between assigning the first 
and the second split.

The problem is that if a source doesn't know about some splits, because they 
haven't been assigned (or even discovered) yet, it cannot know what watermarks 
would result from those unknown splits. This gets more visible if you think 
about a source where discovering splits takes a long time, for example a 
{{FileSource}}.

I think the proper solution should be something like the {{SplitEnumerator}} 
emitting its own watermarks, which would be combined with the watermarks 
emitted from the sources.
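
A purely hypothetical sketch of that direction (no such API exists in Flink at 
the time of writing; the interface and method names below are made up for 
illustration only):

{code:java}
// Hypothetical extension, not an existing Flink interface: the enumerator,
// which knows about discovered-but-unassigned splits, publishes an upper
// bound on event time. Downstream, the readers' watermarks would be
// min-combined with this cap, so watermarks can never run ahead of splits
// that have not been assigned (or discovered) yet.
public interface WatermarkCappingEnumerator {

    /**
     * Largest event timestamp that any split this enumerator may still
     * assign could produce. Reader watermarks are capped at this value.
     */
    long currentWatermarkCap();
}
{code}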


was (Author: pnowojski):
Even after fixing FLINK-22926 the problem will persist, as there would still 
be a race condition between registering splits/switching to ACTIVE and 
emitting watermarks across different subtasks.

A hotfix might be to force FLIP-27 sources to start ACTIVE (as the legacy 
sources do). However, it would only be hiding the real problem.

The problem is that if a source doesn't know about some splits, because they 
haven't been assigned (or even discovered) yet, it cannot know what watermarks 
would result from those unknown splits. This gets more visible if you think 
about a source where discovering splits takes a long time, for example a 
{{FileSource}}.

I think the proper solution should be something like the {{SplitEnumerator}} 
emitting its own watermarks, which would be combined with the watermarks 
emitted from the sources.

> FLIP-27 sources are generating non-deterministic results when using event time
> --
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: 
>Reporter: Piotr Nowojski
>Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this 
> happens, downstream operators ignore {{IDLE}} inputs when calculating the 
> input (min) watermark. 
> An extreme example of the problem this leads to are completely bogus results 
> if, for example, one FLIP-27 source subtask is slower than the others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> 
> (...)
> 
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
> 
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks, while 
> the other source subtask (subTaskId == 0) emits very low watermarks. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, while 
> the other input channel is still idle, it will take those high watermarks as 
> the combined input watermark for the whole {{WindowOperator}}. When the input 
> channel from the throttled source subtask finally receives its {{ACTIVE}} 
> status and a much lower watermark, that's already too late.

[jira] [Commented] (FLINK-23011) FLIP-27 sources are generating non-deterministic results when using event time

2021-06-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364245#comment-17364245
 ] 

Piotr Nowojski commented on FLINK-23011:


Let me elaborate on the {{FileSource}} example. Let's say we have a bucketed 
file source where a split equals a single file, with buckets (directories?) 
created per hour. New files/buckets can keep appearing as you go. 

If the bucket for [12:00, 13:00) is committed, the {{SplitEnumerator}} could 
emit a capped watermark for 13:00, and the already assigned splits would keep 
bumping the current watermark up to this cap as the splits are being read. When 
the next bucket [13:00, 14:00) is created, {{SourceReaders}} could even start 
reading files from this bucket before it's fully committed, but the watermark 
cap is bumped only once this next bucket is committed.

It's also important in this case to automatically switch {{SourceReader}}s to 
idle if they don't have assigned splits, because, for whatever reason, some 
buckets can have fewer splits than there are parallel instances of the 
{{SourceReader}}s. In this case you need idleness to make progress.
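
For the idleness part, the existing {{WatermarkStrategy#withIdleness}} already 
expresses this on the reader side; a minimal sketch (the 30-second timeout is 
an arbitrary illustration, not a recommendation):

{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdlenessExample {
    // Readers that see no records for 30s are marked idle, so subtasks that
    // currently hold no splits for a bucket do not block the combined
    // (min) watermark of downstream operators.
    static WatermarkStrategy<Long> strategy() {
        return WatermarkStrategy.<Long>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(30));
    }
}
{code}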

> FLIP-27 sources are generating non-deterministic results when using event time
> --
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.14.0, 1.13.1, 1.12.4
> Environment: 
>Reporter: Piotr Nowojski
>Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this 
> happens, downstream operators ignore {{IDLE}} inputs when calculating the 
> input (min) watermark. 
> An extreme example of the problem this leads to are completely bogus results 
> if, for example, one FLIP-27 source subtask is slower than the others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> 
> (...)
> 
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
> 
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks, while 
> the other source subtask (subTaskId == 0) emits very low watermarks. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, while 
> the other input channel is still idle, it will take those high watermarks as 
> the combined input watermark for the whole {{WindowOperator}}. When the input 
> channel from the throttled source subtask finally receives its {{ACTIVE}} 
> status and a much lower watermark, that's already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting in 
> every 1-second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}
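
To make the failure mode concrete, here is a minimal, standalone sketch of the 
combination rule. It mimics, but is not, Flink's actual watermark valve: the 
min is taken only over channels currently marked ACTIVE, so a channel that is 
still IDLE cannot hold the combined watermark back.

{code:java}
import java.util.Arrays;

public class CombinedWatermarkSketch {
    // watermark per input channel; Long.MIN_VALUE = none received yet
    private final long[] watermarks;
    private final boolean[] active;

    CombinedWatermarkSketch(int channels) {
        watermarks = new long[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        active = new boolean[channels]; // all channels start IDLE, like FLIP-27 sources
    }

    void onWatermark(int channel, long wm) {
        active[channel] = true; // a watermark implicitly switches the channel to ACTIVE
        watermarks[channel] = Math.max(watermarks[channel], wm);
    }

    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (active[i]) { // IDLE channels are excluded from the min
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch valve = new CombinedWatermarkSketch(2);
        valve.onWatermark(1, 2_000); // fast subtask's high watermark arrives first
        System.out.println(valve.combinedWatermark()); // 2000: channel 0 is still IDLE
        valve.onWatermark(0, 2);     // throttled subtask arrives too late
        System.out.println(valve.combinedWatermark()); // 2: windows up to 2000 already fired
    }
}
{code}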



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22974) No execution checkpointing config desc in flink-conf.yaml

2021-06-16 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364246#comment-17364246
 ] 

Yun Tang commented on FLINK-22974:
--

Thanks for the reminder, I'll take a look at the PR.

> No execution checkpointing config desc in flink-conf.yaml 
> --
>
> Key: FLINK-22974
> URL: https://issues.apache.org/jira/browse/FLINK-22974
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.13.1
>Reporter: tinawenqiao
>Assignee: tinawenqiao
>Priority: Minor
>
> We found that there is no parameter description for execution checkpointing 
> in flink-conf.yaml, which may cause misunderstandings. I think we can add 
> some important parameters, such as execution.checkpointing.interval, 
> execution.checkpointing.externalized-checkpoint-retention, 
> execution.checkpointing.tolerable-failed-checkpoints, etc.
>  
> #==
>  # Fault tolerance and checkpointing
>  
> #==
>  # The backend that will be used to store operator state checkpoints if
>  # checkpointing is enabled. 
>  
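
A possible sketch of the proposed description block, in the style of the 
existing flink-conf.yaml sections (the values shown are illustrative examples, 
not recommended defaults):

{noformat}
#==============================================================================
# Execution checkpointing
#==============================================================================

# Interval in which checkpoints are periodically triggered.
#
# execution.checkpointing.interval: 3min

# Whether externalized checkpoints are retained when the job is cancelled.
#
# execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

# Number of tolerable checkpoint failures before the job fails.
#
# execution.checkpointing.tolerable-failed-checkpoints: 0
{noformat}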



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22974) No execution checkpointing config desc in flink-conf.yaml

2021-06-16 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-22974:


Assignee: tinawenqiao

> No execution checkpointing config desc in flink-conf.yaml 
> --
>
> Key: FLINK-22974
> URL: https://issues.apache.org/jira/browse/FLINK-22974
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.13.1
>Reporter: tinawenqiao
>Assignee: tinawenqiao
>Priority: Minor
>
> We found that there is no parameter description for execution checkpointing 
> in flink-conf.yaml, which may cause misunderstandings. I think we can add 
> some important parameters, such as execution.checkpointing.interval, 
> execution.checkpointing.externalized-checkpoint-retention, 
> execution.checkpointing.tolerable-failed-checkpoints, etc.
>  
> #==
>  # Fault tolerance and checkpointing
>  
> #==
>  # The backend that will be used to store operator state checkpoints if
>  # checkpointing is enabled. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"

2021-06-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364241#comment-17364241
 ] 

Xintong Song commented on FLINK-19863:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19020=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529=29093

> SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process 
> failed due to timeout"
> ---
>
> Key: FLINK-19863
> URL: https://issues.apache.org/jira/browse/FLINK-19863
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.12.0, 1.12.3
>Reporter: Dian Fu
>Priority: Critical
>  Labels: auto-unassigned, pull-request-available, stale-critical, 
> test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6
> {code}
> 00:50:02,589 [main] INFO  
> org.apache.flink.tests.util.flink.FlinkDistribution  [] - Stopping 
> Flink cluster.
> 00:50:04,106 [main] INFO  
> org.apache.flink.tests.util.flink.FlinkDistribution  [] - Stopping 
> Flink cluster.
> 00:50:04,741 [main] INFO  
> org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up 
> logs to 
> /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9.
> 00:50:04,788 [main] INFO  
> org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping 
> HBase Cluster
> 00:50:16,243 [main] ERROR 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase   [] - 
> 
> Test testHBase[0: 
> hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) 
> failed with:
> java.io.IOException: Process failed due to timeout.
>   at 
> org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130)
>   at 
> org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108)
>   at 
> org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221)
>   at 
> org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196)
>   at 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215)
>   at 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12398) Support partitioned view in catalog API

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12398:
---
  Labels: auto-deprioritized-major  (was: stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, 
please raise the priority and ask a committer to assign you the issue or 
revive the public discussion.


> Support partitioned view in catalog API
> ---
>
> Key: FLINK-12398
> URL: https://issues.apache.org/jira/browse/FLINK-12398
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Bowen Li
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Partitioned views are not a rare thing in common databases:
> SQL Server: 
> https://docs.microsoft.com/en-us/sql/t-sql/statements/create-view-transact-sql?view=sql-server-2017#partitioned-views
> Oracle: 
> https://docs.oracle.com/cd/A57673_01/DOC/server/doc/A48506/partview.htm
> Hive: https://cwiki.apache.org/confluence/display/Hive/PartitionedViews
> The work may include moving {{isPartitioned()}} and {{getPartitionKeys()}} 
> from {{CatalogTable}} to {{CatalogBaseTable}}, among other changes.
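
A rough sketch of that move (these are not the actual Flink interfaces; 
existing members are elided):

{code:java}
import java.util.List;

// Sketch only: hoisting the partition accessors to the common base type
// would let views participate in partitioning the same way tables do.
interface CatalogBaseTable {
    boolean isPartitioned();
    List<String> getPartitionKeys();
    // ... existing schema/comment/options accessors elided
}

interface CatalogTable extends CatalogBaseTable {
    // no longer declares isPartitioned()/getPartitionKeys() itself
}

interface CatalogView extends CatalogBaseTable {
    // inherits partitioning support
}
{code}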



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12467) DataStreamAllroundTestProgram should define pause between checkpoints

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12467:
---
  Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, 
please raise the priority and ask a committer to assign you the issue or 
revive the public discussion.


> DataStreamAllroundTestProgram should define pause between checkpoints
> -
>
> Key: FLINK-12467
> URL: https://issues.apache.org/jira/browse/FLINK-12467
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned
>
> Since the {{DataStreamAllroundTestProgram}} does not define a minimum pause 
> between checkpoints, it can happen that the {{CheckpointCoordinator}} attempts 
> to start a checkpoint while another one is still on-going.
> This results in a failed checkpoint if this checkpoint exceeds the maximum 
> number of concurrent checkpoints, which in our e2e tests results in a test 
> failure since the logs contain exceptions.
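
A minimal sketch of the requested configuration, using the standard 
{{CheckpointConfig}} API (the intervals are illustrative values):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointPauseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1_000); // trigger a checkpoint every second
        // Guarantee breathing room so a new checkpoint is never triggered
        // while the previous one is still on-going.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}
{code}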



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12441) Add column stats for decimal type

2021-06-16 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-12441:
---
  Labels: auto-deprioritized-major  (was: stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, 
please raise the priority and ask a committer to assign you the issue or 
revive the public discussion.


> Add column stats for decimal type
> -
>
> Key: FLINK-12441
> URL: https://issues.apache.org/jira/browse/FLINK-12441
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> As a followup for FLINK-12365, add column stats type for decimal (e.g. 
> CatalogColumnStatisticsDataDecimal).
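
A hedged sketch of what such a class might look like, modeled on the existing 
numeric stats holders (the field names are assumptions, not the final design):

{code:java}
import java.math.BigDecimal;

// Sketch only: column statistics for DECIMAL columns, mirroring the shape
// of the other CatalogColumnStatisticsData* classes.
public class CatalogColumnStatisticsDataDecimal {
    private final BigDecimal min;  // minimum value in the column
    private final BigDecimal max;  // maximum value in the column
    private final Long ndv;        // number of distinct values
    private final Long nullCount;  // number of NULLs

    public CatalogColumnStatisticsDataDecimal(
            BigDecimal min, BigDecimal max, Long ndv, Long nullCount) {
        this.min = min;
        this.max = max;
        this.ndv = ndv;
        this.nullCount = nullCount;
    }

    public BigDecimal getMin() { return min; }
    public BigDecimal getMax() { return max; }
    public Long getNdv() { return ndv; }
    public Long getNullCount() { return nullCount; }
}
{code}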



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22886) Thread leak in RocksDBStateUploader

2021-06-16 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364229#comment-17364229
 ] 

Yue Ma commented on FLINK-22886:


[~roman_khachatryan] thanks, I will solve it soon

> Thread leak in RocksDBStateUploader
> ---
>
> Key: FLINK-22886
> URL: https://issues.apache.org/jira/browse/FLINK-22886
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.3, 1.13.1, 1.12.4
>Reporter: Jiayi Liao
>Assignee: Yue Ma
>Priority: Major
>  Labels: critical, pull-request-available
> Attachments: image-2021-06-06-13-46-34-604.png
>
>
> {{ExecutorService}} in {{RocksDBStateUploader}} is not shut down, which may 
> leak threads when tasks fail.
> BTW, we should name the thread group in the {{ExecutorService}}; otherwise 
> what we see in stack dumps is a lot of threads named pool-m-thread-n, like 
> this:
>  
> !image-2021-06-06-13-46-34-604.png!
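
A minimal sketch of both fixes (named threads plus an explicit shutdown); the 
class and thread names are illustrative, not the actual Flink code:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedUploaderPool implements AutoCloseable {
    private final ExecutorService executor;

    public NamedUploaderPool(int threads) {
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                // Shows up in thread dumps as a recognizable name
                // instead of pool-m-thread-n.
                return new Thread(r, "rocksdb-state-uploader-" + counter.incrementAndGet());
            }
        };
        executor = Executors.newFixedThreadPool(threads, factory);
    }

    @Override
    public void close() {
        // Shutting the pool down on disposal prevents the thread leak
        // when tasks fail.
        executor.shutdownNow();
    }
}
{code}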



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

