[jira] [Created] (FLINK-35827) Equality between a row field and a row constant is wrong in SQL

2024-07-12 Thread yisha zhou (Jira)
yisha zhou created FLINK-35827:
--

 Summary: Equality between a row field and a row constant is wrong 
in SQL
 Key: FLINK-35827
 URL: https://issues.apache.org/jira/browse/FLINK-35827
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: yisha zhou


To reproduce the issue, you can add the code below to RowTypeTest:
{code:java}
testAllApis(
  'f2 === row(2, "foo", true),
  "f2 = row(2, 'foo', true)",
  "true"
) {code}
f2 is actually the same as the constant `row(2, "foo", true)`; however, the 
result of the expression `f2 = row(2, 'foo', true)` is false.

The root cause is that `ScalarOperatorGens.generateEquals` generates code like 
`$leftTerm.equals($rightTerm)` for row types. However, f2 may be a 
GenericRowData while the constant is a BinaryRowData, and the equality between 
them is false.
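
A minimal sketch (not from the ticket) of the underlying mismatch; it builds 
the same logical row as a GenericRowData and as a BinaryRowData and shows that 
Object#equals across the two representations returns false:
{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;

public class RowEqualitySketch {
    public static void main(String[] args) {
        // the same logical row (2, "foo", true) in two physical representations
        GenericRowData generic =
                GenericRowData.of(2, StringData.fromString("foo"), true);

        BinaryRowData binary = new BinaryRowData(3);
        BinaryRowWriter writer = new BinaryRowWriter(binary);
        writer.writeInt(0, 2);
        writer.writeString(1, StringData.fromString("foo"));
        writer.writeBoolean(2, true);
        writer.complete();

        // prints false: each implementation only equals its own representation
        System.out.println(generic.equals(binary));
    }
}
{code}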
 
After investigating the code, I believe the logic in 
`EqualiserCodeGenerator.generateEqualsCode` can handle this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document

2024-05-14 Thread Yang Zhou (Jira)
Yang Zhou created FLINK-35357:
-

 Summary: Add "kubernetes.operator.plugins.listeners" parameter 
description to the Operator configuration document
 Key: FLINK-35357
 URL: https://issues.apache.org/jira/browse/FLINK-35357
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Yang Zhou


While using "Custom Flink Resource Listeners" in practice (doc: 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource-listeners),
I found that the "Operator Configuration Reference" document does not explain 
the "Custom Flink Resource Listeners" configuration parameters.

So I propose adding a description for 
kubernetes.operator.plugins.listeners.<name>.class, since it is useful.
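
For example, an entry like the following could be documented (the listener 
name and class here are hypothetical):
{code}
kubernetes.operator.plugins.listeners.my-listener.class: com.example.MyFlinkResourceListener
{code}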



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35066) TwoInputOperator in IterationBody cannot use keyBy

2024-04-09 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-35066:


 Summary: TwoInputOperator in IterationBody cannot use keyBy
 Key: FLINK-35066
 URL: https://issues.apache.org/jira/browse/FLINK-35066
 Project: Flink
  Issue Type: Technical Debt
  Components: Library / Machine Learning
Affects Versions: ml-2.3.0
Reporter: Yunfeng Zhou


Implementing a UDF KeyedRichCoProcessFunction or CoFlatMapFunction inside 
IterationBody yields a “java.lang.ClassCastException: 
org.apache.flink.iteration.IterationRecord cannot be cast to class 
org.apache.flink.api.java.tuple.Tuple” error.
More details about this bug can be found at 
https://lists.apache.org/thread/bgkw1g2tdgnp1xy1clsqtcfs3h18pkd6



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER

2024-04-07 Thread yisha zhou (Jira)
yisha zhou created FLINK-35037:
--

 Summary: Optimize uniqueKeys and upsertKeys inference of windows 
with ROW_NUMBER
 Key: FLINK-35037
 URL: https://issues.apache.org/jira/browse/FLINK-35037
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: yisha zhou


In the current implementation, RelNodes of Window type only forward the 
upsert/unique keys of their inputs if those keys contain the partition keys.

However, windows with ROW_NUMBER can also produce upsert/unique keys of their own.

For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student {code}
Here (class, rn) is a valid uniqueKeys candidate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34976) LD_PRELOAD environment variable may not be effective after su to the flink user

2024-03-31 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-34976:
-

 Summary: LD_PRELOAD environment variable may not be effective after 
su to the flink user
 Key: FLINK-34976
 URL: https://issues.apache.org/jira/browse/FLINK-34976
 Project: Flink
  Issue Type: New Feature
  Components: flink-docker
Affects Versions: 1.19.0
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34656) Generated code for `ITEM` operator should return null when getting element of a null map/array/row

2024-03-12 Thread yisha zhou (Jira)
yisha zhou created FLINK-34656:
--

 Summary: Generated code for `ITEM` operator should return null 
when getting element of a null map/array/row
 Key: FLINK-34656
 URL: https://issues.apache.org/jira/browse/FLINK-34656
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: yisha zhou


In FieldAccessFromTableITCase we can find that the expected result of f0[1] is 
null when f0 is a null array. 

However, the behavior of the generated code for ITEM is not consistent with the 
case above. The main code is:

 
{code:java}
val arrayAccessCode =
  s"""
 |${array.code}
 |${index.code}
 |boolean $nullTerm = ${array.nullTerm} || ${index.nullTerm} ||
 |   $idxStr < 0 || $idxStr >= ${array.resultTerm}.size() || $arrayIsNull;
 |$resultTypeTerm $resultTerm = $nullTerm ? $defaultTerm : $arrayGet;
 |""".stripMargin {code}
If `array.nullTerm` is true, a default value of the element type will be 
returned, for example -1 for a null BIGINT array.

The reason why FieldAccessFromTableITCase gets the expected result is that 
ReduceExpressionsRule generates expression code for that case like:
{code:java}
boolean isNull$0 = true || false ||
   ((int) 1) - 1 < 0 || ((int) 1) - 1 >= 
((org.apache.flink.table.data.ArrayData) null).size() || 
((org.apache.flink.table.data.ArrayData) null).isNullAt(((int) 1) - 1);
long result$0 = isNull$0 ? -1L : ((org.apache.flink.table.data.ArrayData) 
null).getLong(((int) 1) - 1);
if (isNull$0) {
  out.setField(0, null);
} else {
  out.setField(0, result$0);
} {code}
The reduced expression will be a null literal.
 

I think the behavior of getting an element from a null value should be unified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34536) Support reading long value as Timestamp column in JSON format

2024-02-28 Thread yisha zhou (Jira)
yisha zhou created FLINK-34536:
--

 Summary: Support reading long value as Timestamp column in JSON 
format
 Key: FLINK-34536
 URL: https://issues.apache.org/jira/browse/FLINK-34536
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.19.0
Reporter: yisha zhou


In many scenarios, timestamp data is stored as a long value but expected to be 
operated on as a Timestamp value. It's not user-friendly to require a UDF to 
convert the data before operating on it.

Meanwhile, the Avro format seems able to accept several input types and convert 
them into TimestampData. I hope the same ability can be introduced into the 
JSON format.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-27 Thread yisha zhou (Jira)
yisha zhou created FLINK-34529:
--

 Summary: Projection cannot be pushed down through rank operator.
 Key: FLINK-34529
 URL: https://issues.apache.org/jira/browse/FLINK-34529
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: yisha zhou


When there is a rank/deduplicate operator, a projection on the output of this 
operator cannot be pushed down to its input.

The following code reproduces the issue:
{code:java}
val util = streamTestUtil() 
util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
val sql =
  """
|SELECT a FROM (
|  SELECT a, f,
|  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
|  FROM  T1, T2
|  WHERE T1.a = T2.d
|)
|WHERE rank_num = 1
  """.stripMargin

util.verifyPlan(sql){code}
The plan is expected to be:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
  +- Calc(select=[a, c, f])
 +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
:  +- Calc(select=[a, c])
: +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
   +- Calc(select=[d, f])
  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
{code}
Notice that the 'select' of the Join operator is [a, c, d, f]. However, the 
actual plan is:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
  +- Calc(select=[a, c, f])
 +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, 
f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
:  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
   +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 {code}
Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the 
projection in the final Calc is not pushed through the Rank.

I think an easy way to fix this issue is to add 
org.apache.calcite.rel.rules.ProjectWindowTransposeRule to 
FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34519) Refine checkpoint scheduling and canceling logic

2024-02-26 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-34519:


 Summary: Refine checkpoint scheduling and canceling logic
 Key: FLINK-34519
 URL: https://issues.apache.org/jira/browse/FLINK-34519
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Checkpointing
Affects Versions: 1.20.0
Reporter: Yunfeng Zhou


In the current implementation, CheckpointCoordinator#startCheckpointScheduler 
would stop the checkpoint scheduler before starting it, and 
CheckpointCoordinator#stopCheckpointScheduler would cancel all ongoing and 
pending checkpoints. When a stop-with-savepoint request is received, checkpoint 
coordinator would trigger stopCheckpointScheduler before creating the 
savepoint, and start the scheduler afterwards if the savepoint fails.

The problem with this behavior is that it mixes up different checkpointing 
types. For example, stopCheckpointScheduler() only needs to cancel previously 
scheduled periodic checkpoints, while the current behavior cancels ongoing 
savepoints as well. This is still acceptable for now, given that periodic 
checkpointing is enabled as long as a job is running, and two users would 
hardly trigger savepoints at the same time. However, as the Batch-Streaming 
Unification optimizations need to change some of these assumptions, the 
checkpoint coordinator should fix this problem.

To be exact, checkpoint coordinator should at least distinguish between the 
following semantics.
 - Periodic checkpoint is enabled to ensure that failover recovery time should 
be kept within a time limit.
 - Periodic checkpoint is disabled to reduce corresponding performance 
overhead, but the ability to checkpoint still exists and users can trigger a 
savepoint anytime.
 - Checkpoint or savepoint is not allowed due to job status or topological 
requirements.

It should also be possible for a Flink job to switch between the checkpointing 
semantics mentioned above dynamically at runtime.

Besides, checkpoints canceled in stopCheckpointScheduler() would fail with an 
error message saying "Checkpoint Coordinator is suspending", which is ambiguous 
for debugging. The detailed reason should be recorded as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34406) Expose more RuntimeContext functionalities in FunctionContext

2024-02-07 Thread yisha zhou (Jira)
yisha zhou created FLINK-34406:
--

 Summary: Expose more RuntimeContext functionalities in 
FunctionContext
 Key: FLINK-34406
 URL: https://issues.apache.org/jira/browse/FLINK-34406
 Project: Flink
  Issue Type: New Feature
Reporter: yisha zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34402) Class loading conflicts when using PowerMock in ITCases

2024-02-06 Thread yisha zhou (Jira)
yisha zhou created FLINK-34402:
--

 Summary: Class loading conflicts when using PowerMock in ITCases
 Key: FLINK-34402
 URL: https://issues.apache.org/jira/browse/FLINK-34402
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.19.0
Reporter: yisha zhou
 Fix For: 1.19.0


Currently, when no user jars exist, the system class loader is used to load 
classes by default. However, if we use PowerMock to create some ITCases, the 
framework utilizes JavassistMockClassLoader to load classes. Forcing the use 
of the system class loader can then lead to class loading conflicts.

Therefore we should use Thread.currentThread().getContextClassLoader() instead 
of ClassLoader.getSystemClassLoader() here.
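
A minimal sketch of the proposed lookup (the null fallback is my assumption, 
not from the ticket):
{code:java}
public final class ClassLoaderSketch {
    /** Prefer the context class loader; fall back to the system one. */
    static ClassLoader resolveClassLoader() {
        // PowerMock installs JavassistMockClassLoader as the context class
        // loader, so this keeps ITCases working when no user jars exist.
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return cl != null ? cl : ClassLoader.getSystemClassLoader();
    }
}
{code}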



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34371) FLIP-331: Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment

2024-02-05 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-34371:


 Summary: FLIP-331: Support EndOfStreamTrigger and 
isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment
 Key: FLINK-34371
 URL: https://issues.apache.org/jira/browse/FLINK-34371
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Task
Reporter: Yunfeng Zhou


This is an umbrella ticket for FLIP-331.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-34077:


 Summary: Sphinx version needs upgrade
 Key: FLINK-34077
 URL: https://issues.apache.org/jira/browse/FLINK-34077
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.19.0
Reporter: Yunfeng Zhou


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901]
 
{code:java}
Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees -a -W . _build/html
Jan 14 15:49:17 Running Sphinx v4.5.0
Jan 14 15:49:17
Jan 14 15:49:17 Sphinx version error:
Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project needs at least Sphinx v5.0; it therefore cannot be built with this version.
Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
Jan 14 15:49:17 make: *** [html] Error 2
Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34015) execution.savepoint.ignore-unclaimed-state is invalid when passing this parameter by dynamic properties

2024-01-07 Thread Renxiang Zhou (Jira)
Renxiang Zhou created FLINK-34015:
-

 Summary: execution.savepoint.ignore-unclaimed-state is invalid 
when passing this parameter by dynamic properties
 Key: FLINK-34015
 URL: https://issues.apache.org/jira/browse/FLINK-34015
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.17.0
Reporter: Renxiang Zhou
 Attachments: image-2024-01-08-14-22-09-758.png, 
image-2024-01-08-14-24-30-665.png, image-2024-01-08-14-29-04-347.png

We set `execution.savepoint.ignore-unclaimed-state` to true and used the -D 
option to submit the job, but unfortunately we found the value is still false 
in the jobmanager log.
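
For reference, the submission looked roughly like this (a sketch; the jar and 
savepoint path are hypothetical):
{code}
bin/flink run \
  -Dexecution.savepoint.ignore-unclaimed-state=true \
  -s hdfs:///savepoints/savepoint-xxxx \
  ./my-job.jar
{code}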

Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when 
submitting the job.
!image-2024-01-08-14-22-09-758.png|width=1012,height=222!

Pic 2: The value is still false in the JobManager log.

!image-2024-01-08-14-24-30-665.png|width=651,height=51!

 

Besides, the parameter `execution.savepoint-restore-mode` has the same problem 
when we pass it via the -D option.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33741) introduce stat dump period and statsLevel configuration

2023-12-04 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33741:
-

 Summary: introduce stat dump period and statsLevel configuration
 Key: FLINK-33741
 URL: https://issues.apache.org/jira/browse/FLINK-33741
 Project: Flink
  Issue Type: New Feature
Reporter: xiaogang zhou


I'd like to introduce two RocksDB statistics-related configuration options.

Then we can customize statistics, for example:
{code:java}
Statistics s = new Statistics();
s.setStatsLevel(StatsLevel.EXCEPT_TIME_FOR_MUTEX);
currentOptions
    .setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
    .setStatistics(s); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fails

2023-12-03 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33728:
-

 Summary: do not rewatch when KubernetesResourceManagerDriver watch 
fails
 Key: FLINK-33728
 URL: https://issues.apache.org/jira/browse/FLINK-33728
 Project: Flink
  Issue Type: New Feature
  Components: Deployment / Kubernetes
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-11 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33249:
-

 Summary: comment should be parsed by StringLiteral() instead of 
SqlCharStringLiteral to avoid parsing failure
 Key: FLINK-33249
 URL: https://issues.apache.org/jira/browse/FLINK-33249
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33174) enabling tablesample bernoulli in flink

2023-10-01 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33174:
-

 Summary: enabling tablesample bernoulli in flink
 Key: FLINK-33174
 URL: https://issues.apache.org/jira/browse/FLINK-33174
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: xiaogang zhou


I'd like to introduce a table sample function to enable fast sampling on 
streams.

This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33162) separate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint

2023-09-26 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33162:
-

 Summary: separate the executor in 
DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and 
webMonitorEndpoint
 Key: FLINK-33162
 URL: https://issues.apache.org/jira/browse/FLINK-33162
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.16.0
Reporter: xiaogang zhou
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate

2023-09-05 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33038:
-

 Summary: remove getMinRetentionTime in StreamExecDeduplicate
 Key: FLINK-33038
 URL: https://issues.apache.org/jira/browse/FLINK-33038
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: xiaogang zhou
 Fix For: 1.19.0


I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, as 
the TTL is controlled by the state metadata.

Please let me take this issue if possible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32998) IF function result is not correct

2023-08-30 Thread zhou (Jira)
zhou created FLINK-32998:


 Summary: IF function result is not correct
 Key: FLINK-32998
 URL: https://issues.apache.org/jira/browse/FLINK-32998
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.15.4
Reporter: zhou
 Attachments: image-2023-08-30-18-29-16-277.png, 
image-2023-08-30-18-30-05-568.png

!image-2023-08-30-18-29-16-277.png!

!image-2023-08-30-18-30-05-568.png!

The IF function result is not correct: it does not return the original field 
value, but a truncated field (word) value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32881) Client supports making savepoints in detached mode

2023-08-16 Thread Renxiang Zhou (Jira)
Renxiang Zhou created FLINK-32881:
-

 Summary: Client supports making savepoints in detached mode
 Key: FLINK-32881
 URL: https://issues.apache.org/jira/browse/FLINK-32881
 Project: Flink
  Issue Type: Improvement
  Components: API / State Processor, Client / Job Submission
Affects Versions: 1.19.0
Reporter: Renxiang Zhou
 Fix For: 1.19.0
 Attachments: image-2023-08-16-17-14-34-740.png, 
image-2023-08-16-17-14-44-212.png

When triggering a savepoint using the command-line tool, the client needs to 
wait for the job to finish creating the savepoint before it can exit. For jobs 
with large state, the savepoint creation process can be time-consuming, leading 
to the following problems:
 # Platform users may need to manage thousands of Flink tasks on a single 
client machine. With the current savepoint triggering mode, all savepoint 
creation threads on that machine have to wait for the job to finish the 
snapshot, resulting in significant resource waste;
 # If the savepoint production time exceeds the client's timeout duration, the 
client will throw a timeout exception and report that triggering the savepoint 
failed. Since different jobs have varying savepoint durations, it is difficult 
to adjust the client's timeout parameter.

Therefore, we propose adding a detached mode for triggering savepoints on the 
client side, similar to the detached mode behavior when submitting jobs. Here 
are some specific details:
 # The savepoint UUID will be generated on the client side. After successfully 
triggering the savepoint, the client immediately returns the UUID information.
 # Add a "dump-pending-savepoints" API interface that allows the client to 
check whether the triggered savepoint has been successfully created.

By implementing these changes, the client can detach from the savepoint 
creation process, reducing resource waste, and providing a way to check the 
status of savepoint creation.

!image-2023-08-16-17-14-34-740.png!!image-2023-08-16-17-14-44-212.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32765) create view should reuse calcite tree

2023-08-06 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32765:
-

 Summary: create view should reuse calcite tree
 Key: FLINK-32765
 URL: https://issues.apache.org/jira/browse/FLINK-32765
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32738) PROTOBUF format supports projection push down

2023-08-02 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32738:
-

 Summary: PROTOBUF format supports projection push down
 Key: FLINK-32738
 URL: https://issues.apache.org/jira/browse/FLINK-32738
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.18.0
Reporter: xiaogang zhou
 Fix For: 1.18.0


support projection push down for protobuf



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32732) auto.offset.reset should be exposed to the user

2023-08-02 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32732:
-

 Summary: auto.offset.reset should be exposed to the user
 Key: FLINK-32732
 URL: https://issues.apache.org/jira/browse/FLINK-32732
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32514) FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-03 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-32514:


 Summary: FLIP-309: Support using larger checkpointing interval 
when source is processing backlog
 Key: FLINK-32514
 URL: https://issues.apache.org/jira/browse/FLINK-32514
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing
Reporter: Yunfeng Zhou


Umbrella issue for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32494) Cannot convert list literal to Table with PyFlink

2023-06-29 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-32494:


 Summary: Cannot convert list literal to Table with PyFlink
 Key: FLINK-32494
 URL: https://issues.apache.org/jira/browse/FLINK-32494
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.16.1
Reporter: Yunfeng Zhou


During my attempt to convert a list or array to a PyFlink Table using the 
following program

{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.table import (
expressions as native_flink_expr,
StreamTableEnvironment,
)
from pyflink.table.types import DataTypes

if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_elements([(1, ), (2, ), (3, )])
    # table = table.add_or_replace_columns(
    #     native_flink_expr.lit([], DataTypes.ARRAY(DataTypes.INT()).not_null())
    # )
    table = table.add_or_replace_columns(
        native_flink_expr.lit(
            get_gateway().new_array(get_gateway().jvm.java.lang.Integer, 0)
        )
    )
    table.execute().print()
{code}

The following exception would be thrown
{code}
ClassCastException: [Ljava.lang.Integer; cannot be cast to java.util.List
{code}

If I use the following code to create the literal expression along with the 
program above
{code:python}
table = table.add_or_replace_columns(
    native_flink_expr.lit([], DataTypes.ARRAY(DataTypes.INT()).not_null())
)
{code}

The following exception would be thrown
{code}
Data type 'ARRAY NOT NULL' with conversion class '[Ljava.lang.Integer;' 
does not support a value literal of class 'java.util.ArrayList'.
{code}

As PyFlink does not provide documentation explaining how to create a Table from 
list literals, and both of my attempts described above failed, there might be a 
bug in PyFlink around this functionality.






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32464) AssertionError when converting between Table and SQL with selection and type cast

2023-06-28 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-32464:


 Summary: AssertionError when converting between Table and SQL with 
selection and type cast
 Key: FLINK-32464
 URL: https://issues.apache.org/jira/browse/FLINK-32464
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.16.1
Reporter: Yunfeng Zhou


In an attempt to convert a table between the Table API and SQL API using the 
following program:

```java
public static void main(String[] args) {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    Table table = tEnv.fromValues(1, 2, 3);

    tEnv.createTemporaryView("input_table", table);
    table = tEnv.sqlQuery("SELECT MAP[f0, 1] AS f1 from input_table");

    table = table.select($("f1").cast(DataTypes.MAP(DataTypes.INT(), DataTypes.INT())));

    tEnv.createTemporaryView("input_table_2", table);
    tEnv.sqlQuery("SELECT * from input_table_2");
}
```
The following exception is thrown.

```
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType((INTEGER, INTEGER) MAP NOT NULL f1-MAP) NOT NULL
converted type:
RecordType((INTEGER, INTEGER) MAP f1-MAP) NOT NULL
rel:
LogicalProject(f1-MAP=[CAST(MAP($0, 1)):(INTEGER, INTEGER) MAP])
  LogicalValues(tuples=[[{ 1 }, { 2 }, { 3 }]])

at 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:470)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:215)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:191)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1498)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1253)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:374)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:262)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
at org.apache.flink.streaming.connectors.redis.RedisSinkITCase.main
```

It seems that there is a bug in the Table-SQL conversion and selection process 
when a type cast is involved.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32367) LEAD function second param leads to ClassCastException

2023-06-16 Thread zhou (Jira)
zhou created FLINK-32367:


 Summary: LEAD function second param leads to ClassCastException
 Key: FLINK-32367
 URL: https://issues.apache.org/jira/browse/FLINK-32367
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.15.4
Reporter: zhou
 Attachments: image-2023-06-16-15-49-49-003.png, 
image-2023-06-16-18-12-05-861.png

!image-2023-06-16-18-12-05-861.png!!image-2023-06-16-15-49-49-003.png!

When the LEAD function's second parameter is an expression (window_length/2), 
it throws an exception.

If the second parameter is a number, it works well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled

2023-05-19 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32132:
-

 Summary: Cast function CODEGEN does not work as expected when 
nullOnFailure enabled
 Key: FLINK-32132
 URL: https://issues.apache.org/jira/browse/FLINK-32132
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32115) json_value support cache

2023-05-16 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32115:
-

 Summary: json_value support cache
 Key: FLINK-32115
 URL: https://issues.apache.org/jira/browse/FLINK-32115
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.16.1
Reporter: xiaogang zhou


[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

Hive supports caching the previously deserialized JSON object; could we 
consider using a similar cache in JsonValueCallGen?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31605) Table#to_pandas check table boundedness

2023-03-24 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-31605:


 Summary: Table#to_pandas check table boundedness
 Key: FLINK-31605
 URL: https://issues.apache.org/jira/browse/FLINK-31605
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.17.0
Reporter: Yunfeng Zhou


When `Table#to_pandas` is invoked on a table with an unbounded source, this 
method does not throw an exception. Instead, it executes indefinitely without 
exiting. It would be better to check the boundedness of the table before 
execution and throw an exception in the unbounded case.

It might be possible to reference the code in 
`StreamGraphGenerator#existsUnboundedSource` to judge whether a Table is 
bounded or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31249) Checkpoint Timer failed to process timeout events when it blocked at writing _metadata to DFS

2023-02-27 Thread renxiang zhou (Jira)
renxiang zhou created FLINK-31249:
-

 Summary: Checkpoint Timer failed to process timeout events when it 
blocked at writing _metadata to DFS
 Key: FLINK-31249
 URL: https://issues.apache.org/jira/browse/FLINK-31249
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.16.0, 1.11.6
Reporter: renxiang zhou
 Fix For: 1.18.0
 Attachments: image-2023-02-28-11-25-03-637.png

The jobmanager-future thread may be blocked writing metadata to DFS due to a 
DFS failure, while the CheckpointCoordinator lock is held by this thread. 

When the next checkpoint is triggered, the checkpoint timer thread waits for 
the lock to be released. If the previous checkpoint times out, the checkpoint 
timer cannot execute the timeout event since it is blocked waiting for the 
lock. As a result, the previous checkpoint cannot be cancelled.
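
A minimal, self-contained sketch (not Flink code; names are illustrative) of 
the blocking pattern described above:
{code:java}
public final class LockBlockingSketch {
    private static final Object COORDINATOR_LOCK = new Object();

    public static void main(String[] args) throws InterruptedException {
        // stands in for the jobmanager-future thread writing _metadata to DFS
        Thread ioThread = new Thread(() -> {
            synchronized (COORDINATOR_LOCK) {
                sleepQuietly(10_000); // slow DFS write while holding the lock
            }
        });
        // stands in for the checkpoint timer trying to handle a timeout
        Thread timerThread = new Thread(() -> {
            synchronized (COORDINATOR_LOCK) { // blocked until the write ends
                System.out.println("timeout handled (too late)");
            }
        });
        ioThread.start();
        Thread.sleep(100); // let ioThread grab the lock first
        timerThread.start();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}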

!image-2023-02-28-11-25-03-637.png|width=1144,height=248!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31225) RocksDB max open files can lead to OOM

2023-02-25 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-31225:
-

 Summary: RocksDB max open files can lead to OOM 
 Key: FLINK-31225
 URL: https://issues.apache.org/jira/browse/FLINK-31225
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31089) pinning the L0 index in memory can lead to slow memory growth and finally memory beyond the limit

2023-02-15 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-31089:
-

 Summary: pinning the L0 index in memory can lead to slow memory 
growth and finally memory beyond the limit
 Key: FLINK-31089
 URL: https://issues.apache.org/jira/browse/FLINK-31089
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30959) UNIX_TIMESTAMP's return value does not meet expected

2023-02-07 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30959:


 Summary: UNIX_TIMESTAMP's return value does not meet expected
 Key: FLINK-30959
 URL: https://issues.apache.org/jira/browse/FLINK-30959
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.15.2
Reporter: Yunfeng Zhou


When running the following pyflink program

 
{code:python}
import pandas as pd
from pyflink.datastream import StreamExecutionEnvironment, HashMapStateBackend
from pyflink.table import StreamTableEnvironment

if __name__ == "__main__":
    input_data = pd.DataFrame(
        [
            ["Alex", 100.0, "2022-01-01 08:00:00.001 +0800"],
            ["Emma", 400.0, "2022-01-01 00:00:00.003 +0000"],
            ["Alex", 200.0, "2022-01-01 08:00:00.005 +0800"],
            ["Emma", 300.0, "2022-01-01 00:00:00.007 +0000"],
            ["Jack", 500.0, "2022-01-01 08:00:00.009 +0800"],
            ["Alex", 450.0, "2022-01-01 00:00:00.011 +0000"],
        ],
        columns=["name", "avg_cost", "time"],
    )

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_state_backend(HashMapStateBackend())
    t_env = StreamTableEnvironment.create(env)

    input_table = t_env.from_pandas(input_data)

    t_env.create_temporary_view("input_table", input_table)

    time_format = "yyyy-MM-dd HH:mm:ss.SSS X"

    output_table = t_env.sql_query(
        f"SELECT *, UNIX_TIMESTAMP(`time`, '{time_format}') AS unix_time FROM input_table"
    )

    output_table.execute().print()

{code}

The actual output is 


{code}
+----+--------+------------+-------------------------------+--------------+
| op |   name |   avg_cost |                          time |    unix_time |
+----+--------+------------+-------------------------------+--------------+
| +I |   Alex |      100.0 | 2022-01-01 08:00:00.001 +0800 |   1640995200 |
| +I |   Emma |      400.0 | 2022-01-01 00:00:00.003 +0000 |   1640995200 |
| +I |   Alex |      200.0 | 2022-01-01 08:00:00.005 +0800 |   1640995200 |
| +I |   Emma |      300.0 | 2022-01-01 00:00:00.007 +0000 |   1640995200 |
| +I |   Jack |      500.0 | 2022-01-01 08:00:00.009 +0800 |   1640995200 |
| +I |   Alex |      450.0 | 2022-01-01 00:00:00.011 +0000 |   1640995200 |
+----+--------+------------+-------------------------------+--------------+
{code}

While the expected result is


{code:java}
+----+--------+------------+-------------------------------+--------------+
| op |   name |   avg_cost |                          time |    unix_time |
+----+--------+------------+-------------------------------+--------------+
| +I |   Alex |      100.0 | 2022-01-01 08:00:00.001 +0800 |   1640995200 |
| +I |   Emma |      400.0 | 2022-01-01 00:00:00.003 +0000 |   1640966400 |
| +I |   Alex |      200.0 | 2022-01-01 08:00:00.005 +0800 |   1640995200 |
| +I |   Emma |      300.0 | 2022-01-01 00:00:00.007 +0000 |   1640966400 |
| +I |   Jack |      500.0 | 2022-01-01 08:00:00.009 +0800 |   1640995200 |
| +I |   Alex |      450.0 | 2022-01-01 00:00:00.011 +0000 |   1640966400 |
+----+--------+------------+-------------------------------+--------------+
{code}




 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30753) Py4J cannot acquire Table.explain() method

2023-01-19 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30753:


 Summary: Py4J cannot acquire Table.explain() method
 Key: FLINK-30753
 URL: https://issues.apache.org/jira/browse/FLINK-30753
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.16.0
Reporter: Yunfeng Zhou


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45044&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30532) Add benchmark for DCT, SQLTransformer and StopWordsRemover algorithm

2022-12-28 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30532:


 Summary: Add benchmark for DCT, SQLTransformer and 
StopWordsRemover algorithm
 Key: FLINK-30532
 URL: https://issues.apache.org/jira/browse/FLINK-30532
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30292) Better support for conversion between DataType and TypeInformation

2022-12-04 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30292:


 Summary: Better support for conversion between DataType and 
TypeInformation
 Key: FLINK-30292
 URL: https://issues.apache.org/jira/browse/FLINK-30292
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.15.3
Reporter: Yunfeng Zhou


In Flink 1.15, we have the following ways to convert a DataType to a 
TypeInformation. Each of them has some disadvantages.

* `TypeConversions.fromDataTypeToLegacyInfo`
It might lead to precision loss for some data types like timestamp.
It has been deprecated.
* `ExternalTypeInfo.of`
It cannot be used to get detailed type information like `RowTypeInfo`.
It might bring some serialization overhead.

Given that neither of the ways mentioned above is perfect, Flink SQL should 
provide a better API to support DataType-TypeInformation conversions, and thus 
better support Table-DataStream conversions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30256) LocalWindowAgg can set the chaining strategy to always

2022-11-30 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-30256:
-

 Summary: LocalWindowAgg can set the chaining strategy to always
 Key: FLINK-30256
 URL: https://issues.apache.org/jira/browse/FLINK-30256
 Project: Flink
  Issue Type: Improvement
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30241) Flink ML Iteration ConcurrentModificationException

2022-11-29 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30241:


 Summary: Flink ML Iteration ConcurrentModificationException
 Key: FLINK-30241
 URL: https://issues.apache.org/jira/browse/FLINK-30241
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


https://github.com/jiangxin369/flink-ml/actions/runs/3577811156/jobs/6017233847


{code}
___ LinearRegressionTest.test_get_model_data ___

self = 

def test_get_model_data(self):
regression = LinearRegression().set_weight_col('weight')
model = regression.fit(self.input_data_table)
model_data = self.t_env.to_data_stream(
>   model.get_model_data()[0]).execute_and_collect().next()

pyflink/ml/lib/regression/tests/test_linearregression.py:124: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/datastream/data_stream.py:1760:
 in next
if not self._j_closeable_iterator.hasNext():
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/py4j/java_gateway.py:1322:
 in __call__
answer, self.gateway_client, self.target_id, self.name)
/opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/util/exceptions.py:146:
 in deco
return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

answer = 'xro12236'
gateway_client = 
target_id = 'o12139', name = 'hasNext'

def get_return_value(answer, gateway_client, target_id=None, name=None):
"""Converts an answer received from the Java gateway into a Python 
object.

For example, string representation of integers are converted to Python
integer, string representation of objects are converted to JavaObject
instances, etc.

:param answer: the string returned by the Java gateway
:param gateway_client: the gateway client used to communicate with the 
Java
Gateway. Only necessary if the answer is a reference (e.g., object,
list, map)
:param target_id: the name of the object from which the answer comes 
from
(e.g., *object1* in `object1.hello()`). Optional.
:param name: the name of the member from which the answer comes from
(e.g., *hello* in `object1.hello()`). Optional.
"""
if is_error(answer)[0]:
if len(answer) > 1:
type = answer[1]
value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
if answer[1] == REFERENCE_TYPE:
raise Py4JJavaError(
"An error occurred while calling {0}{1}{2}.\n".
>   format(target_id, ".", name), value)
E   py4j.protocol.Py4JJavaError: An error occurred while 
calling o12139.hasNext.
E   : java.lang.RuntimeException: Failed to fetch next result
E   at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
E   at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
E   at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown 
Source)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at java.lang.reflect.Method.invoke(Method.java:498)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E   at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
E   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E   at java.lang.Thread.run(Thread.java:750)
E   Caused by: java.io.IOException: Failed to fetch job 
execution result
E   at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
E   at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
E   at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
E 

[jira] [Created] (FLINK-30168) PyFlink Deserialization Error with Object Array

2022-11-23 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30168:


 Summary: PyFlink Deserialization Error with Object Array
 Key: FLINK-30168
 URL: https://issues.apache.org/jira/browse/FLINK-30168
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.2, 1.16.0
Reporter: Yunfeng Zhou


When attempting to collect object-array records from a DataStream in PyFlink, 
an exception like the following is thrown:
data = 0, field_type = DenseVectorTypeInfo

def pickled_bytes_to_python_converter(data, field_type):
    if isinstance(field_type, RowTypeInfo):
        row_kind = RowKind(int.from_bytes(data[0], 'little'))
        data = zip(list(data[1:]), field_type.get_field_types())
        fields = []
        for d, d_type in data:
            fields.append(pickled_bytes_to_python_converter(d, d_type))
        row = Row.of_kind(row_kind, *fields)
        return row
    else:
>       data = pickle.loads(data)
E       TypeError: a bytes-like object is required, not 'int'
I found that this error occurs because PyFlink handles object arrays 
differently on the Java side and the Python side. 

 

On Java side 
(org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject)

 
{code:java}
...
else if (dataType instanceof BasicArrayTypeInfo || dataType instanceof PrimitiveArrayTypeInfo) {
    // recursively deal with array elements
} ...
else {
    // ObjectArrayTypeInfo is handled here
    TypeSerializer serializer = dataType.createSerializer(null);
    ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
    serializer.serialize(obj, baosWrapper);
    return pickler.dumps(baos.toByteArray());
}
{code}
 

On python side(pyflink.datastream.utils.pickled_bytes_to_python_converter)
{code:java}
...
elif isinstance(field_type,
                (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
    element_type = field_type._element_type
    elements = []
    for element_bytes in data:
        elements.append(pickled_bytes_to_python_converter(element_bytes, element_type))
    return elements {code}
 

 

Thus a possible fix for this bug is to align PyFlink's behavior on the Java 
side and the Python side.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30144) Guarantee Flink ML operators function correctly with object-reuse enabled

2022-11-22 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30144:


 Summary: Guarantee Flink ML operators function correctly with 
object-reuse enabled
 Key: FLINK-30144
 URL: https://issues.apache.org/jira/browse/FLINK-30144
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


Flink ML operators are supposed to function correctly when object reuse is 
enabled, as part of Flink ML's performance improvements. Thus we need to add 
this configuration to Flink ML test cases and fix any bugs discovered along 
the way.
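
For reference, object reuse is switched on through the execution config, e.g.:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // the mode under which Flink ML operators must stay correct
        env.getConfig().enableObjectReuse();
    }
}
{code}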



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30142) Flink ML operators lose table watermark after transform()

2022-11-22 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30142:


 Summary: Flink ML operators lose table watermark after transform()
 Key: FLINK-30142
 URL: https://issues.apache.org/jira/browse/FLINK-30142
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30130) Table.select lose watermark

2022-11-21 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30130:


 Summary: Table.select lose watermark
 Key: FLINK-30130
 URL: https://issues.apache.org/jira/browse/FLINK-30130
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.15.1
Reporter: Yunfeng Zhou


Executing the following program
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Long> stream = env.fromSequence(0, 1000);

Schema schema = Schema.newBuilder()
.column("f0", DataTypes.BIGINT())
.columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(f0 * 1000, 3)")
.watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND")
.build();
Table table = tEnv.fromDataStream(stream, schema);

table.printSchema();

table = table.select($("*"));

table.printSchema();{code}

yields the following result:


{code:java}
(
  `f0` BIGINT,
  `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME* AS TO_TIMESTAMP_LTZ(f0 * 1000, 3),
  WATERMARK FOR `time_ltz`: TIMESTAMP_LTZ(3) AS time_ltz - INTERVAL '5' SECOND
)
(
  `f0` BIGINT,
  `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME*
)
{code}

This result shows that the watermark property of a Table is lost during a 
select operation.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30124) GenericType is not supported in PyFlink currently

2022-11-21 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30124:


 Summary: GenericType is not supported in PyFlink 
currently
 Key: FLINK-30124
 URL: https://issues.apache.org/jira/browse/FLINK-30124
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Yunfeng Zhou


When we add and execute the following test case to 
flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py of the 
Flink ML repository,
{code:java}
def test_get_model_data(self):
model_data = self.estimator.fit(self.train_data).get_model_data()[0]
self.t_env.to_data_stream(model_data).execute_and_collect().next(){code}
The following exception would be thrown.

 
{code:java}
j_type_info = JavaObject id=o698
    def _from_java_type(j_type_info: JavaObject) -> TypeInformation:
        gateway = get_gateway()
        JBasicTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
    
        if _is_instance_of(j_type_info, JBasicTypeInfo.STRING_TYPE_INFO):
            return Types.STRING()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BOOLEAN_TYPE_INFO):
            return Types.BOOLEAN()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BYTE_TYPE_INFO):
            return Types.BYTE()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.SHORT_TYPE_INFO):
            return Types.SHORT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.INT_TYPE_INFO):
            return Types.INT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.LONG_TYPE_INFO):
            return Types.LONG()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.FLOAT_TYPE_INFO):
            return Types.FLOAT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.DOUBLE_TYPE_INFO):
            return Types.DOUBLE()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.CHAR_TYPE_INFO):
            return Types.CHAR()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_INT_TYPE_INFO):
            return Types.BIG_INT()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_DEC_TYPE_INFO):
            return Types.BIG_DEC()
        elif _is_instance_of(j_type_info, JBasicTypeInfo.INSTANT_TYPE_INFO):
            return Types.INSTANT()
    
        JSqlTimeTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
        if _is_instance_of(j_type_info, JSqlTimeTypeInfo.DATE):
            return Types.SQL_DATE()
        elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIME):
            return Types.SQL_TIME()
        elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIMESTAMP):
            return Types.SQL_TIMESTAMP()
    
        JPrimitiveArrayTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo \
            .PrimitiveArrayTypeInfo
    
        if _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.BOOLEAN())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.BYTE())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.SHORT())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.INT())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.LONG())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.FLOAT())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.DOUBLE())
        elif _is_instance_of(j_type_info, 
JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO):
            return Types.PRIMITIVE_ARRAY(Types.CHAR())
    
        JBasicArrayTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
    
        if _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.BOOLEAN())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.BYTE())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.SHORT())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.INT())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO):
            return Types.BASIC_ARRAY(Types.LONG())
        elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.FLOAT_ARRAY_T

[jira] [Created] (FLINK-30122) Flink ML KMeans getting model data throws TypeError

2022-11-21 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30122:


 Summary: Flink ML KMeans getting model data throws TypeError
 Key: FLINK-30122
 URL: https://issues.apache.org/jira/browse/FLINK-30122
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


When the following test case is added to 
flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py,
{code:java}
def test_get_model_data(self):
    kmeans = KMeans().set_max_iter(2).set_k(2)
    model = kmeans.fit(self.data_table)
    model_data = model.get_model_data()[0]
    expected_field_names = ['centroids', 'weights']
    self.assertEqual(expected_field_names,
                     model_data.get_schema().get_field_names())
    self.t_env.to_data_stream(model_data).execute_and_collect().next() {code}
The following exception would be thrown.
{code:java}
data = 0, field_type = DenseVectorTypeInfo

def pickled_bytes_to_python_converter(data, field_type):
    if isinstance(field_type, RowTypeInfo):
        row_kind = RowKind(int.from_bytes(data[0], 'little'))
        data = zip(list(data[1:]), field_type.get_field_types())
        fields = []
        for d, d_type in data:
            fields.append(pickled_bytes_to_python_converter(d, d_type))
        row = Row.of_kind(row_kind, *fields)
        return row
    else:
>       data = pickle.loads(data)
E       TypeError: a bytes-like object is required, not 'int' {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30037) Improve the efficiency of Flink ML Python CI

2022-11-15 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30037:


 Summary: Improve the efficiency of Flink ML Python CI
 Key: FLINK-30037
 URL: https://issues.apache.org/jira/browse/FLINK-30037
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


It currently takes about thirty minutes to execute Flink ML's Python CI[1], 
which noticeably slows down Flink ML development. Thus we need to reduce the 
total execution time of the Flink ML Python CI.

[1] https://github.com/apache/flink-ml/actions/runs/3475256961



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30009) OperatorCoordinator.start()'s JavaDoc mismatches its behavior

2022-11-13 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30009:


 Summary: OperatorCoordinator.start()'s JavaDoc mismatches its 
behavior
 Key: FLINK-30009
 URL: https://issues.apache.org/jira/browse/FLINK-30009
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.16.0
Reporter: Yunfeng Zhou


The following description lies in the JavaDoc of 
{{OperatorCoordinator.start()}}.

{{This method is called once at the beginning, before any other methods.}}

This description is incorrect because {{resetToCheckpoint()}} can be called 
before {{start()}} is invoked. For example, 
{{RecreateOnResetOperatorCoordinator.DeferrableCoordinator.resetAndStart()}} 
uses these methods in that order. Thus the JavaDoc of {{OperatorCoordinator}}'s 
methods should be modified to match this behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29843) Euclidean Distance Measure generates NAN distance values

2022-11-01 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29843:


 Summary: Euclidean Distance Measure generates NAN distance values
 Key: FLINK-29843
 URL: https://issues.apache.org/jira/browse/FLINK-29843
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


Currently, Flink ML's `EuclideanDistanceMeasure.distance(...)` method might 
compute a slightly negative value for the squared distance between two vectors, 
given the calculation accuracy of Java doubles; taking the square root of that 
negative value then yields NaN. This bug should be fixed to guarantee that the 
distance is a non-negative value.
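
A minimal sketch of the failure mode, assuming the distance is computed via the 
expanded form ||a||^2 + ||b||^2 - 2*a.b (the class and values below are 
illustrative, not taken from the Flink ML source):

{code:java}
// Sketch: for nearly identical vectors, floating-point rounding can make the
// expanded squared distance a tiny negative number, and Math.sqrt then
// returns NaN. Clamping to zero guarantees a non-negative distance.
public class DistanceSketch {
    static double distance(double[] a, double[] b) {
        double dot = 0, normA2 = 0, normB2 = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA2 += a[i] * a[i];
            normB2 += b[i] * b[i];
        }
        double d2 = normA2 + normB2 - 2 * dot; // may be e.g. -1e-17 instead of 0
        return Math.sqrt(Math.max(d2, 0.0));   // guard against NaN
    }

    public static void main(String[] args) {
        double[] a = {0.1, 0.3, 0.7};
        System.out.println(distance(a, a)); // 0.0 with the guard
    }
}
{code}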



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29763) TaskManager heartbeat timeout exception in Github CI for python tests

2022-10-25 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29763:


 Summary: TaskManager heartbeat timeout exception in Github CI for 
python tests
 Key: FLINK-29763
 URL: https://issues.apache.org/jira/browse/FLINK-29763
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


https://github.com/apache/flink-ml/actions/runs/3322007330/jobs/5490434747
https://github.com/apache/flink-ml/actions/runs/3321223124/jobs/5488576891
https://github.com/apache/flink-ml/actions/runs/3319920091/jobs/5485672250
https://github.com/apache/flink-ml/actions/runs/3319722473/jobs/5485231041
https://github.com/apache/flink-ml/actions/runs/3319599111/jobs/5484952148
https://github.com/apache/flink-ml/actions/runs/3318938657/jobs/5483471010



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29604) Add Estimator and Transformer for CountVectorizer

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29604:


 Summary: Add Estimator and Transformer for CountVectorizer
 Key: FLINK-29604
 URL: https://issues.apache.org/jira/browse/FLINK-29604
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Estimator and Transformer for CountVectorizer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29602) Add Transformer for SQLTransformer

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29602:


 Summary: Add Transformer for SQLTransformer
 Key: FLINK-29602
 URL: https://issues.apache.org/jira/browse/FLINK-29602
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Transformer for SQLTransformer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29603) Add Transformer for StopWordsRemover

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29603:


 Summary: Add Transformer for StopWordsRemover
 Key: FLINK-29603
 URL: https://issues.apache.org/jira/browse/FLINK-29603
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Transformer for StopWordsRemover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29600) Add Estimator and Transformer for BucketedRandomProjectionLSH

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29600:


 Summary: Add Estimator and Transformer for 
BucketedRandomProjectionLSH
 Key: FLINK-29600
 URL: https://issues.apache.org/jira/browse/FLINK-29600
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Estimator and Transformer for BucketedRandomProjectionLSH.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29601) Add Estimator and Transformer for UnivariateFeatureSelector

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29601:


 Summary: Add Estimator and Transformer for 
UnivariateFeatureSelector
 Key: FLINK-29601
 URL: https://issues.apache.org/jira/browse/FLINK-29601
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Estimator and Transformer for UnivariateFeatureSelector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29599) Add Estimator and Transformer for MinHashLSH

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29599:


 Summary: Add Estimator and Transformer for MinHashLSH
 Key: FLINK-29599
 URL: https://issues.apache.org/jira/browse/FLINK-29599
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Estimator and Transformer for MinHashLSH.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29598) Add Estimator and Transformer for Imputer

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29598:


 Summary: Add Estimator and Transformer for Imputer
 Key: FLINK-29598
 URL: https://issues.apache.org/jira/browse/FLINK-29598
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Estimator and Transformer for Imputer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29597) Add Estimator and Transformer for QuantileDiscretizer

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29597:


 Summary: Add Estimator and Transformer for QuantileDiscretizer
 Key: FLINK-29597
 URL: https://issues.apache.org/jira/browse/FLINK-29597
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Estimator and Transformer for QuantileDiscretizer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29596) Add Estimator and Transformer for RFormula

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29596:


 Summary: Add Estimator and Transformer for RFormula
 Key: FLINK-29596
 URL: https://issues.apache.org/jira/browse/FLINK-29596
 Project: Flink
  Issue Type: New Feature
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add Estimator and Transformer for RFormula.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29595) Add Estimator and Transformer for ChiSqSelector

2022-10-12 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29595:


 Summary: Add Estimator and Transformer for ChiSqSelector
 Key: FLINK-29595
 URL: https://issues.apache.org/jira/browse/FLINK-29595
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0


Add the Estimator and Transformer for ChiSqSelector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29591) Add built-in UDFs to convert between arrays and vectors

2022-10-11 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29591:


 Summary: Add built-in UDFs to convert between arrays and vectors
 Key: FLINK-29591
 URL: https://issues.apache.org/jira/browse/FLINK-29591
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29545) Kafka consumption stops when the first checkpoint is triggered

2022-10-08 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-29545:
-

 Summary: Kafka consumption stops when the first checkpoint is triggered
 Key: FLINK-29545
 URL: https://issues.apache.org/jira/browse/FLINK-29545
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Network
Affects Versions: 1.13.3
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29429) Add DataType for Flink ML linear algorithm classes

2022-09-27 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29429:


 Summary: Add DataType for Flink ML linear algorithm classes
 Key: FLINK-29429
 URL: https://issues.apache.org/jira/browse/FLINK-29429
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


DataType instances are used in the Table API when creating Tables or Table UDFs. 
There are helper functions like `DataTypes.of()` that can be used to get the 
DataType for Flink ML classes like DenseVector in Java, but this method is not 
applicable in PyFlink, which does not seem to support custom DataTypes yet. Thus 
we should add DataType subclasses for these classes in Flink ML.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29115) Improve the quickstart of Flink ML python API

2022-08-26 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29115:


 Summary: Improve the quickstart of Flink ML python API
 Key: FLINK-29115
 URL: https://issues.apache.org/jira/browse/FLINK-29115
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


Currently, Flink ML's documentation requires Python users to build the Flink ML 
Java project before they can build and use Flink ML's Python API. The setup 
process and quickstart should therefore be improved to simplify the usage of 
Flink ML.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29044) Add Transformer for DCT

2022-08-19 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29044:


 Summary: Add Transformer for DCT
 Key: FLINK-29044
 URL: https://issues.apache.org/jira/browse/FLINK-29044
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29043) Improve ML iteration efficiency

2022-08-19 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29043:


 Summary: Improve ML iteration efficiency
 Key: FLINK-29043
 URL: https://issues.apache.org/jira/browse/FLINK-29043
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


Currently, in GitHub Actions, it takes about one minute to execute the unit 
tests of each algorithm that uses Flink ML's iteration mechanism, including 
KMeansTest, LinearRegressionTest, LinearSVCTest, and LogisticRegressionTest[1]. 
We need to figure out which components in flink-ml-iteration cause this 
phenomenon and make corresponding improvements.

 [1] [https://github.com/apache/flink-ml/runs/7892649470?check_suite_focus=true]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28941) Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case

2022-08-11 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-28941:


 Summary: Savepoint ignores MaxConcurrentCheckpoint limit in 
aligned checkpoint case
 Key: FLINK-28941
 URL: https://issues.apache.org/jira/browse/FLINK-28941
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Yunfeng Zhou


When unaligned checkpoints are disabled, savepoints are marked as forced[1], 
which means they can ignore the maxConcurrentCheckpoint limit[2]. This can lead 
to more than maxConcurrentCheckpoint checkpoints running simultaneously.

This behavior is incompatible with OperatorCoordinatorHolder, which requires 
that there be at most one pending checkpoint at a time. As a result, exceptions 
like the following might be thrown[3].


{code:java}
java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked 
for checkpoint 8
at 
org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185)
 ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328)
 ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327)
 ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243)
 ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
 ~[classes/:?]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[classes/:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
 ~[classes/:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
 ~[classes/:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
 ~[classes/:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[classes/:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[scala-library-2.12.7.jar:?]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[scala-library-2.12.7.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[scala-library-2.12.7.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[scala-library-2.12.7.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[scala-library-2.12.7.jar:?]
at akka.actor.Actor.aroundReceive(Actor.scala:537) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[akka-actor_2.12-2.6.15.jar:2.6.15]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
[?:1.8.0_292]
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
[?:1.8.0_292]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
[?:1.8.0_292]
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
[?:1.8.0_292]
{code}


[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
[3] 
https://dev.azure.com/apache-flink/apac

[jira] [Created] (FLINK-28761) BinaryClassificationEvaluator cannot work with unaligned checkpoint

2022-08-01 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-28761:


 Summary: BinaryClassificationEvaluator cannot work with unaligned 
checkpoint
 Key: FLINK-28761
 URL: https://issues.apache.org/jira/browse/FLINK-28761
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


If we make  {{BinaryClassificationEvaluatorTest}} extend  {{AbstractTestBase}}, 
this test class would throw the following exceptions during execution:
 
{code:java}
org.apache.flink.table.api.TableException: Failed to execute sql

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:854)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
at 
org.apache.flink.ml.evaluation.BinaryClassificationEvaluatorTest.testEvaluateWithWeight(BinaryClassificationEvaluatorTest.java:305)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at 
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.UnsupportedOperationException: Unaligned checkpoints are 
currently not supported for custom partitioners, as rescaling is not guaranteed 
to work correctly.
The user can force Unaligned Checkpoints by using 
'execution.checkpointing.unaligned.forced'
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:390)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:166)
at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:121)
at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:993)
at 
org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
at 
org.apach

[jira] [Created] (FLINK-28673) Migrate Flink ML to Flink 1.15.1

2022-07-25 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-28673:


 Summary: Migrate Flink ML to Flink 1.15.1
 Key: FLINK-28673
 URL: https://issues.apache.org/jira/browse/FLINK-28673
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Yunfeng Zhou


Update Flink ML's Flink dependency to 1.15.1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28639) Preserve distributed consistency of OperatorEvents from subtasks to OperatorCoordinator

2022-07-21 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-28639:


 Summary: Preserve distributed consistency of OperatorEvents from 
subtasks to OperatorCoordinator
 Key: FLINK-28639
 URL: https://issues.apache.org/jira/browse/FLINK-28639
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.14.3
Reporter: Yunfeng Zhou
 Fix For: 1.16.0


This is the second step in solving the consistency issue of OC communications. 
In this step, we also guarantee the consistency of operator events sent from 
subtasks to OCs. Combined with the other sub-task, which preserves the 
consistency of communications in the reverse direction, all communications 
between OCs and subtasks would be consistent across checkpoints and global 
failovers.
To achieve the goal of this step, we need to add closing/reopening functions to 
the subtasks' gateways and make the subtasks aware of a checkpoint before they 
receive the checkpoint barriers. The general process is as follows.
1. When the OC starts a checkpoint, it notifies all subtasks about this 
information.
2. After being notified about the ongoing checkpoint in OC, a subtask sends a 
special operator event to its OC, which is the last operator event the OC could 
receive from the subtask before the subtask completes the checkpoint. Then the 
subtask closes its gateway.
3. After receiving this special event from all subtasks, the OC finishes its 
checkpoint and closes its gateway. Then the checkpoint coordinator sends 
checkpoint barriers to the sources.
4. If the subtask or the OC generates events to send to the other side, the 
events are buffered locally.
5. When a subtask starts checkpointing, it also stores the buffered events in 
the checkpoint.
6. After the subtask completes the checkpoint, communications in both 
directions are recovered and the buffered events are sent out.
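
A minimal sketch of the close/buffer/flush behavior described in steps 2-6 
above; every type and method name here is hypothetical rather than Flink's 
actual API:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: while the gateway is closed for a checkpoint, outgoing
// events are buffered (and would be stored in the subtask's snapshot); once
// the checkpoint completes, the gateway reopens and flushes the buffer.
public class BufferingGatewaySketch {
    interface EventSender {
        void send(String event);
    }

    private final EventSender sender;
    private final List<String> buffer = new ArrayList<>();
    private boolean closed = false;

    BufferingGatewaySketch(EventSender sender) {
        this.sender = sender;
    }

    void sendEvent(String event) {
        if (closed) {
            buffer.add(event); // step 4: buffer while the checkpoint is ongoing
        } else {
            sender.send(event);
        }
    }

    List<String> closeForCheckpoint() {
        closed = true;
        return buffer; // step 5: the buffered events go into the snapshot
    }

    void reopenAfterCheckpoint() {
        closed = false;
        buffer.forEach(sender::send); // step 6: flush once the checkpoint completes
        buffer.clear();
    }
}
{code}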



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28606) Preserve distributed consistency of OperatorEvents from OperatorCoordinator to subtasks

2022-07-19 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-28606:


 Summary: Preserve distributed consistency of OperatorEvents from 
OperatorCoordinator to subtasks
 Key: FLINK-28606
 URL: https://issues.apache.org/jira/browse/FLINK-28606
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.14.3
Reporter: Yunfeng Zhou
 Fix For: 1.16.0


This is a component of our solution to the consistency issue in the operator 
coordinator mechanism. In this step, we would guarantee the consistency of all 
communications in one direction, from OC to subtasks. This would need less 
workload and should unblock the implementation of the CEP coordinator in 
FLIP-200.

Roughly, we would need to implement the following process in this step.
 # Let the OC finish processing all the incoming OperatorEvents before the snapshot.
 # Close the gateway that sends operator events to the subtasks when the OC completes its snapshot.
 # Wait until all the outgoing OperatorEvents sent before the snapshot are sent and acked.
 # Send checkpoint barriers to the Source operators.
 # Open the corresponding gateway of a subtask when the OC learns that the subtask has completed the checkpoint.
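
A minimal sketch of the five-step sequence above, expressed as a future chain; 
all names below are hypothetical, not the actual OperatorCoordinatorHolder API:

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the OC-side checkpoint sequence.
public class OcSnapshotSketch {
    interface Gateway {
        void close();
        void open(int subtask);
    }

    private final Gateway gateway;

    OcSnapshotSketch(Gateway gateway) {
        this.gateway = gateway;
    }

    CompletableFuture<Void> snapshot() {
        return drainIncomingEvents()                        // step 1: process pending events
                .thenRun(gateway::close)                    // step 2: close the OC-to-subtask gateway
                .thenCompose(v -> awaitOutgoingAcks())      // step 3: wait for pre-snapshot events to be acked
                .thenCompose(v -> sendBarriersToSources()); // step 4: only then inject barriers
    }

    // step 5: reopen per subtask once that subtask reports checkpoint completion
    void onSubtaskCheckpointComplete(int subtask) {
        gateway.open(subtask);
    }

    private CompletableFuture<Void> drainIncomingEvents() {
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> awaitOutgoingAcks() {
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> sendBarriersToSources() {
        return CompletableFuture.completedFuture(null);
    }
}
{code}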



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28455) PyFlink TableResult collect to local times out

2022-07-07 Thread zhou (Jira)
zhou created FLINK-28455:


 Summary: PyFlink TableResult collect to local times out
 Key: FLINK-28455
 URL: https://issues.apache.org/jira/browse/FLINK-28455
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.13.0
Reporter: zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28224) Add document for algorithms and features in Flink ML 2.1

2022-06-23 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-28224:


 Summary: Add document for algorithms and features in Flink ML 2.1
 Key: FLINK-28224
 URL: https://issues.apache.org/jira/browse/FLINK-28224
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


The algorithms and new features introduced in Flink ML 2.1 need to be 
documented and displayed on Flink ML's documentation website.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27798) Migrate Flink ML to Flink 1.15.0

2022-05-26 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-27798:


 Summary: Migrate Flink ML to Flink 1.15.0
 Key: FLINK-27798
 URL: https://issues.apache.org/jira/browse/FLINK-27798
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


Update Flink ML's Flink dependency to 1.15.0



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27797) PythonTableUtils.getCollectionInputFormat cannot correctly handle None values

2022-05-26 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-27797:


 Summary: PythonTableUtils.getCollectionInputFormat cannot 
correctly handle None values
 Key: FLINK-27797
 URL: https://issues.apache.org/jira/browse/FLINK-27797
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.0
Reporter: Yunfeng Zhou


In `PythonTableUtils.getCollectionInputFormat` there are implementations like 
the following.
This code can be found at 
[https://github.com/apache/flink/blob/8488368b86a99a064446ca74e775b670b94a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java#L515]

{code:java}
c -> {
    if (c.getClass() != byte[].class || dataType instanceof PickledByteArrayTypeInfo) {
        return c;
    }
{code}

Here, the generated function does not check `c != null` before calling 
`c.getClass()`, which may cause tables created through PyFlink to fail to parse 
values that are `None`.
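
A hedged sketch of the missing guard; the lambda body is abbreviated and only 
the added null check is the point here:

{code:java}
c -> {
    // Possible fix sketch: pass null values through unchanged instead of
    // dereferencing them with c.getClass().
    if (c == null
            || c.getClass() != byte[].class
            || dataType instanceof PickledByteArrayTypeInfo) {
        return c;
    }
    // ... the original conversion of the byte[] payload continues here ...
}
{code}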



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27742) Fix Compatibility Issues Between Flink ML Operators.

2022-05-23 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-27742:


 Summary: Fix Compatibility Issues Between Flink ML Operators.
 Key: FLINK-27742
 URL: https://issues.apache.org/jira/browse/FLINK-27742
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.0.0
Reporter: Yunfeng Zhou


It was discovered that StringIndexer and LogisticRegression in Flink ML cannot 
be connected in a pipeline. The reason is that the output label column of 
StringIndexer is an integer, while LogisticRegression can only accept input data 
whose labels are doubles.

In order to make Flink ML stages compatible with each other, the following 
changes need to be made.
- For stages that can only accept double-typed inputs, update their 
implementation to accept any numerical type.
- For stages that generate labels as integers, make them return labels as 
doubles.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27352) [JUnit5 Migration] Module: flink-json

2022-04-22 Thread EMing Zhou (Jira)
EMing Zhou created FLINK-27352:
--

 Summary: [JUnit5 Migration] Module: flink-json
 Key: FLINK-27352
 URL: https://issues.apache.org/jira/browse/FLINK-27352
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.16.0
Reporter: EMing Zhou






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27084) Perround mode recreating operator fails

2022-04-06 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-27084:


 Summary: Perround mode recreating operator fails
 Key: FLINK-27084
 URL: https://issues.apache.org/jira/browse/FLINK-27084
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.0.0
 Environment: Flink 1.14.0, Flink ML 2.0.0
Reporter: Yunfeng Zhou


When I was trying to submit a job containing the Flink ML KMeans operator to a 
Flink cluster, the following exception was thrown:
{code:java}
The program finished with the following 
exception:org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
8ba9d3173d1c83eb4803298f81349aea)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
8ba9d3173d1c83eb4803298f81349aea)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1898)
at 
org.apache.flink.ml.benchmark.BenchmarkUtils.runBenchmark(BenchmarkUtils.java:127)
at 
org.apache.flink.ml.benchmark.BenchmarkUtils.runBenchmark(BenchmarkUtils.java:84)
at org.apache.flink.ml.benchmark.Benchmark.main(Benchmark.java:50)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 8ba9d3173d1c83eb4803298f81349aea)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at 
java.util.concurrent.CompletableFuture$Com

[jira] [Created] (FLINK-26705) use threadlocal to decrease ObjectSizeCalculator memory use

2022-03-17 Thread hd zhou (Jira)
hd zhou created FLINK-26705:
---

 Summary: use threadlocal to decrease ObjectSizeCalculator memory 
use
 Key: FLINK-26705
 URL: https://issues.apache.org/jira/browse/FLINK-26705
 Project: Flink
  Issue Type: Improvement
Reporter: hd zhou


In the class ObjectSizeCalculator, every call to the static function 
getObjectSize creates a new ObjectSizeCalculator, which costs a lot of memory 
and keeps the GC busy. Using a ThreadLocal would decrease the memory use.
{code:java}

/**
 * Given an object, returns the total allocated size, in bytes, of the object 
and all other objects reachable from it.
 * Attempts to detect the current JVM memory layout, but may fail with 
{@link UnsupportedOperationException};
 *
 * @param obj the object; can be null. Passing in a {@link java.lang.Class} 
object doesn't do anything special, it
 *measures the size of all objects reachable through it (which will 
include its class loader, and by
 *extension, all other Class objects loaded by the same loader, and all 
the parent class loaders). It doesn't
 *provide the size of the static fields in the JVM class that the Class 
object represents.
 * @return the total allocated size of the object and all other objects it 
retains.
 * @throws UnsupportedOperationException if the current vm memory layout cannot 
be detected.
 */
public static long getObjectSize(Object obj) throws 
UnsupportedOperationException {
  return obj == null ? 0 : new 
ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
} {code}
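
A minimal sketch of the proposed ThreadLocal reuse, assuming calculateObjectSize 
leaves the instance in a reusable state between calls:

{code:java}
// Hypothetical sketch: reuse one calculator per thread instead of allocating
// a new ObjectSizeCalculator on every getObjectSize call.
private static final ThreadLocal<ObjectSizeCalculator> CALCULATOR =
        ThreadLocal.withInitial(() -> new ObjectSizeCalculator(CurrentLayout.SPEC));

public static long getObjectSize(Object obj) throws UnsupportedOperationException {
    return obj == null ? 0 : CALCULATOR.get().calculateObjectSize(obj);
}
{code}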



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26313) Support Online KMeans in Flink ML

2022-02-22 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-26313:


 Summary: Support Online KMeans in Flink ML
 Key: FLINK-26313
 URL: https://issues.apache.org/jira/browse/FLINK-26313
 Project: Flink
  Issue Type: New Feature
Reporter: Yunfeng Zhou


Modify Flink ML's KMeans algorithm to support online model training and update.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26263) Check data size in LogisticRegression

2022-02-20 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-26263:


 Summary: Check data size in LogisticRegression
 Key: FLINK-26263
 URL: https://issues.apache.org/jira/browse/FLINK-26263
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.0.0
Reporter: Yunfeng Zhou


In Flink ML LogisticRegression, the algorithm would fail if the parallelism is 
larger than input data size. For example, in 
`LogisticRegressionTest.testFitAndPredict()` if we add the following code

{code:java}
env.setParallelism(12);
{code}

Then the test case would fail with the following exception

{code:java}

Caused by: java.lang.IllegalArgumentException: bound must be positive
    at java.base/java.util.Random.nextInt(Random.java:388)
    at 
org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.getMiniBatchData(LogisticRegression.java:351)
    at 
org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.onEpochWatermarkIncremented(LogisticRegression.java:381)
    at 
org.apache.flink.iteration.operator.AbstractWrapperOperator.notifyEpochWatermarkIncrement(AbstractWrapperOperator.java:129)
    at 
org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.lambda$1(AbstractAllRoundWrapperOperator.java:105)
    at 
org.apache.flink.iteration.operator.OperatorUtils.processOperatorOrUdfIfSatisfy(OperatorUtils.java:79)
    at 
org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.onEpochWatermarkIncrement(AbstractAllRoundWrapperOperator.java:102)
    at 
org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.tryUpdateLowerBound(OperatorEpochWatermarkTracker.java:79)
    at 
org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.onEpochWatermark(OperatorEpochWatermarkTracker.java:63)
    at 
org.apache.flink.iteration.operator.AbstractWrapperOperator.onEpochWatermarkEvent(AbstractWrapperOperator.java:121)
    at 
org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement(TwoInputAllRoundWrapperOperator.java:77)
    at 
org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement2(TwoInputAllRoundWrapperOperator.java:59)
    at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:225)
    at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:194)
    at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:834)

{code}

The cause of this exception is that LogisticRegression has not considered the 
case in which a subtask's share of the input data is empty (its local data size 
is 0). This can be resolved by adding an additional check.
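
For illustration, Random.nextInt requires a strictly positive bound, which is 
exactly what an empty partition violates; a hedged sketch of the guard (the 
variable names are illustrative, not the actual Flink ML code):

{code:java}
import java.util.Random;

public class EmptyPartitionSketch {
    public static void main(String[] args) {
        int localDataSize = 0; // a subtask that cached no records
        Random random = new Random();
        // random.nextInt(0) would throw "bound must be positive";
        // checking the size first avoids the failure for empty partitions.
        int start = localDataSize > 0 ? random.nextInt(localDataSize) : 0;
        System.out.println(start);
    }
}
{code}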



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26100) Set up Flink ML Document Website

2022-02-13 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-26100:


 Summary: Set up Flink ML Document Website
 Key: FLINK-26100
 URL: https://issues.apache.org/jira/browse/FLINK-26100
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.0.0
Reporter: Yunfeng Zhou


Set up Flink ML's documentation website based on the Flink and StateFun 
documentation websites.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25444) ExecutionConfig cannot be Serializable due to TextElement used inside

2021-12-26 Thread Wen Zhou (Jira)
Wen Zhou created FLINK-25444:


 Summary: ExecutionConfig cannot be Serializable due to 
TextElement used inside
 Key: FLINK-25444
 URL: https://issues.apache.org/jira/browse/FLINK-25444
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.2, 1.14.0, 1.14.3
Reporter: Wen Zhou


This should be a bug introduced by the latest Flink commit to the file 
[flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java|https://github.com/apache/flink/commit/9e0e0929b86c372c9243daad9d654af9e7718708#diff-7a439abdf207cf6da8aa6c147b38c1346820fe786afbf652bc614fc377cdf238]

Diffing the file, we can see that TextElement is used where ClosureCleanerLevel 
is used as a member of the Serializable ExecutionConfig.

[TextElement in ClosureCleanerLevel|https://i.stack.imgur.com/ky3d8.png]

The simplest way to verify the problem is to run the following code in Flink 
1.13.5 and 1.14.x; the exception is reproduced in 1.14.x, and the only 
difference between 1.13.5 and 1.14.x is the latest commit.

{code:java}
@Test
public void testExecutionConfigSerializable() throws Exception {
    ExecutionConfig config = new ExecutionConfig();
    ClosureCleaner.clean(config, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
}
{code}

 

The problem can be found here 
https://stackoverflow.com/questions/70443743/flink-blockelement-exception-when-updating-to-version-1-14-2/70468925?noredirect=1#comment124597510_70468925



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24955) Add One-hot Encoder to Flink ML

2021-11-18 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-24955:


 Summary: Add One-hot Encoder to Flink ML
 Key: FLINK-24955
 URL: https://issues.apache.org/jira/browse/FLINK-24955
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: Yunfeng Zhou


Add One-hot Encoder to Flink ML



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24817) Support Naive Bayes algorithm in Flink ML

2021-11-08 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-24817:


 Summary: Support Naive Bayes algorithm in Flink ML
 Key: FLINK-24817
 URL: https://issues.apache.org/jira/browse/FLINK-24817
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: Yunfeng Zhou


This ticket aims to add the Naive Bayes algorithm to Flink ML. The algorithm 
will use the latest Flink ML API proposed in FLIP-173 through FLIP-176.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24059) SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation

2021-08-30 Thread Brian Zhou (Jira)
Brian Zhou created FLINK-24059:
--

 Summary: SourceReaderTestBase should allow NUM_SPLITS to be 
overridden in implementation
 Key: FLINK-24059
 URL: https://issues.apache.org/jira/browse/FLINK-24059
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.13.2
Reporter: Brian Zhou
 Fix For: 1.14.0


The Pravega Flink connector is implementing the FLIP-27 sources and maps each 
Pravega reader onto a split, which leads to a one-to-one mapping between source 
readers and splits. For unit tests, Flink offers the {{SourceReaderTestBase}} 
class, but its NUM_SPLITS constant is declared {{final}} with a value of 10, 
which makes it hard for us to integrate.
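
One possible shape of the change, sketched as an overridable accessor instead 
of a final constant; the method name is an assumption, not the committed API:

{code:java}
// Hypothetical sketch of SourceReaderTestBase with the split count overridable.
public abstract class SourceReaderTestBaseSketch {
    protected int getNumSplits() {
        return 10; // keep the previous value as the default
    }
}

// A connector test that needs a one-to-one reader/split mapping could then do:
class PravegaSourceReaderTestSketch extends SourceReaderTestBaseSketch {
    @Override
    protected int getNumSplits() {
        return 1;
    }
}
{code}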



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23799) application mode not support pyflink

2021-08-16 Thread zhou (Jira)
zhou created FLINK-23799:


 Summary: application mode not support pyflink
 Key: FLINK-23799
 URL: https://issues.apache.org/jira/browse/FLINK-23799
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.13.2
 Environment: flink 1.13
Reporter: zhou


I do this:

/space/flink/bin/flink run-application -t yarn-application 
-Dyarn.application.name=kafka-hive --pyFiles /space/testAirflow/airflow/dags/ 
-py /space/testAirflow/airflow/dags/chloe/kafka_to_hive_1.py

 

Flink throws an exception: java.lang.IllegalArgumentException: Should only have 
one jar

Does application mode not support PyFlink?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23353) UDTAGG can't execute in Batch mode

2021-07-12 Thread hayden zhou (Jira)
hayden zhou created FLINK-23353:
---

 Summary: UDTAGG can't execute in Batch mode
 Key: FLINK-23353
 URL: https://issues.apache.org/jira/browse/FLINK-23353
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.1
Reporter: hayden zhou



{code:java}

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

public class Top2Test {
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Table sourceTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("price", DataTypes.INT())
                ),
                row(1, "hayden", 18),
                row(3, "hayden", 19),
                row(4, "hayden", 20),
                row(2, "jaylin", 20)
        );

        tEnv.createTemporaryView("source", sourceTable);

        Table rT = tEnv.from("source")
                .groupBy($("name"))
                .flatAggregate(call(Top2.class, $("price")).as("price", "rank"))
                .select($("name"), $("price"), $("rank"));
        rT.execute().print();
    }

    public static class Top2Accumulator {
        public Integer first;
        public Integer second;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {

        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator acc = new Top2Accumulator();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accumulator acc, Integer value) {
            if (value > acc.first) {
                acc.second = acc.first;
                acc.first = value;
            } else if (value > acc.second) {
                acc.second = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }

}

{code}

The errors below were thrown:
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
 fields=[name, price, rank])
+- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, 
_UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
   +- LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
  +- LogicalUnion(all=[true])
 :- LogicalProject(id=[CAST(1):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
 :  +- LogicalValues(tuples=[[{ 0 }]])
 :- LogicalProject(id=[CAST(3):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
 :  +- LogicalValues(tuples=[[{ 0 }]])
 :- LogicalProject(id=[CAST(4):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
 :  +- LogicalValues(tuples=[[{ 0 }]])
 +- LogicalProject(id=[CAST(2):INTEGER], 
name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
+- LogicalValues(tuples=[[{ 0 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.col

[jira] [Created] (FLINK-22689) Table API Documentation Row-Based Operations Example Fails

2021-05-17 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-22689:


 Summary: Table API Documentation Row-Based Operations Example Fails
 Key: FLINK-22689
 URL: https://issues.apache.org/jira/browse/FLINK-22689
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Affects Versions: 1.12.1
Reporter: Yunfeng Zhou


I wrote the following program according to the example code provided in 
[Documentation/Table API/Row-based 
operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations]
{code:java}
public class TableUDF {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Table input = tEnv.fromValues(
                DataTypes.of("ROW<c STRING>"),
                Row.of("name")
        );

        ScalarFunction func = new MyMapFunction();
        tEnv.registerFunction("func", func);

        Table table = input
                .map(call("func", $("c")).as("a", "b")); // exception occurs here

        table.execute().print();
    }

    public static class MyMapFunction extends ScalarFunction {
        public Row eval(String a) {
            return Row.of(a, "pre-" + a);
        }

        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }
}
{code}
The code above throws an exception like this:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Only a scalar function can be used in the map operator.
  at org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480)
  at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519)
  at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29)
{code}
The core of the program above is identical to that provided in the Flink 
documentation, but it does not work correctly. This might affect users who want 
to use custom functions with the Table API.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22460) Conversion to relational algebra failed caused by ''

2021-04-25 Thread Haiwei Zhou (Jira)
Haiwei Zhou created FLINK-22460:
---

 Summary: Conversion to relational algebra failed caused by ''
 Key: FLINK-22460
 URL: https://issues.apache.org/jira/browse/FLINK-22460
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Haiwei Zhou


Flink complains that an INSERT SQL statement doesn't match the table schema: 
the validated type is missing a "NOT NULL" modifier.

{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o18.executeSql.
: java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) 
CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT number, TIMESTAMP(3) 
start_time, TIMESTAMP(3) end_time) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) 
CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT NOT NULL number, TIMESTAMP(3) 
start_time, TIMESTAMP(3) end_time) NOT
 NULL{code}
 

 
{code:java}
table_env.execute_sql('''
    CREATE TABLE preload_stats (
        lineitems STRING,
        itype STRING,
        number BIGINT NOT NULL,
        start_time TIMESTAMP(3),
        end_time TIMESTAMP(3)
    )''')

table_env.execute_sql(
    "SELECT request, 'request', number, start_time, end_time "
    "FROM result_1 ").execute_insert('preload_stats')
{code}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22133) SplitEnumerator does not provide checkpoint id in snapshot

2021-04-07 Thread Brian Zhou (Jira)
Brian Zhou created FLINK-22133:
--

 Summary: SplitEnumerator does not provide checkpoint id in snapshot
 Key: FLINK-22133
 URL: https://issues.apache.org/jira/browse/FLINK-22133
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.12.0
Reporter: Brian Zhou


In the ExternallyInducedSource API, the checkpoint trigger exposes the 
checkpoint ID so the external client can identify the checkpoint. However, in 
the FLIP-27 source, SplitEnumerator::snapshot() is a no-arg method, so the 
connector cannot track the checkpoint ID from Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22059) add a new option in the rocksdb statebackend to enable job threads setting

2021-03-30 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-22059:
-

 Summary: add a new option in the rocksdb statebackend to enable job 
threads setting
 Key: FLINK-22059
 URL: https://issues.apache.org/jira/browse/FLINK-22059
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.12.2
Reporter: xiaogang zhou


As discussed in FLINK-21688, we currently use the setIncreaseParallelism 
function to set the number of RocksDB's working threads.

Can we add another setting key to configure RocksDB's max background jobs, which 
would allow a larger number of flush threads?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22047) Could not find FINISHED Flink job and can't submit job

2021-03-30 Thread hayden zhou (Jira)
hayden zhou created FLINK-22047:
---

 Summary: Could not find FINISHED Flink job and can't submit job 
 Key: FLINK-22047
 URL: https://issues.apache.org/jira/browse/FLINK-22047
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.12.2
Reporter: hayden zhou


Could not find the FINISHED Flink job, and jobs always fail to submit due to 
insufficient slots.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22010) when Flink executed union all operators, an exception occurred

2021-03-29 Thread zhou (Jira)
zhou created FLINK-22010:


 Summary: when Flink executed union all operators, an exception occurred
 Key: FLINK-22010
 URL: https://issues.apache.org/jira/browse/FLINK-22010
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.12.1
Reporter: zhou


*When I executed the job on 1.11.2 there was no exception; when I executed it 
on 1.12.1 or 1.12.2, the job threw an exception.*

*The code is as follows:*
{quote}result = result1.union_all(result2)
result = result.union_all(result3)
# .union_all(result4).union_all(result5).union_all(result5).union_all(result6).union_all(result7)
result.execute().print()
{quote}
With the code above, when I comment out the line as follows, there is also no 
exception on Flink 1.12.1:
{quote}result = result1.union_all(result2)
#result = result.union_all(result3)
# .union_all(result4).union_all(result5).union_all(result5).union_all(result6).union_all(result7)
result.execute().print()
{quote}
I don't know how to solve this problem. Maybe someone could help me?

The exception is as follows:
{quote}py4j.protocol.Py4JJavaError: An error occurred while calling o340.print.
: java.lang.RuntimeException: Failed to fetch next result
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 at 
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
 at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
 at 
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
 at 
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ... 16 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
9ba6325f27c97192e42e76bd52d05db8)
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
 ... 18 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 9ba6325f27c97192e42e76bd52d05db8)
 at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
 at 
java.util.concurrent.CompletableFuture.uniWhenCo

[jira] [Created] (FLINK-22008) writing metadata is not an atomic operation, we should add commit logic

2021-03-29 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-22008:
-

 Summary: writing metadata is not an atomic operation, we should add 
commit logic
 Key: FLINK-22008
 URL: https://issues.apache.org/jira/browse/FLINK-22008
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.12.2
Reporter: xiaogang zhou


Writing metadata is not an atomic operation: if the JobManager crashes while 
writing the metadata, a metadata file can be left in the checkpoint directory 
even though its contents are corrupted.

So we should consider adding a commit operation to the checkpoint storage 
location.
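
A minimal sketch of the proposed commit step (my illustration, not Flink's 
actual checkpoint storage code; names are hypothetical): write the metadata to 
a temporary file first, then publish it with an atomic rename, so a reader can 
never observe a partially written file.
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public final class AtomicMetadataWriter {

    public static void write(Path checkpointDir, byte[] metadata) throws IOException {
        Path inProgress = checkpointDir.resolve("_metadata.inprogress");
        Path published = checkpointDir.resolve("_metadata");
        Files.write(inProgress, metadata,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        // The rename is the commit point: it either fully succeeds or leaves
        // the previous state intact, so a crash before this line cannot
        // leave a corrupted _metadata file behind.
        Files.move(inProgress, published, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}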



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21840) can't submit flink k8s session job with kubernetes.rest-service.exposed.type=NodePort

2021-03-16 Thread hayden zhou (Jira)
hayden zhou created FLINK-21840:
---

 Summary: can't submit flink k8s session job with 
kubernetes.rest-service.exposed.type=NodePort
 Key: FLINK-21840
 URL: https://issues.apache.org/jira/browse/FLINK-21840
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.12.2
 Environment: flink on native kubernetes with session mode
Reporter: hayden zhou


I have created a Flink session cluster with the `kubernetes-session` command 
and the -Dkubernetes.rest-service.exposed.type=NodePort option, because we 
don't want to expose the REST service externally.
When I submit a Flink job with `flink run --target kubernetes-session xxx`, I 
found that this command automatically uses the Kubernetes ApiServer address as 
the node address. But my ApiServer address IP is not among the node IPs of the 
k8s cluster. Can I specify a node IP explicitly?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21688) using the setIncreaseParallelism function can cause slow flush in restore

2021-03-09 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-21688:
-

 Summary: using the setIncreaseParallelism function can cause slow 
flush in restore
 Key: FLINK-21688
 URL: https://issues.apache.org/jira/browse/FLINK-21688
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.11.3
Reporter: xiaogang zhou


Using the setIncreaseParallelism function can cause slow flushes in the 
restore/rescaling case, as this function limits the HIGH-priority (flush) 
threads to 1.

Why not set the max background jobs to 40, which would offer more flush 
threads and enable a faster recovery?
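
For context, a rough sketch of the tuning path in question (assuming RocksDB's 
DBOptions API as invoked from a Flink options factory; the wrapper method 
below is a plain illustration):
{code:java}
import org.rocksdb.DBOptions;

public class IncreaseParallelismExample {

    public static DBOptions tune(DBOptions currentOptions) {
        // setIncreaseParallelism sizes the LOW-priority (compaction) thread
        // pool but leaves the HIGH-priority (flush) pool at its default of 1,
        // which is what slows down flushes during restore.
        return currentOptions.setIncreaseParallelism(4);
    }
}
{code}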



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21543) when using FIFO compaction, I found sst being deleted on the first checkpoint

2021-03-01 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-21543:
-

 Summary: when using FIFO compaction, I found sst being deleted on 
the first checkpoint
 Key: FLINK-21543
 URL: https://issues.apache.org/jira/browse/FLINK-21543
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: xiaogang zhou


2021/03/01-18:51:01.202049 7f59042fc700 (Original Log Time 
2021/03/01-18:51:01.200883) [/compaction/compaction_picker_fifo.cc:107] 
[_timer_state/processing_user-timers] FIFO compaction: picking file 1710 with 
creation time 0 for deletion

 

The configuration is like:

currentOptions.setCompactionStyle(getCompactionStyle());
currentOptions.setLevel0FileNumCompactionTrigger(8);
// currentOptions.setMaxTableFilesSizeFIFO(MemorySize.parse("2gb").getBytes());
CompactionOptionsFIFO compactionOptionsFIFO = new CompactionOptionsFIFO();
compactionOptionsFIFO.setMaxTableFilesSize(MemorySize.parse("8gb").getBytes());
compactionOptionsFIFO.setAllowCompaction(true);

 

The RocksDB version is:

<dependency>
  <groupId>io.github.myasuka</groupId>
  <artifactId>frocksdbjni</artifactId>
  <version>6.10.2-ververica-3.0</version>
</dependency>

 

I think the problem is caused by the manifest file not being uploaded by 
Flink. Can anyone suggest how I can avoid this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21472) FencingTokenException: Fencing token mismatch

2021-02-23 Thread hayden zhou (Jira)
hayden zhou created FLINK-21472:
---

 Summary: FencingTokenException: Fencing token mismatch
 Key: FLINK-21472
 URL: https://issues.apache.org/jira/browse/FLINK-21472
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.12.1
Reporter: hayden zhou
 Attachments: 
flink--standalonesession-0-mta-flink-jobmanager-864d6c8cbb-rmsxw.log


org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] - Unhandled 
exception.
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
mismatch: Ignoring message LocalFencedMessage(8fac01d8e3e3988223a2e5c6e3f04f1e, 
LocalRpcInvocation(requestMultipleJobDetails(Time))) because the fencing token 
8fac01d8e3e3988223a2e5c6e3f04f1e did not match the expected fencing token 
8c37414f464bca76144e6cabc946474b.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

