[jira] [Created] (FLINK-35827) Equality bewteen a row field and a row constant is wrong in SQL
yisha zhou created FLINK-35827: Summary: Equality bewteen a row field and a row constant is wrong in SQL Key: FLINK-35827 URL: https://issues.apache.org/jira/browse/FLINK-35827 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: yisha zhou

To reproduce the issue, add the code below to RowTypeTest:

{code:java}
testAllApis(
  'f2 === row(2, "foo", true),
  "f2 = row(2, 'foo', true)",
  "true"
)
{code}

f2 is actually equal to the constant `row(2, "foo", true)`; however, the expression `f2 = row(2, 'foo', true)` evaluates to false.

The root cause is that `ScalarOperatorGens.generateEquals` generates code like `$leftTerm.equals($rightTerm)` for row types. However, f2 may be a GenericRowData while the constant may be a BinaryRowData, in which case the equality check between them returns false. After investigating the code, I believe the logic in `EqualiserCodeGenerator.generateEqualsCode` can handle this case.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
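The mismatch described in FLINK-35827 can be illustrated with a small, self-contained Python sketch. The classes `GenericRow` and `BinaryRow` and the function `rows_equal` are hypothetical stand-ins, not Flink APIs; they only show why a representation-specific `equals` can report false for logically equal rows, and how a field-by-field comparison avoids that.

```python
import struct


class GenericRow:
    """Hypothetical row that keeps its fields as plain Python objects."""

    def __init__(self, *fields):
        self.fields = list(fields)

    def arity(self):
        return len(self.fields)

    def get(self, i):
        return self.fields[i]

    def __eq__(self, other):
        # Representation-specific equality: only equal to another GenericRow.
        return isinstance(other, GenericRow) and self.fields == other.fields


class BinaryRow:
    """Hypothetical row that packs (int, str, bool) fields into a byte buffer."""

    HEADER = ">i?i"  # 4-byte int, 1-byte bool, 4-byte string length

    def __init__(self, number, text, flag):
        encoded = text.encode("utf-8")
        self.buffer = struct.pack(self.HEADER, number, flag, len(encoded)) + encoded

    def arity(self):
        return 3

    def get(self, i):
        number, flag, length = struct.unpack_from(self.HEADER, self.buffer, 0)
        start = struct.calcsize(self.HEADER)
        text = self.buffer[start:start + length].decode("utf-8")
        return (number, text, flag)[i]

    def __eq__(self, other):
        # Representation-specific equality: only equal to another BinaryRow.
        return isinstance(other, BinaryRow) and self.buffer == other.buffer


def rows_equal(left, right):
    """Compare two rows field by field, ignoring their physical representation."""
    if left.arity() != right.arity():
        return False
    return all(left.get(i) == right.get(i) for i in range(left.arity()))


f2 = GenericRow(2, "foo", True)
constant = BinaryRow(2, "foo", True)
representation_equal = (f2 == constant)
fieldwise_equal = rows_equal(f2, constant)
```

Here `representation_equal` mirrors the `$leftTerm.equals($rightTerm)` behavior, while `fieldwise_equal` mirrors the kind of comparison an equaliser would perform.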
[jira] [Created] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document
Yang Zhou created FLINK-35357: Summary: Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document Key: FLINK-35357 URL: https://issues.apache.org/jira/browse/FLINK-35357 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Yang Zhou

While using the Flink Operator "Custom Flink Resource Listeners" feature in practice (doc: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource-listeners), I found that the "Operator Configuration Reference" document does not explain the "Custom Flink Resource Listeners" configuration parameters. I propose adding a description for kubernetes.operator.plugins.listeners..class: , since it is useful.
[jira] [Created] (FLINK-35066) TwoInputOperator in IterationBody cannot use keyBy
Yunfeng Zhou created FLINK-35066: Summary: TwoInputOperator in IterationBody cannot use keyBy Key: FLINK-35066 URL: https://issues.apache.org/jira/browse/FLINK-35066 Project: Flink Issue Type: Technical Debt Components: Library / Machine Learning Affects Versions: ml-2.3.0 Reporter: Yunfeng Zhou

Implementing a UDF KeyedRichCoProcessFunction or CoFlatMapFunction inside IterationBody yields a “java.lang.ClassCastException: org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple” error. More details about this bug can be found at https://lists.apache.org/thread/bgkw1g2tdgnp1xy1clsqtcfs3h18pkd6
[jira] [Created] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
yisha zhou created FLINK-35037: Summary: Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER Key: FLINK-35037 URL: https://issues.apache.org/jira/browse/FLINK-35037 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.20.0 Reporter: yisha zhou

In the current implementation, RelNodes of Window type only propagate the upsert/unique keys of their inputs if those keys contain the partition keys. However, windows with ROW_NUMBER can also produce upsert/unique keys of their own. For example:

{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student
{code}

Here (class, rn) is a valid uniqueKeys candidate.
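To make the (class, rn) claim in FLINK-35037 concrete, the following is a minimal Python sketch that emulates `row_number() over (partition by class order by name)` on an in-memory list and checks which column sets are unique. The helper names and the sample rows are hypothetical and only serve the illustration; nothing here reflects the planner's actual metadata handlers.

```python
from collections import defaultdict


def add_row_number(rows, partition_key, order_key):
    """Append a 1-based row number per partition, ordered by order_key."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)
    numbered = []
    for key in partitions:
        ordered = sorted(partitions[key], key=lambda r: r[order_key])
        for rn, row in enumerate(ordered, start=1):
            numbered.append({**row, "rn": rn})
    return numbered


def is_unique_key(rows, columns):
    """Check that no two rows share the same values for the given columns."""
    seen = {tuple(row[c] for c in columns) for row in rows}
    return len(seen) == len(rows)


# Hypothetical sample data; the input has no declared unique key on `class`.
students = [
    {"id": 1, "name": "Ann", "class": "A"},
    {"id": 2, "name": "Bob", "class": "A"},
    {"id": 3, "name": "Ann", "class": "B"},
]
result = add_row_number(students, "class", "name")
```

Because the row number restarts at 1 inside each partition, neither `class` nor `rn` is unique on its own, but the pair is.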
[jira] [Created] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user
xiaogang zhou created FLINK-34976: Summary: LD_PRELOAD environment may not be effective after su to flink user Key: FLINK-34976 URL: https://issues.apache.org/jira/browse/FLINK-34976 Project: Flink Issue Type: New Feature Components: flink-docker Affects Versions: 1.19.0 Reporter: xiaogang zhou
[jira] [Created] (FLINK-34656) Generated code for `ITEM` operator should return null when getting element of a null map/array/row
yisha zhou created FLINK-34656: Summary: Generated code for `ITEM` operator should return null when getting element of a null map/array/row Key: FLINK-34656 URL: https://issues.apache.org/jira/browse/FLINK-34656 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.20.0 Reporter: yisha zhou

In FieldAccessFromTableITCase, the expected result of f0[1] is null when f0 is a null array. However, the behavior of the generated code for ITEM is not consistent with that case. The main code is:

{code:java}
val arrayAccessCode =
  s"""
     |${array.code}
     |${index.code}
     |boolean $nullTerm = ${array.nullTerm} || ${index.nullTerm} ||
     |   $idxStr < 0 || $idxStr >= ${array.resultTerm}.size() || $arrayIsNull;
     |$resultTypeTerm $resultTerm = $nullTerm ? $defaultTerm : $arrayGet;
     |""".stripMargin
{code}

If `array.nullTerm` is true, a default value of the element type is returned, for example -1 for a null bigint array.

FieldAccessFromTableITCase only gets the expected result because ReduceExpressionsRule generates expression code for that case like:

{code:java}
boolean isNull$0 = true || false ||
   ((int) 1) - 1 < 0 || ((int) 1) - 1 >= ((org.apache.flink.table.data.ArrayData) null).size() || ((org.apache.flink.table.data.ArrayData) null).isNullAt(((int) 1) - 1);
long result$0 = isNull$0 ? -1L : ((org.apache.flink.table.data.ArrayData) null).getLong(((int) 1) - 1);
if (isNull$0) {
  out.setField(0, null);
} else {
  out.setField(0, result$0);
}
{code}

The reduced expression is then a null literal. I think the behavior when getting an element of a null value should be unified.
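The unified behavior requested in FLINK-34656 can be sketched in a few lines of Python. The function `item` is a hypothetical model of the intended semantics (1-based index, null result for a null array, a null index, an out-of-range index, or a null element); it is not the generated Java code and it assumes that these are the only null cases.

```python
def item(array, index):
    """Model of array[index] with a 1-based index; None stands for SQL NULL."""
    # A null array or a null index always yields NULL, never a default value.
    if array is None or index is None:
        return None
    position = index - 1
    # An out-of-range index yields NULL as well.
    if position < 0 or position >= len(array):
        return None
    # A null element is returned as NULL unchanged.
    return array[position]
```

The key point is that the null check on the array happens before any element access, so no type-specific default such as -1 can leak into the result.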
[jira] [Created] (FLINK-34536) Support reading long value as Timestamp column in JSON format
yisha zhou created FLINK-34536: Summary: Support reading long value as Timestamp column in JSON format Key: FLINK-34536 URL: https://issues.apache.org/jira/browse/FLINK-34536 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.19.0 Reporter: yisha zhou

In many scenarios, timestamp data is stored as a Long value but is expected to be processed as a Timestamp value. It is not user-friendly to require a UDF to convert the data before operating on it. Meanwhile, the Avro format appears to accept several input types and convert them into TimestampData. I hope the same ability can be introduced into the JSON format.
[jira] [Created] (FLINK-34529) Projection cannot be pushed down through rank operator.
yisha zhou created FLINK-34529: Summary: Projection cannot be pushed down through rank operator. Key: FLINK-34529 URL: https://issues.apache.org/jira/browse/FLINK-34529 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.19.0 Reporter: yisha zhou

When there is a rank/deduplicate operator, a projection based on the output of this operator cannot be pushed down to its input. The following code helps reproduce the issue:

{code:java}
val util = streamTestUtil()
util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
val sql =
  """
    |SELECT a FROM (
    |  SELECT a, f,
    |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
    |  FROM T1, T2
    |  WHERE T1.a = T2.d
    |)
    |WHERE rank_num = 1
  """.stripMargin
util.verifyPlan(sql)
{code}

The plan is expected to be:

{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- Calc(select=[a, c])
            :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- Calc(select=[d, f])
                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}

Notice that the 'select' of the Join operator is [a, c, d, f].

However, the actual plan is:

{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}

Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the projection in the final Calc is not passed through the Rank. I think an easy way to fix this issue is to add org.apache.calcite.rel.rules.ProjectWindowTransposeRule to FlinkStreamRuleSets.LOGICAL_OPT_RULES.
[jira] [Created] (FLINK-34519) Refine checkpoint scheduling and canceling logic
Yunfeng Zhou created FLINK-34519: Summary: Refine checkpoint scheduling and canceling logic Key: FLINK-34519 URL: https://issues.apache.org/jira/browse/FLINK-34519 Project: Flink Issue Type: Technical Debt Components: Runtime / Checkpointing Affects Versions: 1.20.0 Reporter: Yunfeng Zhou

In the current implementation, CheckpointCoordinator#startCheckpointScheduler stops the checkpoint scheduler before starting it, and CheckpointCoordinator#stopCheckpointScheduler cancels all ongoing and pending checkpoints. When a stop-with-savepoint request is received, the checkpoint coordinator triggers stopCheckpointScheduler before creating the savepoint, and starts the scheduler again afterwards if the savepoint fails.

The problem with this behavior is that it mixes up different checkpointing types. For example, stopCheckpointScheduler() only needs to cancel previous periodic checkpoints, while the current behavior cancels ongoing savepoints as well. This behavior is still acceptable for now, given that periodic checkpointing is enabled as long as a job is running, and two users would hardly trigger savepoints at the same time. However, as the Batch-Streaming Unification optimizations need to change some of these assumptions, the checkpoint coordinator should fix this problem.

To be exact, the checkpoint coordinator should at least distinguish between the following semantics.
- Periodic checkpointing is enabled to ensure that failover recovery time is kept within a time limit.
- Periodic checkpointing is disabled to reduce the corresponding performance overhead, but the ability to checkpoint still exists and users can trigger a savepoint at any time.
- Checkpoints and savepoints are not allowed due to job status or topological requirements.

A Flink job should also be able to switch between the checkpointing semantics mentioned above dynamically at runtime.

Besides, checkpoints canceled in stopCheckpointScheduler() fail with the error message "Checkpoint Coordinator is suspending", which is ambiguous for debugging. The detailed reason should be recorded as well.
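The three semantics listed in FLINK-34519 can be modelled as an explicit state, as in the Python sketch below. All names (`CheckpointingMode`, `CheckpointKind`, `can_trigger`, `stop_periodic_scheduler`, and the cancellation reason string) are hypothetical and chosen for illustration; this is one possible reading of the proposal, not the CheckpointCoordinator implementation.

```python
from enum import Enum


class CheckpointingMode(Enum):
    PERIODIC_ENABLED = "periodic checkpoints run on a schedule"
    PERIODIC_DISABLED = "no schedule, but savepoints can still be triggered"
    NOT_ALLOWED = "neither checkpoints nor savepoints may be triggered"


class CheckpointKind(Enum):
    PERIODIC = 1
    SAVEPOINT = 2


# Hypothetical, more specific reason than "Checkpoint Coordinator is suspending".
PERIODIC_STOP_REASON = "periodic checkpoint scheduler stopped"


def can_trigger(mode, kind):
    """Decide whether a checkpoint of the given kind may start in this mode."""
    if mode is CheckpointingMode.NOT_ALLOWED:
        return False
    if kind is CheckpointKind.PERIODIC:
        return mode is CheckpointingMode.PERIODIC_ENABLED
    return True  # savepoints are allowed whenever checkpointing is possible


def stop_periodic_scheduler(pending):
    """Cancel only periodic checkpoints and leave ongoing savepoints untouched."""
    kept = [(cid, kind) for cid, kind in pending if kind is CheckpointKind.SAVEPOINT]
    canceled = [
        (cid, PERIODIC_STOP_REASON)
        for cid, kind in pending
        if kind is CheckpointKind.PERIODIC
    ]
    return kept, canceled
```

Separating the mode from the kind of checkpoint is what lets the scheduler stop without touching a savepoint that another caller has already triggered.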
[jira] [Created] (FLINK-34406) Expose more RuntimeContext functionalities in FunctionContext
yisha zhou created FLINK-34406: Summary: Expose more RuntimeContext functionalities in FunctionContext Key: FLINK-34406 URL: https://issues.apache.org/jira/browse/FLINK-34406 Project: Flink Issue Type: New Feature Reporter: yisha zhou
[jira] [Created] (FLINK-34402) Class loading conflicts when using PowerMock in ITcase.
yisha zhou created FLINK-34402: Summary: Class loading conflicts when using PowerMock in ITcase. Key: FLINK-34402 URL: https://issues.apache.org/jira/browse/FLINK-34402 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.19.0 Reporter: yisha zhou Fix For: 1.19.0

Currently, when no user jars exist, the system ClassLoader is used to load classes by default. However, if we use PowerMock to create some ITCases, the framework uses JavassistMockClassLoader to load classes. Forcing the use of the system ClassLoader can then lead to class loading conflicts. Therefore we should use Thread.currentThread().getContextClassLoader() instead of ClassLoader.getSystemClassLoader() here.
[jira] [Created] (FLINK-34371) FLIP-331: Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment
Yunfeng Zhou created FLINK-34371: Summary: FLIP-331: Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment Key: FLINK-34371 URL: https://issues.apache.org/jira/browse/FLINK-34371 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Task Reporter: Yunfeng Zhou

This is an umbrella ticket for FLIP-331.
[jira] [Created] (FLINK-34077) Sphinx version needs upgrade
Yunfeng Zhou created FLINK-34077: Summary: Sphinx version needs upgrade Key: FLINK-34077 URL: https://issues.apache.org/jira/browse/FLINK-34077 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.19.0 Reporter: Yunfeng Zhou

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901]

{code:java}
Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees -a -W . _build/html
Jan 14 15:49:17 Running Sphinx v4.5.0
Jan 14 15:49:17
Jan 14 15:49:17 Sphinx version error:
Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project needs at least Sphinx v5.0; it therefore cannot be built with this version.
Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
Jan 14 15:49:17 make: *** [html] Error 2
Jan 14 15:49:18 ==sphinx checks... [FAILED]===
{code}
[jira] [Created] (FLINK-34015) execution.savepoint.ignore-unclaimed-state is invalid when passing this parameter by dynamic properties
Renxiang Zhou created FLINK-34015: Summary: execution.savepoint.ignore-unclaimed-state is invalid when passing this parameter by dynamic properties Key: FLINK-34015 URL: https://issues.apache.org/jira/browse/FLINK-34015 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.17.0 Reporter: Renxiang Zhou Attachments: image-2024-01-08-14-22-09-758.png, image-2024-01-08-14-24-30-665.png, image-2024-01-08-14-29-04-347.png

We set `execution.savepoint.ignore-unclaimed-state` to true and used the -D option to submit the job, but the value was still false in the JobManager log.

Pic 1: we set `execution.savepoint.ignore-unclaimed-state` to true when submitting the job.
!image-2024-01-08-14-22-09-758.png|width=1012,height=222!

Pic 2: the value is still false in the JobManager log.
!image-2024-01-08-14-24-30-665.png|width=651,height=51!

Besides, the parameter `execution.savepoint-restore-mode` has the same problem when we pass it with the -D option.
[jira] [Created] (FLINK-33741) introduce stat dump period and statsLevel configuration
xiaogang zhou created FLINK-33741: Summary: introduce stat dump period and statsLevel configuration Key: FLINK-33741 URL: https://issues.apache.org/jira/browse/FLINK-33741 Project: Flink Issue Type: New Feature Reporter: xiaogang zhou

I'd like to introduce two RocksDB statistics-related configuration options, so that the statistics can be customized:

{code:java}
// code placeholder
Statistics s = new Statistics();
s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
    .setStatistics(s);
{code}
[jira] [Created] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
xiaogang zhou created FLINK-33728: Summary: do not rewatch when KubernetesResourceManagerDriver watch fail Key: FLINK-33728 URL: https://issues.apache.org/jira/browse/FLINK-33728 Project: Flink Issue Type: New Feature Components: Deployment / Kubernetes Reporter: xiaogang zhou
[jira] [Created] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure
xiaogang zhou created FLINK-33249: Summary: comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure Key: FLINK-33249 URL: https://issues.apache.org/jira/browse/FLINK-33249 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.17.1 Reporter: xiaogang zhou
[jira] [Created] (FLINK-33174) enabling tablesample bernoulli in flink
xiaogang zhou created FLINK-33174: Summary: enabling tablesample bernoulli in flink Key: FLINK-33174 URL: https://issues.apache.org/jira/browse/FLINK-33174 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.17.1 Reporter: xiaogang zhou

I'd like to introduce a table sample function to enable fast sampling on streams. This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971
[jira] [Created] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint
xiaogang zhou created FLINK-33162: Summary: seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint Key: FLINK-33162 URL: https://issues.apache.org/jira/browse/FLINK-33162 Project: Flink Issue Type: Improvement Components: Runtime / REST Affects Versions: 1.16.0 Reporter: xiaogang zhou Fix For: 1.19.0
[jira] [Created] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate
xiaogang zhou created FLINK-33038: Summary: remove getMinRetentionTime in StreamExecDeduplicate Key: FLINK-33038 URL: https://issues.apache.org/jira/browse/FLINK-33038 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.18.0 Reporter: xiaogang zhou Fix For: 1.19.0

I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, as the TTL is controlled by the state metadata. Please let me take the issue if possible.
[jira] [Created] (FLINK-32998) if function result not correct
zhou created FLINK-32998: Summary: if function result not correct Key: FLINK-32998 URL: https://issues.apache.org/jira/browse/FLINK-32998 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.15.4 Reporter: zhou Attachments: image-2023-08-30-18-29-16-277.png, image-2023-08-30-18-30-05-568.png

!image-2023-08-30-18-29-16-277.png! !image-2023-08-30-18-30-05-568.png!

The IF function returns an incorrect result: instead of the original field value, it returns a truncated value of the field (word).
[jira] [Created] (FLINK-32881) Client supports making savepoints in detach mode
Renxiang Zhou created FLINK-32881: Summary: Client supports making savepoints in detach mode Key: FLINK-32881 URL: https://issues.apache.org/jira/browse/FLINK-32881 Project: Flink Issue Type: Improvement Components: API / State Processor, Client / Job Submission Affects Versions: 1.19.0 Reporter: Renxiang Zhou Fix For: 1.19.0 Attachments: image-2023-08-16-17-14-34-740.png, image-2023-08-16-17-14-44-212.png

When triggering a savepoint using the command-line tool, the client needs to wait for the job to finish creating the savepoint before it can exit. For jobs with large state, the savepoint creation process can be time-consuming, leading to the following problems:

# Platform users may need to manage thousands of Flink tasks on a single client machine. With the current savepoint triggering mode, all savepoint creation threads on that machine have to wait for the job to finish the snapshot, resulting in significant resource waste.
# If the savepoint creation time exceeds the client's timeout duration, the client throws a timeout exception and reports that triggering the savepoint failed. Since different jobs have varying savepoint durations, it is difficult to adjust the client's timeout parameter.

Therefore, we propose adding a detach mode for triggering savepoints on the client side, similar to the detach mode behavior when submitting jobs. Here are some specific details:

# The savepoint UUID will be generated on the client side. After successfully triggering the savepoint, the client immediately returns the UUID information.
# Add a "dump-pending-savepoints" API interface that allows the client to check whether the triggered savepoint has been successfully created.

By implementing these changes, the client can detach from the savepoint creation process, reducing resource waste, while still having a way to check the status of savepoint creation.

!image-2023-08-16-17-14-34-740.png!!image-2023-08-16-17-14-44-212.png!
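The detach-mode flow proposed in FLINK-32881 (client-generated UUID, immediate return, later status check) can be sketched as follows. `FakeCluster`, `trigger_savepoint_detached`, and the method names are hypothetical and exist only for this in-memory illustration; they do not correspond to an existing Flink client or REST API.

```python
import uuid


class FakeCluster:
    """Hypothetical in-memory stand-in for a job's savepoint bookkeeping."""

    def __init__(self):
        self.pending = set()
        self.completed = set()

    def start_savepoint(self, savepoint_id):
        # The cluster only records the request; the snapshot runs asynchronously.
        self.pending.add(savepoint_id)

    def finish_savepoint(self, savepoint_id):
        self.pending.discard(savepoint_id)
        self.completed.add(savepoint_id)

    def dump_pending_savepoints(self):
        # Models the proposed "dump-pending-savepoints" query.
        return set(self.pending)


def trigger_savepoint_detached(cluster):
    """Generate the savepoint ID on the client and return without waiting."""
    savepoint_id = str(uuid.uuid4())
    cluster.start_savepoint(savepoint_id)
    return savepoint_id


cluster = FakeCluster()
savepoint_id = trigger_savepoint_detached(cluster)
still_pending = savepoint_id in cluster.dump_pending_savepoints()
cluster.finish_savepoint(savepoint_id)
done = savepoint_id not in cluster.dump_pending_savepoints()
```

Since the client owns the ID from the start, it can exit right after triggering and any later process can query the status with that ID.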
[jira] [Created] (FLINK-32765) create view should reuse calcite tree
xiaogang zhou created FLINK-32765: Summary: create view should reuse calcite tree Key: FLINK-32765 URL: https://issues.apache.org/jira/browse/FLINK-32765 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.16.1 Reporter: xiaogang zhou
[jira] [Created] (FLINK-32738) PROTOBUF format supports projection push down
xiaogang zhou created FLINK-32738: Summary: PROTOBUF format supports projection push down Key: FLINK-32738 URL: https://issues.apache.org/jira/browse/FLINK-32738 Project: Flink Issue Type: Sub-task Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.18.0 Reporter: xiaogang zhou Fix For: 1.18.0

Support projection push down for protobuf.
[jira] [Created] (FLINK-32732) auto offset reset should be exposed to user
xiaogang zhou created FLINK-32732: Summary: auto offset reset should be exposed to user Key: FLINK-32732 URL: https://issues.apache.org/jira/browse/FLINK-32732 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.16.1 Reporter: xiaogang zhou
[jira] [Created] (FLINK-32514) FLIP-309: Support using larger checkpointing interval when source is processing backlog
Yunfeng Zhou created FLINK-32514: Summary: FLIP-309: Support using larger checkpointing interval when source is processing backlog Key: FLINK-32514 URL: https://issues.apache.org/jira/browse/FLINK-32514 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing Reporter: Yunfeng Zhou

Umbrella issue for https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
[jira] [Created] (FLINK-32494) Cannot convert list literal to Table with PyFlink
Yunfeng Zhou created FLINK-32494: Summary: Cannot convert list literal to Table with PyFlink Key: FLINK-32494 URL: https://issues.apache.org/jira/browse/FLINK-32494 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.16.1 Reporter: Yunfeng Zhou

During my attempt to convert a list or array to a PyFlink Table using the following program

{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.table import (
    expressions as native_flink_expr,
    StreamTableEnvironment,
)
from pyflink.table.types import DataTypes

if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_elements([(1, ), (2, ), (3, )])
    # table = table.add_or_replace_columns(
    #     native_flink_expr.lit([], DataTypes.ARRAY(DataTypes.INT()).not_null())
    # )
    table = table.add_or_replace_columns(
        native_flink_expr.lit(get_gateway().new_array(get_gateway().jvm.java.lang.Integer, 0))
    )
    table.execute().print()
{code}

The following exception would be thrown

{code}
ClassCastException: [Ljava.lang.Integer; cannot be cast to java.util.List
{code}

If I use the following code to create the literal expression along with the program above

{code:python}
table = table.add_or_replace_columns(
    native_flink_expr.lit([], DataTypes.ARRAY(DataTypes.INT()).not_null())
)
{code}

The following exception would be thrown

{code}
Data type 'ARRAY NOT NULL' with conversion class '[Ljava.lang.Integer;' does not support a value literal of class 'java.util.ArrayList'.
{code}

As PyFlink does not provide a document explaining how to create a Table with list literals, and both of my attempts described above fail, there might be a bug in PyFlink in this function.
[jira] [Created] (FLINK-32464) AssertionError when converting between Table and SQL with selection and type cast
Yunfeng Zhou created FLINK-32464: Summary: AssertionError when converting between Table and SQL with selection and type cast Key: FLINK-32464 URL: https://issues.apache.org/jira/browse/FLINK-32464 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.16.1 Reporter: Yunfeng Zhou

In an attempt to convert a table between the Table API and the SQL API using the following program

```java
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table table = tEnv.fromValues(1, 2, 3);
    tEnv.createTemporaryView("input_table", table);
    table = tEnv.sqlQuery("SELECT MAP[f0, 1] AS f1 from input_table");
    table = table.select($("f1").cast(DataTypes.MAP(DataTypes.INT(), DataTypes.INT())));
    tEnv.createTemporaryView("input_table_2", table);
    tEnv.sqlQuery("SELECT * from input_table_2");
}
```

The following exception is thrown.

```
Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType((INTEGER, INTEGER) MAP NOT NULL f1-MAP) NOT NULL
converted type:
RecordType((INTEGER, INTEGER) MAP f1-MAP) NOT NULL
rel:
LogicalProject(f1-MAP=[CAST(MAP($0, 1)):(INTEGER, INTEGER) MAP])
  LogicalValues(tuples=[[{ 1 }, { 2 }, { 3 }]])
    at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:470)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:215)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:191)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1498)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1253)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:374)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:262)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
    at org.apache.flink.streaming.connectors.redis.RedisSinkITCase.main
```

It seems that there is a bug in the Table-SQL conversion and selection process when a type cast is involved.
[jira] [Created] (FLINK-32367) lead function second param leat to ClassCastException
zhou created FLINK-32367: Summary: lead function second param leat to ClassCastException Key: FLINK-32367 URL: https://issues.apache.org/jira/browse/FLINK-32367 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.15.4 Reporter: zhou Attachments: image-2023-06-16-15-49-49-003.png, image-2023-06-16-18-12-05-861.png

!image-2023-06-16-18-12-05-861.png!!image-2023-06-16-15-49-49-003.png!

When the second parameter of the LEAD function is an expression (window_length/2), an exception is thrown. When the second parameter is a number, it works well.
[jira] [Created] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled
xiaogang zhou created FLINK-32132: Summary: Cast function CODEGEN does not work as expected when nullOnFailure enabled Key: FLINK-32132 URL: https://issues.apache.org/jira/browse/FLINK-32132 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.16.1 Reporter: xiaogang zhou
[jira] [Created] (FLINK-32115) json_value support cache
xiaogang zhou created FLINK-32115: Summary: json_value support cache Key: FLINK-32115 URL: https://issues.apache.org/jira/browse/FLINK-32115 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.16.1 Reporter: xiaogang zhou

[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

Hive supports a JSON object cache for previously deserialized values. Could we consider using a cache of objects in JsonValueCallGen?
[jira] [Created] (FLINK-31605) Table#to_pandas check table boundedness
Yunfeng Zhou created FLINK-31605: Summary: Table#to_pandas check table boundedness Key: FLINK-31605 URL: https://issues.apache.org/jira/browse/FLINK-31605 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.17.0 Reporter: Yunfeng Zhou

When `Table#to_pandas` is invoked on a table with an unbounded source, the method does not throw an exception. Instead, it keeps executing indefinitely without exiting. It would be better to check the boundedness of a table before execution and throw an exception in the unbounded case. It might be possible to reference the code in `StreamGraphGenerator#existsUnboundedSource` to judge whether a Table is bounded or not.
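The boundedness check suggested in FLINK-31605 amounts to walking the operator graph down to its sources before executing. The Python sketch below shows that idea on a hypothetical `Node` class; `exists_unbounded_source` and `check_bounded_before_collect` are illustrative names only and are not part of PyFlink or of `StreamGraphGenerator`.

```python
class Node:
    """Hypothetical operator node; leaf nodes (no inputs) act as sources."""

    def __init__(self, name, inputs=(), bounded=True):
        self.name = name
        self.inputs = list(inputs)
        self.bounded = bounded  # only meaningful for sources


def exists_unbounded_source(node):
    """Return True if any source reachable from this node is unbounded."""
    if not node.inputs:
        return not node.bounded
    return any(exists_unbounded_source(child) for child in node.inputs)


def check_bounded_before_collect(node):
    """Fail fast instead of collecting from a pipeline that never finishes."""
    if exists_unbounded_source(node):
        raise ValueError(
            f"Cannot collect results of '{node.name}': it reads from an unbounded source."
        )


bounded_plan = Node("calc", [Node("values_source", bounded=True)])
unbounded_plan = Node(
    "join",
    [Node("values_source", bounded=True), Node("stream_source", bounded=False)],
)
```

Calling `check_bounded_before_collect(unbounded_plan)` raises immediately, whereas the bounded plan passes the check and could proceed to execution.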
[jira] [Created] (FLINK-31249) Checkpoint Timer failed to process timeout events when it blocked at writing _metadata to DFS
renxiang zhou created FLINK-31249: Summary: Checkpoint Timer failed to process timeout events when it blocked at writing _metadata to DFS Key: FLINK-31249 URL: https://issues.apache.org/jira/browse/FLINK-31249 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.16.0, 1.11.6 Reporter: renxiang zhou Fix For: 1.18.0 Attachments: image-2023-02-28-11-25-03-637.png

The jobmanager-future thread may be blocked while writing metadata to DFS because of a DFS failure, and the CheckpointCoordinator lock is held by this thread. When the next checkpoint is triggered, the Checkpoint Timer thread waits for the lock to be released. If the previous checkpoint times out, the checkpoint timer cannot execute the timeout event because it is blocked waiting for the lock. As a result, the previous checkpoint cannot be cancelled.

!image-2023-02-28-11-25-03-637.png|width=1144,height=248!
[jira] [Created] (FLINK-31225) rocksdb max open file can lead to oom
xiaogang zhou created FLINK-31225: - Summary: rocksdb max open file can lead to oom Key: FLINK-31225 URL: https://issues.apache.org/jira/browse/FLINK-31225 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.16.1 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
xiaogang zhou created FLINK-31089: - Summary: pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit Key: FLINK-31089 URL: https://issues.apache.org/jira/browse/FLINK-31089 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.16.1 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30959) UNIX_TIMESTAMP's return value does not meet expected
Yunfeng Zhou created FLINK-30959: Summary: UNIX_TIMESTAMP's return value does not meet expected Key: FLINK-30959 URL: https://issues.apache.org/jira/browse/FLINK-30959 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.15.2 Reporter: Yunfeng Zhou

When running the following pyflink program

{code:python}
import pandas as pd
from pyflink.datastream import StreamExecutionEnvironment, HashMapStateBackend
from pyflink.table import StreamTableEnvironment

if __name__ == "__main__":
    input_data = pd.DataFrame(
        [
            ["Alex", 100.0, "2022-01-01 08:00:00.001 +0800"],
            ["Emma", 400.0, "2022-01-01 00:00:00.003 +"],
            ["Alex", 200.0, "2022-01-01 08:00:00.005 +0800"],
            ["Emma", 300.0, "2022-01-01 00:00:00.007 +"],
            ["Jack", 500.0, "2022-01-01 08:00:00.009 +0800"],
            ["Alex", 450.0, "2022-01-01 00:00:00.011 +"],
        ],
        columns=["name", "avg_cost", "time"],
    )

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_state_backend(HashMapStateBackend())
    t_env = StreamTableEnvironment.create(env)

    input_table = t_env.from_pandas(input_data)
    t_env.create_temporary_view("input_table", input_table)

    time_format = "-MM-dd HH:mm:ss.SSS X"
    output_table = t_env.sql_query(
        f"SELECT *, UNIX_TIMESTAMP(`time`, '{time_format}') AS unix_time FROM input_table"
    )
    output_table.execute().print()
{code}

The actual output is

{code}
+++++--+
| op | name | avg_cost | time |unix_time |
+++++--+
| +I | Alex | 100.0 | 2022-01-01 08:00:00.001 +0800 | 1640995200 |
| +I | Emma | 400.0 | 2022-01-01 00:00:00.003 + | 1640995200 |
| +I | Alex | 200.0 | 2022-01-01 08:00:00.005 +0800 | 1640995200 |
| +I | Emma | 300.0 | 2022-01-01 00:00:00.007 + | 1640995200 |
| +I | Jack | 500.0 | 2022-01-01 08:00:00.009 +0800 | 1640995200 |
| +I | Alex | 450.0 | 2022-01-01 00:00:00.011 + | 1640995200 |
+++++--+
{code}

While the expected result is

{code:java}
+++++--+
| op | name | avg_cost | time |unix_time |
+++++--+
| +I | Alex | 100.0 | 2022-01-01 08:00:00.001 +0800 | 1640995200 |
| +I | Emma | 400.0 | 2022-01-01 00:00:00.003 + | 1640966400 |
| +I | Alex | 200.0 | 2022-01-01 08:00:00.005 +0800 | 1640995200 |
| +I | Emma | 300.0 | 2022-01-01 00:00:00.007 + | 1640966400 |
| +I | Jack | 500.0 | 2022-01-01 08:00:00.009 +0800 | 1640995200 |
| +I | Alex | 450.0 | 2022-01-01 00:00:00.011 + | 1640966400 |
+++++--+
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
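As a cross-check outside Flink, an offset-aware timestamp string can be converted to epoch seconds with Python's standard library. This is a sketch using `datetime`, not Flink's `UNIX_TIMESTAMP`; the helper name `to_epoch_seconds` is an assumption, and only a row whose offset is fully legible in the report is used as input.

```python
from datetime import datetime


def to_epoch_seconds(text):
    # %f parses the fractional seconds, %z parses an offset such as +0800.
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f %z")
    # timestamp() on an offset-aware datetime is independent of the local time zone.
    return int(parsed.timestamp())


print(to_epoch_seconds("2022-01-01 08:00:00.001 +0800"))  # 1640995200
```

The result for this row agrees with the `unix_time` value the report lists for it, so it can serve as a reference point when comparing rows that carry a different offset.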
[jira] [Created] (FLINK-30753) Py4J cannot acquire Table.explain() method
Yunfeng Zhou created FLINK-30753: Summary: Py4J cannot acquire Table.explain() method Key: FLINK-30753 URL: https://issues.apache.org/jira/browse/FLINK-30753 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.16.0 Reporter: Yunfeng Zhou https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45044&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30532) Add benchmark for DCT, SQLTransformer and StopWordsRemover algorithm
Yunfeng Zhou created FLINK-30532: Summary: Add benchmark for DCT, SQLTransformer and StopWordsRemover algorithm Key: FLINK-30532 URL: https://issues.apache.org/jira/browse/FLINK-30532 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Reporter: Yunfeng Zhou Fix For: ml-2.2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30292) Better support for conversion between DataType and TypeInformation
Yunfeng Zhou created FLINK-30292: Summary: Better support for conversion between DataType and TypeInformation Key: FLINK-30292 URL: https://issues.apache.org/jira/browse/FLINK-30292 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.15.3 Reporter: Yunfeng Zhou

In Flink 1.15, we have the following ways to convert a DataType to a TypeInformation. Each of them has some disadvantages.

* `TypeConversions.fromDataTypeToLegacyInfo`
** It might lead to precision losses in face of some data types like timestamp.
** It has been deprecated.
* `ExternalTypeInfo.of`
** It cannot be used to get detailed type information like `RowTypeInfo`.
** It might bring some serialization overhead.

Given that the ways mentioned above are both not perfect, Flink SQL should provide a better API to support DataType-TypeInformation conversions, and thus better support Table-DataStream conversions.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30256) LogalWindowAgg can set the chaining Strategy to always
xiaogang zhou created FLINK-30256: - Summary: LogalWindowAgg can set the chaining Strategy to always Key: FLINK-30256 URL: https://issues.apache.org/jira/browse/FLINK-30256 Project: Flink Issue Type: Improvement Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30241) Flink ML Iteration ConcurrentModificationException
Yunfeng Zhou created FLINK-30241: Summary: Flink ML Iteration ConcurrentModificationException Key: FLINK-30241 URL: https://issues.apache.org/jira/browse/FLINK-30241 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou https://github.com/jiangxin369/flink-ml/actions/runs/3577811156/jobs/6017233847 {code} ___ LinearRegressionTest.test_get_model_data ___ self = def test_get_model_data(self): regression = LinearRegression().set_weight_col('weight') model = regression.fit(self.input_data_table) model_data = self.t_env.to_data_stream( > model.get_model_data()[0]).execute_and_collect().next() pyflink/ml/lib/regression/tests/test_linearregression.py:124: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/datastream/data_stream.py:1760: in next if not self._j_closeable_iterator.hasNext(): /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/py4j/java_gateway.py:1322: in __call__ answer, self.gateway_client, self.target_id, self.name) /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/util/exceptions.py:146: in deco return f(*a, **kw) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ answer = 'xro12236' gateway_client = target_id = 'o12139', name = 'hasNext' def get_return_value(answer, gateway_client, target_id=None, name=None): """Converts an answer received from the Java gateway into a Python object. For example, string representation of integers are converted to Python integer, string representation of objects are converted to JavaObject instances, etc. :param answer: the string returned by the Java gateway :param gateway_client: the gateway client used to communicate with the Java Gateway. 
Only necessary if the answer is a reference (e.g., object, list, map) :param target_id: the name of the object from which the answer comes from (e.g., *object1* in `object1.hello()`). Optional. :param name: the name of the member from which the answer comes from (e.g., *hello* in `object1.hello()`). Optional. """ if is_error(answer)[0]: if len(answer) > 1: type = answer[1] value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) if answer[1] == REFERENCE_TYPE: raise Py4JJavaError( "An error occurred while calling {0}{1}{2}.\n". > format(target_id, ".", name), value) E py4j.protocol.Py4JJavaError: An error occurred while calling o12139.hasNext. E : java.lang.RuntimeException: Failed to fetch next result E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) E at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at java.lang.reflect.Method.invoke(Method.java:498) E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) E at java.lang.Thread.run(Thread.java:750) E Caused by: java.io.IOException: Failed to fetch job execution result E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) E 
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) E
[jira] [Created] (FLINK-30168) PyFlink Deserialization Error with Object Array
Yunfeng Zhou created FLINK-30168: Summary: PyFlink Deserialization Error with Object Array Key: FLINK-30168 URL: https://issues.apache.org/jira/browse/FLINK-30168 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.15.2, 1.16.0 Reporter: Yunfeng Zhou

When collecting object array records from a DataStream in PyFlink, an exception like the following is thrown:

data = 0, field_type = DenseVectorTypeInfo

    def pickled_bytes_to_python_converter(data, field_type):
        if isinstance(field_type, RowTypeInfo):
            row_kind = RowKind(int.from_bytes(data[0], 'little'))
            data = zip(list(data[1:]), field_type.get_field_types())
            fields = []
            for d, d_type in data:
                fields.append(pickled_bytes_to_python_converter(d, d_type))
            row = Row.of_kind(row_kind, *fields)
            return row
        else:
>           data = pickle.loads(data)
E           TypeError: a bytes-like object is required, not 'int'

I found that this error is raised because PyFlink handles object arrays differently on the Java side and the Python side.

On the Java side (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject)

{code:java}
...
else if (dataType instanceof BasicArrayTypeInfo
        || dataType instanceof PrimitiveArrayTypeInfo) {
    // recursively deal with array elements
}
...
else {
    // ObjectArrayTypeInfo is here
    TypeSerializer serializer = dataType.createSerializer(null);
    ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
    serializer.serialize(obj, baosWrapper);
    return pickler.dumps(baos.toByteArray());
}
{code}

On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter)

{code:python}
...
elif isinstance(field_type,
                (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
    element_type = field_type._element_type
    elements = []
    for element_bytes in data:
        elements.append(pickled_bytes_to_python_converter(element_bytes, element_type))
    return elements
{code}

Thus a possible fix for this bug is to align PyFlink's behavior on the Java side and the Python side.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
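One plausible reading of the mismatch in FLINK-30168 can be reproduced without Flink. The sketch below assumes the Java side hands over a single pickled byte string for the whole array while the Python side iterates over it as if it were a list of per-element byte strings; the variable and function names are hypothetical.

```python
import pickle

payload = pickle.dumps([1.0, 2.0])  # one opaque pickled blob, as bytes

# Iterating over a bytes object yields ints, not bytes.
first_element = next(iter(payload))


def describe_failure(element):
    # Mimics the element-wise pickle.loads call and reports the error, if any.
    try:
        pickle.loads(element)
    except TypeError as error:
        return str(error)
    return None


print(type(first_element).__name__)
print(describe_failure(first_element))
```

Unpickling the blob as a whole succeeds, whereas unpickling one of its elements raises a `TypeError` of the same kind as in the report, which is consistent with the suggestion that both sides should agree on whether object arrays are serialized per element or as a unit.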
[jira] [Created] (FLINK-30144) Guarantee Flink ML operators function correctly with object-reuse enabled
Yunfeng Zhou created FLINK-30144: Summary: Guarantee Flink ML operators function correctly with object-reuse enabled Key: FLINK-30144 URL: https://issues.apache.org/jira/browse/FLINK-30144 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou Flink ML operators are supposed to function correctly when object-reuse is enabled, as a part of Flink ML's performance improvement. Thus we need to enable this configuration in Flink ML test cases and fix any bugs discovered along the way. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30142) Flink ML operators lose table watermark after transform()
Yunfeng Zhou created FLINK-30142: Summary: Flink ML operators lose table watermark after transform() Key: FLINK-30142 URL: https://issues.apache.org/jira/browse/FLINK-30142 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30130) Table.select lose watermark
Yunfeng Zhou created FLINK-30130: Summary: Table.select lose watermark Key: FLINK-30130 URL: https://issues.apache.org/jira/browse/FLINK-30130 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.15.1 Reporter: Yunfeng Zhou

Executing the following program

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream stream = env.fromSequence(0, 1000);
Schema schema = Schema.newBuilder()
        .column("f0", DataTypes.BIGINT())
        .columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(f0 * 1000, 3)")
        .watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND")
        .build();
Table table = tEnv.fromDataStream(stream, schema);
table.printSchema();
table = table.select($("*"));
table.printSchema();
{code}

produces the following result

{code:java}
(
  `f0` BIGINT,
  `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME* AS TO_TIMESTAMP_LTZ(f0 * 1000, 3),
  WATERMARK FOR `time_ltz`: TIMESTAMP_LTZ(3) AS time_ltz - INTERVAL '5' SECOND
)
(
  `f0` BIGINT,
  `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME*
)
{code}

This result shows that the watermark property of a Table is lost during the select operation.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30124) GenericType is not supported in PyFlink currently
Yunfeng Zhou created FLINK-30124: Summary: GenericType is not supported in PyFlink currently Key: FLINK-30124 URL: https://issues.apache.org/jira/browse/FLINK-30124 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.1 Reporter: Yunfeng Zhou When we add and execute the following test case to flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py of the Flink ML repository, {code:java} def test_get_model_data(self): model_data = self.estimator.fit(self.train_data).get_model_data()[0] self.t_env.to_data_stream(model_data).execute_and_collect().next(){code} The following exception would be thrown. {code:java} j_type_info = JavaObject id=o698 def _from_java_type(j_type_info: JavaObject) -> TypeInformation: gateway = get_gateway() JBasicTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo if _is_instance_of(j_type_info, JBasicTypeInfo.STRING_TYPE_INFO): return Types.STRING() elif _is_instance_of(j_type_info, JBasicTypeInfo.BOOLEAN_TYPE_INFO): return Types.BOOLEAN() elif _is_instance_of(j_type_info, JBasicTypeInfo.BYTE_TYPE_INFO): return Types.BYTE() elif _is_instance_of(j_type_info, JBasicTypeInfo.SHORT_TYPE_INFO): return Types.SHORT() elif _is_instance_of(j_type_info, JBasicTypeInfo.INT_TYPE_INFO): return Types.INT() elif _is_instance_of(j_type_info, JBasicTypeInfo.LONG_TYPE_INFO): return Types.LONG() elif _is_instance_of(j_type_info, JBasicTypeInfo.FLOAT_TYPE_INFO): return Types.FLOAT() elif _is_instance_of(j_type_info, JBasicTypeInfo.DOUBLE_TYPE_INFO): return Types.DOUBLE() elif _is_instance_of(j_type_info, JBasicTypeInfo.CHAR_TYPE_INFO): return Types.CHAR() elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_INT_TYPE_INFO): return Types.BIG_INT() elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_DEC_TYPE_INFO): return Types.BIG_DEC() elif _is_instance_of(j_type_info, JBasicTypeInfo.INSTANT_TYPE_INFO): return Types.INSTANT() JSqlTimeTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo if _is_instance_of(j_type_info, JSqlTimeTypeInfo.DATE): return Types.SQL_DATE() elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIME): return Types.SQL_TIME() elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIMESTAMP): return Types.SQL_TIMESTAMP() JPrimitiveArrayTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo \ .PrimitiveArrayTypeInfo if _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.BOOLEAN()) elif _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.BYTE()) elif _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.SHORT()) elif _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.INT()) elif _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.LONG()) elif _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.FLOAT()) elif _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.DOUBLE()) elif _is_instance_of(j_type_info, JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO): return Types.PRIMITIVE_ARRAY(Types.CHAR()) JBasicArrayTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo if _is_instance_of(j_type_info, JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO): return Types.BASIC_ARRAY(Types.BOOLEAN()) elif _is_instance_of(j_type_info, JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO): return Types.BASIC_ARRAY(Types.BYTE()) elif _is_instance_of(j_type_info, JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO): return Types.BASIC_ARRAY(Types.SHORT()) elif _is_instance_of(j_type_info, 
JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO): return Types.BASIC_ARRAY(Types.INT()) elif _is_instance_of(j_type_info, JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO): return Types.BASIC_ARRAY(Types.LONG()) elif _is_instance_of(j_type_info, JBasicArrayTypeInfo.FLOAT_ARRAY_T
[jira] [Created] (FLINK-30122) Flink ML KMeans getting model data throws TypeError
Yunfeng Zhou created FLINK-30122: Summary: Flink ML KMeans getting model data throws TypeError Key: FLINK-30122 URL: https://issues.apache.org/jira/browse/FLINK-30122 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou

When the following test case is added to flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py,

{code:python}
def test_get_model_data(self):
    kmeans = KMeans().set_max_iter(2).set_k(2)
    model = kmeans.fit(self.data_table)
    model_data = model.get_model_data()[0]
    expected_field_names = ['centroids', 'weights']
    self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
    self.t_env.to_data_stream(model_data).execute_and_collect().next()
{code}

The following exception would be thrown.

{code}
data = 0, field_type = DenseVectorTypeInfo

    def pickled_bytes_to_python_converter(data, field_type):
        if isinstance(field_type, RowTypeInfo):
            row_kind = RowKind(int.from_bytes(data[0], 'little'))
            data = zip(list(data[1:]), field_type.get_field_types())
            fields = []
            for d, d_type in data:
                fields.append(pickled_bytes_to_python_converter(d, d_type))
            row = Row.of_kind(row_kind, *fields)
            return row
        else:
>           data = pickle.loads(data)
E           TypeError: a bytes-like object is required, not 'int'
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30037) Improve the efficiency of Flink ML Python CI
Yunfeng Zhou created FLINK-30037: Summary: Improve the efficiency of Flink ML Python CI Key: FLINK-30037 URL: https://issues.apache.org/jira/browse/FLINK-30037 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou It currently takes about thirty minutes to execute Flink ML's Python CI[1], which noticeably slows down Flink ML development. Thus we need to reduce the total execution time of Flink ML Python CI. [1] https://github.com/apache/flink-ml/actions/runs/3475256961 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30009) OperatorCoordinator.start()'s JavaDoc mismatches its behavior
Yunfeng Zhou created FLINK-30009: Summary: OperatorCoordinator.start()'s JavaDoc mismatches its behavior Key: FLINK-30009 URL: https://issues.apache.org/jira/browse/FLINK-30009 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.16.0 Reporter: Yunfeng Zhou The following description lies in the JavaDoc of {{OperatorCoordinator.start()}}. {{This method is called once at the beginning, before any other methods.}} This description is incorrect because the method {{resetToCheckpoint()}} can happen before {{start()}} is invoked. For example, {{RecreateOnResetOperatorCoordinator.DeferrableCoordinator.resetAndStart()}} uses these methods in this way. Thus the JavaDoc of {{OperatorCoordinator}}'s methods should be modified to match this behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29843) Euclidean Distance Measure generates NAN distance values
Yunfeng Zhou created FLINK-29843: Summary: Euclidean Distance Measure generates NAN distance values Key: FLINK-29843 URL: https://issues.apache.org/jira/browse/FLINK-29843 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou Currently Flink ML's `EuclideanDistanceMeasure.distance(...)` method might return a negative value as the distance between two vectors because of the limited precision of Java doubles. This bug should be fixed to guarantee that the distance is a non-negative value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
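A common way to guard against this kind of rounding problem is to clamp the squared distance at zero before taking the square root. The sketch below is written in Python for illustration and is not Flink ML's `EuclideanDistanceMeasure`; it assumes the distance is computed from the expanded form of the squared norm, which the report does not state, and the function name is hypothetical.

```python
import math


def euclidean_distance(a, b):
    # Expanded form: ||a||^2 + ||b||^2 - 2 * (a . b).
    # Rounding can push this slightly below zero for near-identical vectors.
    squared = (
        sum(x * x for x in a)
        + sum(y * y for y in b)
        - 2.0 * sum(x * y for x, y in zip(a, b))
    )
    # Clamp before the square root so the result is never negative or NaN.
    return math.sqrt(max(squared, 0.0))
```

The clamp changes the result only when rounding error has already made the squared value negative, so distances between clearly separated vectors are unaffected.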
[jira] [Created] (FLINK-29763) TaskManager heatbeat timeout exception in Github CI for python tests
Yunfeng Zhou created FLINK-29763: Summary: TaskManager heatbeat timeout exception in Github CI for python tests Key: FLINK-29763 URL: https://issues.apache.org/jira/browse/FLINK-29763 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou https://github.com/apache/flink-ml/actions/runs/3322007330/jobs/5490434747 https://github.com/apache/flink-ml/actions/runs/3321223124/jobs/5488576891 https://github.com/apache/flink-ml/actions/runs/3319920091/jobs/5485672250 https://github.com/apache/flink-ml/actions/runs/3319722473/jobs/5485231041 https://github.com/apache/flink-ml/actions/runs/3319599111/jobs/5484952148 https://github.com/apache/flink-ml/actions/runs/3318938657/jobs/5483471010 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29604) Add Estimator and Transformer for CountVectorizer
Yunfeng Zhou created FLINK-29604: Summary: Add Estimator and Transformer for CountVectorizer Key: FLINK-29604 URL: https://issues.apache.org/jira/browse/FLINK-29604 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Estimator and Transformer for CountVectorizer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29602) Add Transformer for SQLTransformer
Yunfeng Zhou created FLINK-29602: Summary: Add Transformer for SQLTransformer Key: FLINK-29602 URL: https://issues.apache.org/jira/browse/FLINK-29602 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Transformer for SQLTransformer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29603) Add Transformer for StopWordsRemover
Yunfeng Zhou created FLINK-29603: Summary: Add Transformer for StopWordsRemover Key: FLINK-29603 URL: https://issues.apache.org/jira/browse/FLINK-29603 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Transformer for StopWordsRemover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29600) Add Estimator and Transformer for BucketedRandomProjectionLSH
Yunfeng Zhou created FLINK-29600: Summary: Add Estimator and Transformer for BucketedRandomProjectionLSH Key: FLINK-29600 URL: https://issues.apache.org/jira/browse/FLINK-29600 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Estimator and Transformer for BucketedRandomProjectionLSH. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29601) Add Estimator and Transformer for UnivariateFeatureSelector
Yunfeng Zhou created FLINK-29601: Summary: Add Estimator and Transformer for UnivariateFeatureSelector Key: FLINK-29601 URL: https://issues.apache.org/jira/browse/FLINK-29601 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Estimator and Transformer for UnivariateFeatureSelector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29599) Add Estimator and Transformer for MinHashLSH
Yunfeng Zhou created FLINK-29599: Summary: Add Estimator and Transformer for MinHashLSH Key: FLINK-29599 URL: https://issues.apache.org/jira/browse/FLINK-29599 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Estimator and Transformer for MinHashLSH. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29598) Add Estimator and Transformer for Imputer
Yunfeng Zhou created FLINK-29598: Summary: Add Estimator and Transformer for Imputer Key: FLINK-29598 URL: https://issues.apache.org/jira/browse/FLINK-29598 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Estimator and Transformer for Imputer -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29597) Add Estimator and Transformer for QuantileDiscretizer
Yunfeng Zhou created FLINK-29597: Summary: Add Estimator and Transformer for QuantileDiscretizer Key: FLINK-29597 URL: https://issues.apache.org/jira/browse/FLINK-29597 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Estimator and Transformer for QuantileDiscretizer -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29596) Add Estimator and Transformer for RFormula
Yunfeng Zhou created FLINK-29596: Summary: Add Estimator and Transformer for RFormula Key: FLINK-29596 URL: https://issues.apache.org/jira/browse/FLINK-29596 Project: Flink Issue Type: New Feature Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add Estimator and Transformer for RFormula. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29595) Add Estimator and Transformer for ChiSqSelector
Yunfeng Zhou created FLINK-29595: Summary: Add Estimator and Transformer for ChiSqSelector Key: FLINK-29595 URL: https://issues.apache.org/jira/browse/FLINK-29595 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Fix For: ml-2.2.0 Add the Estimator and Transformer for ChiSqSelector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29591) Add built-in UDFs to convert between arrays and vectors
Yunfeng Zhou created FLINK-29591: Summary: Add built-in UDFs to convert between arrays and vectors Key: FLINK-29591 URL: https://issues.apache.org/jira/browse/FLINK-29591 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29545) kafka consuming stop when trigger first checkpoint
xiaogang zhou created FLINK-29545: - Summary: kafka consuming stop when trigger first checkpoint Key: FLINK-29545 URL: https://issues.apache.org/jira/browse/FLINK-29545 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / Network Affects Versions: 1.13.3 Reporter: xiaogang zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29429) Add DataType for Flink ML linear algorithm classes
Yunfeng Zhou created FLINK-29429: Summary: Add DataType for Flink ML linear algorithm classes Key: FLINK-29429 URL: https://issues.apache.org/jira/browse/FLINK-29429 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou DataType instances are used in Table API when creating Tables or Table UDFs. There are helper functions like `DataTypes.of()` that can be used to get the DataType for Flink ML classes like DenseVector in java, but this method is not applicable in pyflink, which seems not to support custom DataTypes yet. Thus we should add DataType subclasses for them in Flink ML. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29115) Improve the quickstart of Flink ML python API
Yunfeng Zhou created FLINK-29115: Summary: Improve the quickstart of Flink ML python API Key: FLINK-29115 URL: https://issues.apache.org/jira/browse/FLINK-29115 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou Currently, according to Flink ML's documentation, Python users are required to build the Flink ML Java project before they can build and use Flink ML's Python API. Thus the setup process and quick start should be improved to simplify the usage of Flink ML. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29044) Add Transformer for DCT
Yunfeng Zhou created FLINK-29044: Summary: Add Transformer for DCT Key: FLINK-29044 URL: https://issues.apache.org/jira/browse/FLINK-29044 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Reporter: Yunfeng Zhou Fix For: ml-2.2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29043) Improve ML iteration efficiency
Yunfeng Zhou created FLINK-29043: Summary: Improve ML iteration efficiency Key: FLINK-29043 URL: https://issues.apache.org/jira/browse/FLINK-29043 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou Currently, in Github Action, it takes about one minute to execute the unit tests of each algorithm that uses flink ml's iteration mechanism, including KMeansTest, LinearRegressionTest, LinearSVCTest, and LogisticRegressionTest[1]. We need to figure out which components in flink-ml-iteration have caused this phenomenon and seek to make corresponding improvements. [1] [https://github.com/apache/flink-ml/runs/7892649470?check_suite_focus=true] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28941) Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
Yunfeng Zhou created FLINK-28941: Summary: Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case Key: FLINK-28941 URL: https://issues.apache.org/jira/browse/FLINK-28941 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.16.0 Reporter: Yunfeng Zhou When unaligned checkpoints are disabled, savepoints are set as forced[1], which means they can ignore the maxConcurrentCheckpoint limit[2], so more than maxConcurrentCheckpoint checkpoints may run simultaneously. This behavior is incompatible with OperatorCoordinatorHolder, which requires that at most one checkpoint be pending at a time. As a result, exceptions like the following might be thrown[3]. {code:java} java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8 at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?] at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?] 
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?] 
at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15] at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292] at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292] {code} [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164 [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449 [3] https://dev.azure.com/apache-flink/apac
[jira] [Created] (FLINK-28761) BinaryClassificationEvaluator cannot work with unaligned checkpoint
Yunfeng Zhou created FLINK-28761: Summary: BinaryClassificationEvaluator cannot work with unaligned checkpoint Key: FLINK-28761 URL: https://issues.apache.org/jira/browse/FLINK-28761 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou If we make {{BinaryClassificationEvaluatorTest}} extend {{AbstractTestBase}}, this test class would throw the following exceptions during execution: {code:java} org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:854) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605) at org.apache.flink.ml.evaluation.BinaryClassificationEvaluatorTest.testEvaluateWithWeight(BinaryClassificationEvaluatorTest.java:305) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) Caused by: java.lang.UnsupportedOperationException: Unaligned checkpoints are currently not supported for custom partitioners, as rescaling is not guaranteed to work correctly. 
The user can force Unaligned Checkpoints by using 'execution.checkpointing.unaligned.forced' at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:390) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:166) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:121) at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:993) at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50) at org.apach
[jira] [Created] (FLINK-28673) Migrate Flink ML to Flink 1.15.1
Yunfeng Zhou created FLINK-28673: Summary: Migrate Flink ML to Flink 1.15.1 Key: FLINK-28673 URL: https://issues.apache.org/jira/browse/FLINK-28673 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.2.0 Reporter: Yunfeng Zhou Update Flink ML's Flink dependency to 1.15.1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28639) Preserve distributed consistency of OperatorEvents from subtasks to OperatorCoordinator
Yunfeng Zhou created FLINK-28639: Summary: Preserve distributed consistency of OperatorEvents from subtasks to OperatorCoordinator Key: FLINK-28639 URL: https://issues.apache.org/jira/browse/FLINK-28639 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Affects Versions: 1.14.3 Reporter: Yunfeng Zhou Fix For: 1.16.0 This is the second step toward solving the consistency issue of operator coordinator (OC) communications. In this step, we would also guarantee the consistency of operator events sent from subtasks to OCs. Combined with the other sub-task, which preserves the consistency of communications in the reverse direction, all communications between OCs and subtasks would be consistent across checkpoints and global failovers. To achieve the goal of this step, we need to add closing/reopening functions to the subtasks' gateways and make the subtasks aware of a checkpoint before they receive the checkpoint barriers. The general process would be as follows.
1. When the OC starts a checkpoint, it notifies all subtasks about it.
2. After being notified about the ongoing checkpoint in the OC, a subtask sends a special operator event to its OC, which is the last operator event the OC can receive from the subtask before the subtask completes the checkpoint. Then the subtask closes its gateway.
3. After receiving this special event from all subtasks, the OC finishes its checkpoint and closes its gateway. Then the checkpoint coordinator sends checkpoint barriers to the sources.
4. If the subtask or the OC generates any events to send to the other side, it buffers the events locally.
5. When a subtask starts checkpointing, it also stores the buffered events in the checkpoint.
6. After the subtask completes the checkpoint, communications in both directions are restored and the buffered events are sent out.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
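The closing, buffering, and reopening behavior in steps 2 to 6 above can be illustrated with a small standalone sketch. This is not Flink code: `BufferingGateway` and all of its method names are hypothetical, and the sketch is single-threaded, whereas a real gateway would need to coordinate with the task's threading model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical gateway wrapper for illustration only; not a Flink class. */
final class BufferingGateway<E> {
    private final Consumer<E> target;
    private final Queue<E> buffered = new ArrayDeque<>();
    private boolean closed;

    BufferingGateway(Consumer<E> target) {
        this.target = target;
    }

    /** Sends the event right away if the gateway is open, otherwise buffers it locally (step 4). */
    void send(E event) {
        if (closed) {
            buffered.add(event);
        } else {
            target.accept(event);
        }
    }

    /** Closes the gateway once the checkpoint has been announced (steps 2 and 3). */
    void close() {
        closed = true;
    }

    /** Returns a copy of the buffered events so that they can be stored in the checkpoint (step 5). */
    List<E> snapshotBufferedEvents() {
        return new ArrayList<>(buffered);
    }

    /** Reopens the gateway and flushes the buffered events in their original order (step 6). */
    void reopen() {
        closed = false;
        E event;
        while ((event = buffered.poll()) != null) {
            target.accept(event);
        }
    }
}
```

Events sent while the gateway is closed are delivered only after `reopen()`, in the order they were produced, which is the property the checkpoint relies on.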
[jira] [Created] (FLINK-28606) Preserve distributed consistency of OperatorEvents from OperatorCoordinator to subtasks
Yunfeng Zhou created FLINK-28606: Summary: Preserve distributed consistency of OperatorEvents from OperatorCoordinator to subtasks Key: FLINK-28606 URL: https://issues.apache.org/jira/browse/FLINK-28606 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Affects Versions: 1.14.3 Reporter: Yunfeng Zhou Fix For: 1.16.0 This is a component of our solution to the consistency issue in the operator coordinator (OC) mechanism. In this step, we would guarantee the consistency of all communications in one direction, from the OC to subtasks. This requires less work and should unblock the implementation of the CEP coordinator in FLIP-200. Roughly, we would need to implement the following process in this step.
# Let the OC finish processing all the incoming OperatorEvents before the snapshot.
# Close the gateway that sends operator events to its subtasks when the OC completes the snapshot.
# Wait until all the outgoing OperatorEvents before the snapshot are sent and acked.
# Send checkpoint barriers to the Source operators.
# Open the corresponding gateway of a subtask when the OC learns that the subtask has completed the checkpoint.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
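The "wait until all outgoing OperatorEvents are sent and acked" step above could be tracked with a counter and a future, as in the following minimal sketch. `InFlightEventTracker` and its methods are hypothetical names used for illustration; the sketch assumes no new events are sent once the gateway has been closed.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical tracker of sent-but-unacknowledged events; not a Flink class. */
final class InFlightEventTracker {
    private int inFlight;
    private CompletableFuture<Void> drained = CompletableFuture.completedFuture(null);

    /** Records that an event has been sent and is awaiting acknowledgement. */
    synchronized void onEventSent() {
        if (inFlight == 0) {
            // A new batch of in-flight events starts; it needs a fresh, incomplete future.
            drained = new CompletableFuture<>();
        }
        inFlight++;
    }

    /** Records an acknowledgement and completes the future once nothing is in flight. */
    synchronized void onEventAcked() {
        if (inFlight == 0) {
            throw new IllegalStateException("Received an ack without a matching send");
        }
        if (--inFlight == 0) {
            drained.complete(null);
        }
    }

    /** Future that completes when every event sent so far has been acknowledged. */
    synchronized CompletableFuture<Void> whenAllAcked() {
        return drained;
    }
}
```

A caller would close the gateway, obtain `whenAllAcked()`, and trigger the checkpoint barriers only after that future completes.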
[jira] [Created] (FLINK-28455) pyflink tableResult collect to local timeout
zhou created FLINK-28455: Summary: pyflink tableResult collect to local timeout Key: FLINK-28455 URL: https://issues.apache.org/jira/browse/FLINK-28455 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.13.0 Reporter: zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28224) Add document for algorithms and features in Flink ML 2.1
Yunfeng Zhou created FLINK-28224: Summary: Add document for algorithms and features in Flink ML 2.1 Key: FLINK-28224 URL: https://issues.apache.org/jira/browse/FLINK-28224 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou The algorithms and new features introduced in Flink ML 2.1 need to be documented and displayed on Flink ML's documentation website. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27798) Migrate Flink ML to Flink 1.15.0
Yunfeng Zhou created FLINK-27798: Summary: Migrate Flink ML to Flink 1.15.0 Key: FLINK-27798 URL: https://issues.apache.org/jira/browse/FLINK-27798 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou Update Flink ML's Flink dependency to 1.15.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27797) PythonTableUtils.getCollectionInputFormat cannot correctly handle None values
Yunfeng Zhou created FLINK-27797: Summary: PythonTableUtils.getCollectionInputFormat cannot correctly handle None values Key: FLINK-27797 URL: https://issues.apache.org/jira/browse/FLINK-27797 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0 Reporter: Yunfeng Zhou In `PythonTableUtils.getCollectionInputFormat` there is code like the following, which can be found at [https://github.com/apache/flink/blob/8488368b86a99a064446ca74e775b670b94a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java#L515]
```
c -> {
    if (c.getClass() != byte[].class || dataType instanceof PickledByteArrayTypeInfo) {
        return c;
    }
```
Here, the generated function does not check `c != null` before calling `c.getClass()`, so tables created through PyFlink may fail to parse values that are `None`. -- This message was sent by Atlassian Jira (v8.20.7#820007)
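A null-safe variant of such a converter might look like the sketch below. It is a standalone illustration, not the actual Flink fix: `NullSafeConverterSketch`, the `passBytesThrough` flag (standing in for the `dataType instanceof PickledByteArrayTypeInfo` check), and the UTF-8 decoding (standing in for the real deserialization) are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical stand-in for the generated converter; not Flink's implementation. */
final class NullSafeConverterSketch {

    static Function<Object, Object> converter(boolean passBytesThrough) {
        return c -> {
            // Check for null first, so that a Python None is passed through
            // instead of triggering a NullPointerException in c.getClass().
            if (c == null || c.getClass() != byte[].class || passBytesThrough) {
                return c;
            }
            // Placeholder for the real deserialization of the byte array.
            return new String((byte[]) c, StandardCharsets.UTF_8);
        };
    }
}
```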
[jira] [Created] (FLINK-27742) Fix Compatibility Issues Between Flink ML Operators.
Yunfeng Zhou created FLINK-27742: Summary: Fix Compatibility Issues Between Flink ML Operators. Key: FLINK-27742 URL: https://issues.apache.org/jira/browse/FLINK-27742 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.0.0 Reporter: Yunfeng Zhou StringIndexer and LogisticRegression in Flink ML cannot be connected in a pipeline. The reason is that the output label column of StringIndexer is an integer, while LogisticRegression only accepts input data whose labels are doubles. In order to make Flink ML stages compatible with each other, the following changes need to be made.
- For stages that can only accept double-typed inputs, update their implementation to accept any numerical type.
- For stages that generate labels as integers, make them return labels as doubles.
-- This message was sent by Atlassian Jira (v8.20.7#820007)
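The first change above, accepting any numerical type, can be sketched as a small conversion helper. `LabelConversionSketch` is a hypothetical name and this is only one possible shape, not the code that Flink ML adopted.

```java
/** Hypothetical helper showing how a stage could accept any numeric label type. */
final class LabelConversionSketch {

    /** Converts an Integer, Long, Float, Double, etc. label to a double. */
    static double toDouble(Object label) {
        if (label instanceof Number) {
            return ((Number) label).doubleValue();
        }
        throw new IllegalArgumentException(
                "Label must be numeric but was: "
                        + (label == null ? "null" : label.getClass().getName()));
    }
}
```

With such a helper, an integer label produced by one stage would be read as a double by the next stage instead of causing a type mismatch.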
[jira] [Created] (FLINK-27352) [JUnit5 Migration] Module: flink-json
EMing Zhou created FLINK-27352: -- Summary: [JUnit5 Migration] Module: flink-json Key: FLINK-27352 URL: https://issues.apache.org/jira/browse/FLINK-27352 Project: Flink Issue Type: Sub-task Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.16.0 Reporter: EMing Zhou -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27084) Perround mode recreating operator fails
Yunfeng Zhou created FLINK-27084: Summary: Perround mode recreating operator fails Key: FLINK-27084 URL: https://issues.apache.org/jira/browse/FLINK-27084 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.0.0 Environment: Flink 1.14.0, Flink ML 2.0.0 Reporter: Yunfeng Zhou When I was trying to submit a job containing Flink ML KMeans operator to a Flink cluster, the following exception is thrown out. {code:java} The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8ba9d3173d1c83eb4803298f81349aea) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8ba9d3173d1c83eb4803298f81349aea) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1898) at org.apache.flink.ml.benchmark.BenchmarkUtils.runBenchmark(BenchmarkUtils.java:127) at org.apache.flink.ml.benchmark.BenchmarkUtils.runBenchmark(BenchmarkUtils.java:84) at org.apache.flink.ml.benchmark.Benchmark.main(Benchmark.java:50) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 8 more Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8ba9d3173d1c83eb4803298f81349aea) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Com
[jira] [Created] (FLINK-26705) use threadlocal to decrease ObjectSizeCalculator memory use
hd zhou created FLINK-26705: --- Summary: use threadlocal to decrease ObjectSizeCalculator memory use Key: FLINK-26705 URL: https://issues.apache.org/jira/browse/FLINK-26705 Project: Flink Issue Type: Improvement Reporter: hd zhou In class ObjectSizeCalculator, every call to the static function getObjectSize creates a new ObjectSizeCalculator, which costs a lot of memory and keeps the GC busy. Using a ThreadLocal would decrease memory use.
{code:java}
/**
 * Given an object, returns the total allocated size, in bytes, of the object and all other objects reachable from it.
 * Attempts to detect the current JVM memory layout, but may fail with {@link UnsupportedOperationException};
 *
 * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do anything special, it
 *     measures the size of all objects reachable through it (which will include its class loader, and by
 *     extension, all other Class objects loaded by the same loader, and all the parent class loaders). It doesn't
 *     provide the size of the static fields in the JVM class that the Class object represents.
 * @return the total allocated size of the object and all other objects it retains.
 * @throws UnsupportedOperationException if the current vm memory layout cannot be detected.
 */
public static long getObjectSize(Object obj) throws UnsupportedOperationException {
    return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
}
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
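The general ThreadLocal reuse pattern proposed above can be sketched as follows. The class is a hypothetical stand-in that only counts distinct objects; it does not reproduce ObjectSizeCalculator's internals. It assumes the reused instance can be reset between calls, which would have to be verified for the real class.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical calculator that is reused per thread instead of being created per call. */
final class ReusableCalculatorSketch {
    // One instance per thread, created lazily on first use.
    private static final ThreadLocal<ReusableCalculatorSketch> PER_THREAD =
            ThreadLocal.withInitial(ReusableCalculatorSketch::new);

    // Working state that would otherwise be reallocated on every call.
    private final Set<Object> visited =
            Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());

    private ReusableCalculatorSketch() {}

    static ReusableCalculatorSketch forCurrentThread() {
        return PER_THREAD.get();
    }

    /** Static entry point that reuses the calling thread's instance. */
    static int countDistinct(Object... objects) {
        return forCurrentThread().count(objects);
    }

    private int count(Object[] objects) {
        visited.clear(); // Reset state left over from a previous call.
        try {
            for (Object o : objects) {
                if (o != null) {
                    visited.add(o);
                }
            }
            return visited.size();
        } finally {
            visited.clear(); // Do not keep references to the measured objects alive.
        }
    }
}
```

The trade-off is that each thread retains its own instance for as long as the thread lives, so the working state should be cleared after every call, as done in the `finally` block.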
[jira] [Created] (FLINK-26313) Support Online KMeans in Flink ML
Yunfeng Zhou created FLINK-26313: Summary: Support Online KMeans in Flink ML Key: FLINK-26313 URL: https://issues.apache.org/jira/browse/FLINK-26313 Project: Flink Issue Type: New Feature Reporter: Yunfeng Zhou Modify Flink ML's KMeans algorithm to support online model training and update. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26263) Check data size in LogisticRegression
Yunfeng Zhou created FLINK-26263: Summary: Check data size in LogisticRegression Key: FLINK-26263 URL: https://issues.apache.org/jira/browse/FLINK-26263 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.0.0 Reporter: Yunfeng Zhou In Flink ML LogisticRegression, the algorithm fails if the parallelism is larger than the input data size. For example, in `LogisticRegressionTest.testFitAndPredict()`, if we add the following code
```java
env.setParallelism(12);
```
then the test case fails with the following exception
```
Caused by: java.lang.IllegalArgumentException: bound must be positive
at java.base/java.util.Random.nextInt(Random.java:388)
at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.getMiniBatchData(LogisticRegression.java:351)
at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.onEpochWatermarkIncremented(LogisticRegression.java:381)
at org.apache.flink.iteration.operator.AbstractWrapperOperator.notifyEpochWatermarkIncrement(AbstractWrapperOperator.java:129)
at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.lambda$1(AbstractAllRoundWrapperOperator.java:105)
at org.apache.flink.iteration.operator.OperatorUtils.processOperatorOrUdfIfSatisfy(OperatorUtils.java:79)
at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.onEpochWatermarkIncrement(AbstractAllRoundWrapperOperator.java:102)
at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.tryUpdateLowerBound(OperatorEpochWatermarkTracker.java:79)
at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.onEpochWatermark(OperatorEpochWatermarkTracker.java:63)
at org.apache.flink.iteration.operator.AbstractWrapperOperator.onEpochWatermarkEvent(AbstractWrapperOperator.java:121)
at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement(TwoInputAllRoundWrapperOperator.java:77)
at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement2(TwoInputAllRoundWrapperOperator.java:59)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:225)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:194)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:834)
```
The cause of this exception is that LogisticRegression does not handle the case where the input data size is 0, which can happen on subtasks that receive no records. This can be resolved by adding an additional check. -- This message was sent by Atlassian Jira (v8.20.1#820001)
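The "bound must be positive" message comes from `java.util.Random.nextInt(int)`, which rejects a bound of 0. A guard of the kind suggested above might look like the following sketch; `MiniBatchSketch` and its sampling-with-replacement logic are illustrative assumptions and do not reproduce the actual `getMiniBatchData` implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical mini-batch sampler that tolerates an empty local data set. */
final class MiniBatchSketch {

    /** Samples batchSize elements with replacement, or returns an empty batch if there is no data. */
    static <T> List<T> sample(List<T> data, int batchSize, Random random) {
        if (data.isEmpty()) {
            // Without this check, random.nextInt(0) throws
            // IllegalArgumentException: bound must be positive.
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            batch.add(data.get(random.nextInt(data.size())));
        }
        return batch;
    }
}
```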
[jira] [Created] (FLINK-26100) Set up Flink ML Document Website
Yunfeng Zhou created FLINK-26100: Summary: Set up Flink ML Document Website Key: FLINK-26100 URL: https://issues.apache.org/jira/browse/FLINK-26100 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: ml-2.0.0 Reporter: Yunfeng Zhou Set up Flink ML's documentation website based on the Flink and StateFun documentation websites. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25444) ExecutionConfig can not be Serializable due to TextElement used in side
Wen Zhou created FLINK-25444: Summary: ExecutionConfig can not be Serializable due to TextElement used in side Key: FLINK-25444 URL: https://issues.apache.org/jira/browse/FLINK-25444 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.14.2, 1.14.0, 1.14.3 Reporter: Wen Zhou This appears to be a bug introduced by the latest Flink commit to the file [flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java|https://github.com/apache/flink/commit/9e0e0929b86c372c9243daad9d654af9e7718708#diff-7a439abdf207cf6da8aa6c147b38c1346820fe786afbf652bc614fc377cdf238] Diffing the file shows that TextElement is now used in ClosureCleanerLevel, which is a member of the Serializable ExecutionConfig. [TextElement in ClosureCleanerLevel|https://i.stack.imgur.com/ky3d8.png] The simplest way to verify the problem is to run the following code in Flink 1.13.5 and 1.14.x; the exception is reproduced in 1.14.x. The only difference between 1.13.5 and 1.14.x here is that latest commit. {{@Test public void testExecutionConfigSerializable() throws Exception { ExecutionConfig config = new ExecutionConfig(); ClosureCleaner.clean(config, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); }}} The problem is also described here: https://stackoverflow.com/questions/70443743/flink-blockelement-exception-when-updating-to-version-1-14-2/70468925?noredirect=1#comment124597510_70468925 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24955) Add One-hot Encoder to Flink ML
Yunfeng Zhou created FLINK-24955: Summary: Add One-hot Encoder to Flink ML Key: FLINK-24955 URL: https://issues.apache.org/jira/browse/FLINK-24955 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Reporter: Yunfeng Zhou Add One-hot Encoder to Flink ML -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24817) Support Naive Bayes algorithm in Flink ML
Yunfeng Zhou created FLINK-24817: Summary: Support Naive Bayes algorithm in Flink ML Key: FLINK-24817 URL: https://issues.apache.org/jira/browse/FLINK-24817 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Reporter: Yunfeng Zhou This ticket aims to add the Naive Bayes algorithm to Flink ML. The algorithm will use the latest Flink ML API proposed in FLIP-173 through FLIP-176. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24059) SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation
Brian Zhou created FLINK-24059: -- Summary: SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation Key: FLINK-24059 URL: https://issues.apache.org/jira/browse/FLINK-24059 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.13.2 Reporter: Brian Zhou Fix For: 1.14.0 The Pravega Flink connector is implementing a FLIP-27 source and maps each Pravega reader to a split, which leads to a one-to-one mapping between source readers and splits. For unit tests, Flink offers the {{SourceReaderTestBase}} class to make testing easier, but its NUM_SPLITS constant is declared {{final}} with a value of 10, which makes it hard for us to integrate. -- This message was sent by Atlassian Jira (v8.3.4#803005)
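One possible way to make such a constant overridable is to expose it through a protected method with the old value as the default. The sketch below uses hypothetical class names (`ReaderTestBaseSketch`, `OneSplitReaderTestSketch`) and does not claim to match the change made in Flink.

```java
/** Hypothetical test base that exposes the split count through an overridable method. */
abstract class ReaderTestBaseSketch {

    /** Default number of splits, mirroring the former constant value of 10. */
    protected int getNumSplits() {
        return 10;
    }

    /** Example of base-class logic that depends on the split count. */
    final int totalRecords(int recordsPerSplit) {
        return getNumSplits() * recordsPerSplit;
    }
}

/** A connector with a one-to-one reader-to-split mapping overrides the default. */
final class OneSplitReaderTestSketch extends ReaderTestBaseSketch {
    @Override
    protected int getNumSplits() {
        return 1;
    }
}
```

Existing subclasses keep the default of 10 without any change, while connectors with a different mapping override a single method.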
[jira] [Created] (FLINK-23799) application mode not support pyflink
zhou created FLINK-23799: Summary: application mode not support pyflink Key: FLINK-23799 URL: https://issues.apache.org/jira/browse/FLINK-23799 Project: Flink Issue Type: Improvement Affects Versions: 1.13.2 Environment: flink 1.13 Reporter: zhou I ran the following command: /space/flink/bin/flink run-application -t yarn-application -Dyarn.application.name=kafka-hive --pyFiles /space/testAirflow/airflow/dags/ -py /space/testAirflow/airflow/dags/chloe/kafka_to_hive_1.py Flink throws an exception: java.lang.IllegalArgumentException: Should only have one jar Does application mode not support PyFlink? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23353) UDTAGG can't execute in Batch mode
hayden zhou created FLINK-23353: Summary: UDTAGG can't execute in Batch mode Key: FLINK-23353 URL: https://issues.apache.org/jira/browse/FLINK-23353 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.13.1 Reporter: hayden zhou
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

public class Top2Test {

    public static void main(String[] args) {
        // Batch mode is what triggers the failure.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Table sourceTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("price", DataTypes.INT())),
                row(1, "hayden", 18),
                row(3, "hayden", 19),
                row(4, "hayden", 20),
                row(2, "jaylin", 20));
        tEnv.createTemporaryView("source", sourceTable);

        // Top-2 prices per name, computed with a user-defined table aggregate function.
        Table rT = tEnv.from("source")
                .groupBy($("name"))
                .flatAggregate(call(Top2.class, $("price")).as("price", "rank"))
                .select($("name"), $("price"), $("rank"));
        rT.execute().print();
    }

    public static class Top2Accumulator {
        public Integer first;
        public Integer second;
    }

    public static class Top2
            extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {

        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator acc = new Top2Accumulator();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accumulator acc, Integer value) {
            if (value > acc.first) {
                acc.second = acc.first;
                acc.first = value;
            } else if (value > acc.second) {
                acc.second = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        // Emits (value, rank) pairs for the values that were actually set.
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }
}
{code}
The program fails with the following error: Exception in thread "main" org.apache.flink.table.api.TableException:
Cannot generate a valid execution plan for the given query:

LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[name, price, rank])
+- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, _UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
   +- LogicalTableAggregate(group=[{1}], tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
      +- LogicalUnion(all=[true])
         :- LogicalProject(id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(18):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(19):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(20):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         +- LogicalProject(id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(20):INTEGER])
            +- LogicalValues(tuples=[[{ 0 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.col
[jira] [Created] (FLINK-22689) Table API Documentation Row-Based Operations Example Fails
Yunfeng Zhou created FLINK-22689: Summary: Table API Documentation Row-Based Operations Example Fails Key: FLINK-22689 URL: https://issues.apache.org/jira/browse/FLINK-22689 Project: Flink Issue Type: Bug Components: Table SQL / Ecosystem Affects Versions: 1.12.1 Reporter: Yunfeng Zhou
I wrote the following program according to the example code provided in [Documentation/Table API/Row-based operations|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#row-based-operations]:

public class TableUDF {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Table input = tEnv.fromValues(
                DataTypes.of("ROW<c STRING>"),
                Row.of("name"));

        ScalarFunction func = new MyMapFunction();
        tEnv.registerFunction("func", func);

        // exception occurs here
        Table table = input.map(call("func", $("c")).as("a", "b"));
        table.execute().print();
    }

    public static class MyMapFunction extends ScalarFunction {
        public Row eval(String a) {
            return Row.of(a, "pre-" + a);
        }

        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }
}

The code above throws an exception like this:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Only a scalar function can be used in the map operator.
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.map(OperationTreeBuilder.java:480)
    at org.apache.flink.table.api.internal.TableImpl.map(TableImpl.java:519)
    at org.apache.flink.ml.common.function.TableUDFBug.main(TableUDF.java:29)

The core of the program above is identical to the example in the Flink documentation, but it does not work. This might affect users who want to use custom functions with the Table API.
[jira] [Created] (FLINK-22460) Conversion to relational algebra failed caused by ''
Haiwei Zhou created FLINK-22460: Summary: Conversion to relational algebra failed caused by '' Key: FLINK-22460 URL: https://issues.apache.org/jira/browse/FLINK-22460 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Haiwei Zhou
Flink complains that an INSERT statement doesn't match the table schema. The validated type is missing the "NOT NULL" modifier on the {{number}} column.
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o18.executeSql.
: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT number, TIMESTAMP(3) start_time, TIMESTAMP(3) end_time) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT NOT NULL number, TIMESTAMP(3) start_time, TIMESTAMP(3) end_time) NOT NULL
{code}
{code:java}
table_env.execute_sql('''
    CREATE TABLE preload_stats (
        lineitems STRING,
        itype STRING,
        number BIGINT NOT NULL,
        start_time TIMESTAMP(3),
        end_time TIMESTAMP(3)
    )''')

table_env.execute_sql(
    "SELECT request, 'request', number, start_time, end_time "
    "FROM result_1 ").execute_insert('preload_stats')
{code}
[jira] [Created] (FLINK-22133) SplitEmumerator does not provide checkpoint id in snapshot
Brian Zhou created FLINK-22133: Summary: SplitEmumerator does not provide checkpoint id in snapshot Key: FLINK-22133 URL: https://issues.apache.org/jira/browse/FLINK-22133 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.12.0 Reporter: Brian Zhou In the ExternallyInducedSource API, the checkpoint trigger exposes the checkpoint ID so that the external client can identify the checkpoint. However, in the FLIP-27 source, SplitEnumerator::snapshot() is a no-arg method, so the connector cannot track the checkpoint ID from Flink.
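To make the request concrete, here is a small sketch of what a checkpoint-ID-aware snapshot hook could look like. The interface and class names below are hypothetical and only illustrate the shape of the change; they are not Flink's SplitEnumerator API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface for illustration only; not copied from Flink.
interface CheckpointAwareEnumeratorSketch<S> {
    // The checkpoint id is passed in so the connector can correlate
    // its external state with the checkpoint being taken.
    S snapshotState(long checkpointId);
}

// Toy enumerator that remembers which checkpoint ids it was asked to snapshot.
final class RecordingEnumeratorSketch implements CheckpointAwareEnumeratorSketch<String> {
    private final List<Long> seenCheckpointIds = new ArrayList<>();

    @Override
    public String snapshotState(long checkpointId) {
        seenCheckpointIds.add(checkpointId);
        return "state@" + checkpointId;
    }

    public List<Long> seenCheckpointIds() {
        return seenCheckpointIds;
    }
}
```

An external system such as the one described in the ticket could then use the recorded ID to match its own checkpoint with the one Flink is taking.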
[jira] [Created] (FLINK-22059) add a new option is rocksdb statebackend to enable job threads setting
xiaogang zhou created FLINK-22059: Summary: add a new option is rocksdb statebackend to enable job threads setting Key: FLINK-22059 URL: https://issues.apache.org/jira/browse/FLINK-22059 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.12.2 Reporter: xiaogang zhou As discussed in FLINK-21688, we currently use the setIncreaseParallelism function to set the number of RocksDB worker threads. Can we add another configuration key that sets RocksDB's max background jobs, which would allow a larger number of flush threads?
[jira] [Created] (FLINK-22047) Could not find FLINSHED Flink job and can't submit job
hayden zhou created FLINK-22047: Summary: Could not find FLINSHED Flink job and can't submit job Key: FLINK-22047 URL: https://issues.apache.org/jira/browse/FLINK-22047 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.12.2 Reporter: hayden zhou Could not find FLINSHED Flink job, and job submission always fails because of insufficient slots.
[jira] [Created] (FLINK-22010) when flink executed union all opeators,exception occured
zhou created FLINK-22010: Summary: when flink executed union all opeators,exception occured Key: FLINK-22010 URL: https://issues.apache.org/jira/browse/FLINK-22010 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.12.1 Reporter: zhou
*When I run the job on Flink 1.11.2 there is no exception, but on 1.12.1 or 1.12.2 the job fails with an exception.*
*The code is as follows:*
{quote}result = result1.union_all(result2)
result = result.union_all(result3) # .union_all(result4).union_all(result5).union_all(result5).union_all(result6).union_all(result7)
result.execute().print()
{quote}
When I comment out the second union_all as shown below, the code also runs without an exception on Flink 1.12.1:
{quote}result = result1.union_all(result2)
#result = result.union_all(result3) # .union_all(result4).union_all(result5).union_all(result5).union_all(result6).union_all(result7)
result.execute().print()
{quote}
I don't know how to solve this problem. Could someone help me? The exception is as follows:
{quote}py4j.protocol.Py4JJavaError: An error occurred while calling o340.print.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9ba6325f27c97192e42e76bd52d05db8)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
    ... 18 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9ba6325f27c97192e42e76bd52d05db8)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
    at java.util.concurrent.CompletableFuture.uniWhenCo
[jira] [Created] (FLINK-22008) writing metadata is not an atomic operation, we should add a commit logic
xiaogang zhou created FLINK-22008: Summary: writing metadata is not an atomic operation, we should add a commit logic Key: FLINK-22008 URL: https://issues.apache.org/jira/browse/FLINK-22008 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.12.2 Reporter: xiaogang zhou Writing checkpoint metadata is not an atomic operation. If the JobManager crashes while writing the metadata, a metadata file can exist in the checkpoint directory with corrupted contents. We should consider adding a commit operation to the checkpoint storage location.
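A common way to get this kind of commit step is to write to a temporary file and then rename it into place. The sketch below shows that pattern with java.nio on a local file system; it is only an assumption about what a commit operation could look like, not Flink's checkpoint storage code, and the ".inprogress" suffix and class name are made up for illustration. Distributed file systems and object stores may not offer an atomic rename, so the same idea would need a different primitive there.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating write-then-rename; not part of Flink.
final class AtomicMetadataWriterSketch {

    // Writes the contents to a sibling temp file first, then publishes it
    // under the final name with an atomic move. A reader therefore sees
    // either no file at the final path or a completely written one.
    static void writeAtomically(Path target, byte[] contents) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".inprogress");
        Files.write(tmp, contents);
        // Throws AtomicMoveNotSupportedException if the file system cannot do this atomically.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

If the process crashes before the move, only the ".inprogress" file is left behind, which a recovery path can ignore or delete.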
[jira] [Created] (FLINK-21840) can't submit flink k8s session job with kubernetes.rest-service.exposed.type=NodePort
hayden zhou created FLINK-21840: Summary: can't submit flink k8s session job with kubernetes.rest-service.exposed.type=NodePort Key: FLINK-21840 URL: https://issues.apache.org/jira/browse/FLINK-21840 Project: Flink Issue Type: Bug Components: Runtime / REST Affects Versions: 1.12.2 Environment: flink on native kubernetes with session mode Reporter: hayden zhou I created a Flink session cluster with the `kubernetes-session` command and the -Dkubernetes.rest-service.exposed.type=NodePort option, because we don't want to expose the REST service externally. When I submit a Flink job with `flink run --target kubernetes-session xxx`, the command automatically uses the Kubernetes API server address as the node address. However, my API server's IP address is not one of the node IPs of the k8s cluster. Can I specify a node IP explicitly?
[jira] [Created] (FLINK-21688) we use setIncreaseParallelism function, can cause slow flush in restore
xiaogang zhou created FLINK-21688: Summary: we use setIncreaseParallelism function, can cause slow flush in restore Key: FLINK-21688 URL: https://issues.apache.org/jira/browse/FLINK-21688 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.11.3 Reporter: xiaogang zhou Using the setIncreaseParallelism function can cause slow flushes when restoring in the rescaling case, because this function limits the HIGH-priority thread pool to a single thread. Why not set max background jobs to 40, which would provide more flush threads and enable faster recovery?
[jira] [Created] (FLINK-21543) when using FIFO compaction, I found sst being deleted on the first checkpoint
xiaogang zhou created FLINK-21543: Summary: when using FIFO compaction, I found sst being deleted on the first checkpoint Key: FLINK-21543 URL: https://issues.apache.org/jira/browse/FLINK-21543 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: xiaogang zhou
2021/03/01-18:51:01.202049 7f59042fc700 (Original Log Time 2021/03/01-18:51:01.200883) [/compaction/compaction_picker_fifo.cc:107] [_timer_state/processing_user-timers] FIFO compaction: picking file 1710 with creation time 0 for deletion

The configuration is:

currentOptions.setCompactionStyle(getCompactionStyle());
currentOptions.setLevel0FileNumCompactionTrigger(8);
// currentOptions.setMaxTableFilesSizeFIFO(MemorySize.parse("2gb").getBytes());
CompactionOptionsFIFO compactionOptionsFIFO = new CompactionOptionsFIFO();
compactionOptionsFIFO.setMaxTableFilesSize(MemorySize.parse("8gb").getBytes());
compactionOptionsFIFO.setAllowCompaction(true);

The RocksDB version is io.github.myasuka:frocksdbjni:6.10.2-ververica-3.0.

I think the problem is caused by the manifest file not being uploaded by Flink. Can anyone suggest how I can work around this problem?
[jira] [Created] (FLINK-21472) encingTokenException: Fencing token mismatch
hayden zhou created FLINK-21472: Summary: encingTokenException: Fencing token mismatch Key: FLINK-21472 URL: https://issues.apache.org/jira/browse/FLINK-21472 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.12.1 Reporter: hayden zhou Attachments: flink--standalonesession-0-mta-flink-jobmanager-864d6c8cbb-rmsxw.log
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] - Unhandled exception.
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(8fac01d8e3e3988223a2e5c6e3f04f1e, LocalRpcInvocation(requestMultipleJobDetails(Time))) because the fencing token 8fac01d8e3e3988223a2e5c6e3f04f1e did not match the expected fencing token 8c37414f464bca76144e6cabc946474b.
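For readers unfamiliar with the log message above, the following is a generic sketch of a fencing check: an endpoint holds the token it currently expects and rejects any message carrying a different one, which typically happens when the sender still uses a token from a previous leader. The class below is hypothetical and is not Flink's RPC implementation.

```java
import java.util.UUID;

// Hypothetical endpoint illustrating a fencing-token check; not Flink code.
final class FencedEndpointSketch {
    private final UUID expectedToken;

    FencedEndpointSketch(UUID expectedToken) {
        this.expectedToken = expectedToken;
    }

    // Handles the payload only if the sender's token matches the expected one.
    String handle(UUID messageToken, String payload) {
        if (!expectedToken.equals(messageToken)) {
            throw new IllegalStateException(
                    "Fencing token mismatch: got " + messageToken + ", expected " + expectedToken);
        }
        return "handled:" + payload;
    }
}
```

In this sketch a stale sender keeps failing until it learns the new token, which matches the shape of the error in the log: the message token and the expected token differ.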