[jira] [Created] (FLINK-22994) Improve the performance of invoking nesting udf

2021-06-15 Thread lynn1.zhang (Jira)
lynn1.zhang created FLINK-22994:
---

 Summary: Improve the performance of invoking nesting udf
 Key: FLINK-22994
 URL: https://issues.apache.org/jira/browse/FLINK-22994
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.12.4
 Environment: h5.  
Reporter: lynn1.zhang
 Attachments: image-2021-06-15-15-18-12-619.png, 
image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, 
image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, 
image-2021-06-15-15-30-14-775.png

h1. Background

Flink keeps UDF results in binary form, for example as BinaryStringData. When 
nested UDFs are invoked, as in select useless(int_ip_2_string(ip)), the result 
of int_ip_2_string(ip) is serialized and then deserialized again.

The generated code currently looks like this:

!image-2021-06-15-15-18-12-619.png!

This issue proposes to improve it as shown below:

!image-2021-06-15-15-19-01-103.png!
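
As a rough illustration of the two code shapes, here is a minimal plain-Java sketch. It is not Flink's generated code: the class name, the helper names toBinary/fromBinary, and the bodies of both UDFs are assumptions made up for this example, and a UTF-8 byte array merely stands in for a binary format such as BinaryStringData.

```java
import java.nio.charset.StandardCharsets;

public class NestedUdfSketch {

    // Hypothetical inner UDF: formats a 32-bit IPv4 address as dotted text.
    static String intIp2String(int ip) {
        return ((ip >>> 24) & 0xFF) + "." + ((ip >>> 16) & 0xFF) + "."
                + ((ip >>> 8) & 0xFF) + "." + (ip & 0xFF);
    }

    // Hypothetical outer UDF: its real behavior is not given in the issue.
    static int useless(String s) {
        return s.length();
    }

    // Stand-ins for converting a Java object to and from a binary form.
    static byte[] toBinary(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String fromBinary(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Current shape: the inner result is serialized, then deserialized
    // again before the outer UDF sees it.
    static int nestedWithRoundTrip(int ip) {
        byte[] binary = toBinary(intIp2String(ip));
        return useless(fromBinary(binary));
    }

    // Proposed shape: the inner result is handed to the outer UDF directly.
    static int nestedDirect(int ip) {
        return useless(intIp2String(ip));
    }

    public static void main(String[] args) {
        int ip = 0xC0A80001; // 192.168.0.1
        System.out.println(nestedWithRoundTrip(ip));
        System.out.println(nestedDirect(ip));
    }
}
```

Both paths return the same value; the second one simply skips the encode/decode pair on every record.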
h1. Performance Comparison

Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size = 4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
CPU @ 2.30GHz
UDF Introduction:
 * ipip: input: int ip; output: map ip_info (map size = 14).
 * ip_2_country: input: map ip_info; output: string country.
 * ip_2_region: input: map ip_info; output: string region.
 * ip_2_isp_domain: input: map ip_info; output: string isp.
 * ip_2_timezone: input: map ip_info; output: string timezone.

h5. Throughput without UDF invocation: 764.50 w/s

!image-2021-06-15-15-27-26-739.png!
h5. Throughput with UDF invocation: 183.24 k/s

!image-2021-06-15-15-28-28-137.png!
h5. Throughput with nested UDF invocation: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. Throughput with nested UDF invocation after this change: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!
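
To put the three k/s figures in proportion, the short Java sketch below computes their ratios. Only the three numbers come from the measurements above; the class and method names are made up for this example. The 764.50 w/s figure is left out because it is reported in a different unit.

```java
import java.util.Locale;

public class ThroughputRatios {

    // Ratio of two throughput figures given in the same unit.
    static double ratio(double numerator, double denominator) {
        return numerator / denominator;
    }

    public static void main(String[] args) {
        double plainUdf = 183.24;    // k/s, UDF invocation
        double nested = 41.42;       // k/s, nested UDF invocation
        double nestedAfter = 174.41; // k/s, nested UDF invocation after the change

        // How much slower the nested call is than the plain call today.
        System.out.printf(Locale.ROOT, "slowdown from nesting: %.2fx%n",
                ratio(plainUdf, nested));
        // How much faster the nested call becomes with the change.
        System.out.printf(Locale.ROOT, "speedup from the change: %.2fx%n",
                ratio(nestedAfter, nested));
        // Share of the plain UDF throughput that is recovered.
        System.out.printf(Locale.ROOT, "recovered share: %.1f%%%n",
                100.0 * ratio(nestedAfter, plainUdf));
    }
}
```

In other words, nesting currently costs roughly a factor of 4.4, and the change brings the nested case back to about 95% of the plain UDF throughput.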

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21833) TemporalRowTimeJoinOperator State Leak Although configure idle.state.retention.time

2021-03-16 Thread lynn1.zhang (Jira)
lynn1.zhang created FLINK-21833:
---

 Summary: TemporalRowTimeJoinOperator State Leak Although configure 
idle.state.retention.time
 Key: FLINK-21833
 URL: https://issues.apache.org/jira/browse/FLINK-21833
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.12.2
Reporter: lynn1.zhang
 Attachments: image-2021-03-17-11-06-21-768.png

Using TemporalRowTimeJoinOperator causes the state to grow without bound, even 
when idle.state.retention.time is configured.

I have found the bug and fixed it.

!image-2021-03-17-11-06-21-768.png!





[jira] [Created] (FLINK-21345) NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157

2021-02-09 Thread lynn1.zhang (Jira)
lynn1.zhang created FLINK-21345:
---

 Summary: NullPointerException 
LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
 Key: FLINK-21345
 URL: https://issues.apache.org/jira/browse/FLINK-21345
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.1
 Environment: Planner: BlinkPlanner
Flink Version: 1.12.1_2.11
Java Version: 1.8
OS: mac os
Reporter: lynn1.zhang


First Step: Create 2 Source Tables as below:
{code:sql}
CREATE TABLE test_streaming(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
CREATE TABLE test_streaming2(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming2',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
{code}
Second Step: Create a TEMPORARY table function with function name dim, key 
vid, and timestamp proctime().

Third Step: Join the UNION ALL of test_streaming and test_streaming2 with dim, 
as below:

 
{code:sql}
SELECT r.vid,d.name,timestamp_from_long(r.ts)
FROM (
SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
) AS r,
LATERAL TABLE (dim(r.proc)) AS d
WHERE r.vid = d.vid;
{code}
Exception detail (without UNION ALL, the program runs fine):

 
{code:java}
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 

[jira] [Created] (FLINK-13072) RocksDBStateBachend is not thread safe and data loss silently

2019-07-02 Thread lynn1.zhang (JIRA)
lynn1.zhang created FLINK-13072:
---

 Summary: RocksDBStateBachend is not thread safe and data loss 
silently
 Key: FLINK-13072
 URL: https://issues.apache.org/jira/browse/FLINK-13072
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.8.1, 1.8.0
Reporter: lynn1.zhang
 Attachments: flink-demo.zip

I create two MapStates in one operator, then start two threads in the apply 
method, with each thread operating on one of the map states (both perform the 
same operations). The expected result is that the two states end up with the 
same contents, but they do not. I have uploaded the code; please help to try 
it.
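
For comparison, here is a minimal plain-Java sketch of a pattern that avoids touching state from several threads. It assumes that state handles are meant to be used only from the operator's own thread. It is not the code in flink-demo.zip and does not use Flink APIs: two ordinary HashMaps stand in for the two map states, and the class and method names are made up. Worker threads only compute values, and every write happens on the calling thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadStateSketch {

    // Computes one value per record on worker threads, then applies all
    // updates to both stand-in states on the calling thread only.
    static void applyOnCallerThread(List<String> records,
                                    Map<String, Integer> stateA,
                                    Map<String, Integer> stateB) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (String record : records) {
                // Placeholder computation; workers never touch the states.
                Callable<Integer> task = () -> record.length();
                pending.add(pool.submit(task));
            }
            for (int i = 0; i < records.size(); i++) {
                Integer value = pending.get(i).get(); // wait for the worker
                stateA.put(records.get(i), value);    // caller-thread write
                stateB.put(records.get(i), value);    // caller-thread write
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> stateA = new HashMap<>();
        Map<String, Integer> stateB = new HashMap<>();
        List<String> records = new ArrayList<>();
        records.add("a");
        records.add("bb");
        records.add("ccc");
        applyOnCallerThread(records, stateA, stateB);
        System.out.println(stateA.equals(stateB));
    }
}
```

Whether this restructuring suits the attached demo depends on what the two threads actually do there; the sketch only shows the shape of the pattern.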


