[jira] [Created] (FLINK-18997) Rename type_info to result_type to make it more clear
Hequn Cheng created FLINK-18997: --- Summary: Rename type_info to result_type to make it more clear Key: FLINK-18997 URL: https://issues.apache.org/jira/browse/FLINK-18997 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18985) Update the Sphinx doc for Python DataStream API.
Hequn Cheng created FLINK-18985: --- Summary: Update the Sphinx doc for Python DataStream API. Key: FLINK-18985 URL: https://issues.apache.org/jira/browse/FLINK-18985 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18984) Add tutorial documentation for Python DataStream API
Hequn Cheng created FLINK-18984: --- Summary: Add tutorial documentation for Python DataStream API Key: FLINK-18984 URL: https://issues.apache.org/jira/browse/FLINK-18984 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18966) Support key_by() on ConnectedStreams for Python DataStream API
Hequn Cheng created FLINK-18966: --- Summary: Support key_by() on ConnectedStreams for Python DataStream API Key: FLINK-18966 URL: https://issues.apache.org/jira/browse/FLINK-18966 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18954) Add documentation for Metrics in Python DataStream API.
Hequn Cheng created FLINK-18954: --- Summary: Add documentation for Metrics in Python DataStream API. Key: FLINK-18954 URL: https://issues.apache.org/jira/browse/FLINK-18954 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18953) Add documentation for DataTypes in Python DataStream API
Hequn Cheng created FLINK-18953: --- Summary: Add documentation for DataTypes in Python DataStream API Key: FLINK-18953 URL: https://issues.apache.org/jira/browse/FLINK-18953 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18952) Add 10 minutes to DataStream API documentation
Hequn Cheng created FLINK-18952: --- Summary: Add 10 minutes to DataStream API documentation Key: FLINK-18952 URL: https://issues.apache.org/jira/browse/FLINK-18952 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18949) Support Streaming File Sink for Python DataStream API
Hequn Cheng created FLINK-18949: --- Summary: Support Streaming File Sink for Python DataStream API Key: FLINK-18949 URL: https://issues.apache.org/jira/browse/FLINK-18949 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18948) Add end to end test for Python DataStream API
Hequn Cheng created FLINK-18948: --- Summary: Add end to end test for Python DataStream API Key: FLINK-18948 URL: https://issues.apache.org/jira/browse/FLINK-18948 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18947) Support partitionCustom() operation for Python DataStream API
Hequn Cheng created FLINK-18947: --- Summary: Support partitionCustom() operation for Python DataStream API Key: FLINK-18947 URL: https://issues.apache.org/jira/browse/FLINK-18947 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
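A partitionCustom() operation routes each record to the channel chosen by a user-defined partitioner applied to the record's key. A pure-Python sketch of those semantics (illustration only, not the PyFlink API; `partition_custom` and the toy partitioner are hypothetical):

```python
def partition_custom(records, partitioner, key_fn, num_partitions):
    """Route each record to the channel index returned by the
    user-defined partitioner, as partitionCustom() does."""
    channels = [[] for _ in range(num_partitions)]
    for rec in records:
        index = partitioner(key_fn(rec), num_partitions)
        if not 0 <= index < num_partitions:
            raise ValueError("partitioner returned invalid channel %d" % index)
        channels[index].append(rec)
    return channels

# Deterministic toy partitioner: first letter of the key, modulo channel count.
records = [("apple", 1), ("banana", 2), ("cherry", 3)]
channels = partition_custom(records, lambda key, n: ord(key[0]) % n,
                            lambda rec: rec[0], 2)
print(channels)  # [[('banana', 2)], [('apple', 1), ('cherry', 3)]]
```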
[jira] [Created] (FLINK-18945) Support key_by() on ConnectedStreams for Python DataStream API
Hequn Cheng created FLINK-18945: --- Summary: Support key_by() on ConnectedStreams for Python DataStream API Key: FLINK-18945 URL: https://issues.apache.org/jira/browse/FLINK-18945 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18943) Support connect() operation for Python DataStream API
Hequn Cheng created FLINK-18943: --- Summary: Support connect() operation for Python DataStream API Key: FLINK-18943 URL: https://issues.apache.org/jira/browse/FLINK-18943 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18883) Support reduce() operation for Python KeyedStream.
Hequn Cheng created FLINK-18883: --- Summary: Support reduce() operation for Python KeyedStream. Key: FLINK-18883 URL: https://issues.apache.org/jira/browse/FLINK-18883 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
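A reduce() on a KeyedStream combines each incoming record with the running value for its key and emits the updated value. A pure-Python sketch of that streaming behavior (illustration only, not the PyFlink implementation; `keyed_reduce` is a hypothetical helper):

```python
def keyed_reduce(records, key_fn, reduce_fn):
    """Streaming-style reduce: for each incoming record, combine it with
    the running value for its key and emit the new running value."""
    state = {}  # per-key running value, as keyed state would hold it
    out = []
    for rec in records:
        k = key_fn(rec)
        state[k] = rec if k not in state else reduce_fn(state[k], rec)
        out.append(state[k])
    return out

# Word-count style usage: sum the second field per key.
data = [("a", 1), ("b", 1), ("a", 2)]
result = keyed_reduce(data, lambda r: r[0], lambda x, y: (x[0], x[1] + y[1]))
print(result)  # [('a', 1), ('b', 1), ('a', 3)]
```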
[jira] [Created] (FLINK-18817) 'Kerberized YARN per-job on Docker test' failed
Hequn Cheng created FLINK-18817: --- Summary: 'Kerberized YARN per-job on Docker test' failed Key: FLINK-18817 URL: https://issues.apache.org/jira/browse/FLINK-18817 Project: Flink Issue Type: Test Components: Tests Reporter: Hequn Cheng The end-to-end test failed due to an AccessControlException: https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/5169/logs/125
{code}
2020-08-04T13:13:10.2755424Z Failing this attempt. Diagnostics: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "worker1.docker-hadoop-cluster-network/172.19.0.5"; destination host is: "master.docker-hadoop-cluster-network":9000;
2020-08-04T13:13:10.2757620Z java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "worker1.docker-hadoop-cluster-network/172.19.0.5"; destination host is: "master.docker-hadoop-cluster-network":9000;
2020-08-04T13:13:10.2758550Z 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
2020-08-04T13:13:10.2758960Z 	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
2020-08-04T13:13:10.2759321Z 	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
2020-08-04T13:13:10.2759676Z 	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
2020-08-04T13:13:10.2760305Z 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
2020-08-04T13:13:10.2760743Z 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
2020-08-04T13:13:10.2761087Z 	at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
2020-08-04T13:13:10.2761521Z 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:796)
2020-08-04T13:13:10.2761964Z 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-08-04T13:13:10.2762310Z 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-08-04T13:13:10.2762741Z 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-08-04T13:13:10.2763105Z 	at java.lang.reflect.Method.invoke(Method.java:498)
2020-08-04T13:13:10.2763503Z 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
2020-08-04T13:13:10.2763979Z 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
2020-08-04T13:13:10.2764474Z 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
2020-08-04T13:13:10.2764944Z 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
2020-08-04T13:13:10.2765417Z 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
2020-08-04T13:13:10.2765770Z 	at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
2020-08-04T13:13:10.2766093Z 	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1649)
2020-08-04T13:13:10.2766489Z 	at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1440)
2020-08-04T13:13:10.2767209Z 	at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437)
2020-08-04T13:13:10.2767699Z 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
2020-08-04T13:13:10.2768187Z 	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1437)
2020-08-04T13:13:10.2768646Z 	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
2020-08-04T13:13:10.2769051Z 	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
2020-08-04T13:13:10.2769470Z 	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
2020-08-04T13:13:10.2769988Z 	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
2020-08-04T13:13:10.2770438Z 	at java.security.AccessController.doPrivileged(Native Method)
2020-08-04T13:13:10.2770735Z 	at javax.security.auth.Subject.doAs(Subject.java:422)
2020-08-04T13:13:10.2771113Z 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
2020-08-04T13:13:10.2771503Z 	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
2020-08-04T13:13:10.2771870Z 	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
2020-08-04T13:13:10.2772317Z 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2020-08-04T13:13:10.2772732Z 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
{code}
[jira] [Created] (FLINK-18766) Support add_sink() for Python DataStream API
Hequn Cheng created FLINK-18766: --- Summary: Support add_sink() for Python DataStream API Key: FLINK-18766 URL: https://issues.apache.org/jira/browse/FLINK-18766 Project: Flink Issue Type: Sub-task Reporter: Hequn Cheng -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18765) Support map() and flat_map() for Python DataStream API
Hequn Cheng created FLINK-18765: --- Summary: Support map() and flat_map() for Python DataStream API Key: FLINK-18765 URL: https://issues.apache.org/jira/browse/FLINK-18765 Project: Flink Issue Type: Sub-task Reporter: Hequn Cheng -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18764) Support from_collection for Python DataStream API
Hequn Cheng created FLINK-18764: --- Summary: Support from_collection for Python DataStream API Key: FLINK-18764 URL: https://issues.apache.org/jira/browse/FLINK-18764 Project: Flink Issue Type: Sub-task Reporter: Hequn Cheng -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18763) Support basic TypeInformation for Python DataStream API
Hequn Cheng created FLINK-18763: --- Summary: Support basic TypeInformation for Python DataStream API Key: FLINK-18763 URL: https://issues.apache.org/jira/browse/FLINK-18763 Project: Flink Issue Type: Sub-task Reporter: Hequn Cheng Supports basic TypeInformation including BasicTypeInfo, LocalTimeTypeInfo, PrimitiveArrayTypeInfo, RowTypeInfo. Types.ROW()/Types.ROW_NAMED()/Types.PRIMITIVE_ARRAY() should also be supported. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18761) Support Python DataStream API (Stateless part)
Hequn Cheng created FLINK-18761: --- Summary: Support Python DataStream API (Stateless part) Key: FLINK-18761 URL: https://issues.apache.org/jira/browse/FLINK-18761 Project: Flink Issue Type: New Feature Components: API / DataStream, API / Python Reporter: Hequn Cheng This is the umbrella Jira for FLIP-130, which intends to support Python DataStream API for the stateless part. FLIP wiki page: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866298] As we all know, Flink provides [three layered APIs|https://flink.apache.org/flink-applications.html#layered-apis]: the ProcessFunctions, the DataStream API and the SQL & Table API. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases. Currently, the SQL & Table API has already been supported in PyFlink. The API provides relational operations as well as user-defined functions to provide convenience for users who are familiar with python and relational programming. Meanwhile, the DataStream API and ProcessFunctions provide more generic APIs to implement stream processing applications. The ProcessFunctions expose time and state which are the fundamental building blocks for any kind of streaming application. To cover more use cases, we are planning to cover all these APIs in PyFlink. In this FLIP, we propose to support the Python DataStream API for the stateless part. For more detail, please refer to the [FLIP wiki page|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866298]. As for the stateful part, it would come later after this FLIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18655) Set failOnUnableToExtractRepoInfo to false for git-commit-id-plugin in module flink-runtime
Hequn Cheng created FLINK-18655: --- Summary: Set failOnUnableToExtractRepoInfo to false for git-commit-id-plugin in module flink-runtime Key: FLINK-18655 URL: https://issues.apache.org/jira/browse/FLINK-18655 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.11.0 Reporter: Hequn Cheng Assignee: Hequn Cheng An exception may be thrown when building the source distribution without the .git folder (for the flink-runtime module): {code:java} [ERROR] Failed to execute goal pl.project13.maven:git-commit-id-plugin:4.0.0:revision (get-the-git-infos) on project flink-runtime_2.11: Could not complete Mojo execution... Error: Could not get HEAD Ref, are you sure you have set the dotGitDirectory property of this plugin to a valid path? -> [Help 1] {code} To solve the problem, we need to add the {{<failOnUnableToExtractRepoInfo>false</failOnUnableToExtractRepoInfo>}} configuration in addition to {{<failOnNoGitDirectory>false</failOnNoGitDirectory>}} in the pom of flink-runtime. The reason is that the git-commit-id-plugin searches up the whole Maven parent project hierarchy until a .git directory is found. For example, if we configure dotGitDirectory as /a/b/c/.git and /a/b/c/.git is invalid, the plugin searches /a/b/.git and then /a/.git. However, once the plugin finds an /a/.git folder, it fails on extracting repo info, which leads to the failure above. The search logic of the plugin can be found [here|https://github.com/git-commit-id/git-commit-id-maven-plugin/blob/v4.0.0/maven/src/main/java/pl/project13/maven/git/GitDirLocator.java#L74]. You can reproduce the exception with: - download the 1.11.0 source distribution. - put a .git folder under the path (or parent path) of ${project.basedir}/../.git. -- This message was sent by Atlassian Jira (v8.3.4#803005)
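The upward search that causes the problem can be sketched in Python (a simplified illustration of the plugin's behavior, not its actual Java code; `find_git_dir` is a hypothetical helper):

```python
import os
import tempfile

def find_git_dir(start):
    """Walk up from `start` until a .git directory is found,
    mimicking the plugin's parent-hierarchy search."""
    current = os.path.abspath(start)
    while True:
        candidate = os.path.join(current, ".git")
        if os.path.isdir(candidate):
            return candidate
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            return None
        current = parent

# Example: /a/.git exists but /a/b/c/.git does not, so the search
# escapes the project and picks up the unrelated repository above it.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "a", ".git"))
os.makedirs(os.path.join(root, "a", "b", "c"))
found = find_git_dir(os.path.join(root, "a", "b", "c"))
print(found)  # .../a/.git
```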
[jira] [Created] (FLINK-18151) Resolve CWE22 problems in pyflink_gateway_server.py
Hequn Cheng created FLINK-18151: --- Summary: Resolve CWE22 problems in pyflink_gateway_server.py Key: FLINK-18151 URL: https://issues.apache.org/jira/browse/FLINK-18151 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.10.1, 1.11.0, 1.12.0 Reporter: Hequn Cheng For example, the code `if os.path.isfile(flink_conf_path):` has a CWE-22 (path traversal) problem: "os.path.isfile" is called with a tainted value as its first argument. This constructs a path or URI from the tainted value and may thus allow an attacker to access, modify, or test the existence of critical or sensitive files. -- This message was sent by Atlassian Jira (v8.3.4#803005)
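One common way to address this class of warning (a hedged sketch, not the actual fix applied in pyflink_gateway_server.py; `safe_isfile` is a hypothetical helper) is to resolve the tainted path and confirm it stays under a trusted base directory before touching the filesystem:

```python
import os
import tempfile

def safe_isfile(base_dir, untrusted_name):
    """Return True only if `untrusted_name` resolves to a file inside
    `base_dir`, rejecting ../ traversal and symlink escapes."""
    base = os.path.realpath(base_dir)
    candidate = os.path.realpath(os.path.join(base, untrusted_name))
    # commonpath is a robust prefix check (a plain startswith can be
    # fooled by sibling directories such as /etc-evil vs /etc).
    if os.path.commonpath([base, candidate]) != base:
        raise ValueError("path escapes trusted directory: %s" % untrusted_name)
    return os.path.isfile(candidate)

# Usage: a file inside the trusted dir passes; "../" traversal raises.
conf_dir = tempfile.mkdtemp()
open(os.path.join(conf_dir, "flink-conf.yaml"), "w").close()
print(safe_isfile(conf_dir, "flink-conf.yaml"))  # True
```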
[jira] [Created] (FLINK-18143) Fix Python meter metric not correct problem
Hequn Cheng created FLINK-18143: --- Summary: Fix Python meter metric not correct problem Key: FLINK-18143 URL: https://issues.apache.org/jira/browse/FLINK-18143 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.11.0 Reporter: Hequn Cheng Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18031) Update the copyright year in the NOTICE file in flink-shaded repo
Hequn Cheng created FLINK-18031: --- Summary: Update the copyright year in the NOTICE file in flink-shaded repo Key: FLINK-18031 URL: https://issues.apache.org/jira/browse/FLINK-18031 Project: Flink Issue Type: Bug Affects Versions: shaded-10.0 Reporter: Hequn Cheng Fix For: shaded-11.0 The year in the root NOTICE file should be updated from `2014-2017` to `2014-2020`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17962) Add document for how to define Python UDF with DDL
Hequn Cheng created FLINK-17962: --- Summary: Add document for how to define Python UDF with DDL Key: FLINK-17962 URL: https://issues.apache.org/jira/browse/FLINK-17962 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Hequn Cheng -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17960) Improve commands in the "Common Questions" document for PyFlink
Hequn Cheng created FLINK-17960: --- Summary: Improve commands in the "Common Questions" document for PyFlink Key: FLINK-17960 URL: https://issues.apache.org/jira/browse/FLINK-17960 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.11.0 Reporter: Hequn Cheng Currently, in the ["Common Questions"|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/common_questions.html#preparing-python-virtual-environment] document, we have the command `$ setup-pyflink-virtual-env.sh` to run the script. However, the script is not executable. It would be better to replace the command with `$ sh setup-pyflink-virtual-env.sh` and add a download command:
{code}
$ curl -O https://ci.apache.org/projects/flink/flink-docs-master/downloads/setup-pyflink-virtual-env.sh
$ sh setup-pyflink-virtual-env.sh
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17959) Exception: "CANCELLED: call already cancelled" is thrown when run python udf
Hequn Cheng created FLINK-17959: --- Summary: Exception: "CANCELLED: call already cancelled" is thrown when run python udf Key: FLINK-17959 URL: https://issues.apache.org/jira/browse/FLINK-17959 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.10.1, 1.11.0 Reporter: Hequn Cheng The exception is thrown when running a Python UDF:
{code:java}
May 27, 2020 3:20:49 PM org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed@3960b30e
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:366)
	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onError(GrpcStateService.java:145)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}
The job can output the right results; however, it seems something goes wrong during the shutdown procedure. You can reproduce the exception with the following code (note: the exception happens occasionally):
{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17710) StreamSqlTests.test_execute_sql test is not stable
Hequn Cheng created FLINK-17710: --- Summary: StreamSqlTests.test_execute_sql test is not stable Key: FLINK-17710 URL: https://issues.apache.org/jira/browse/FLINK-17710 Project: Flink Issue Type: Bug Components: API / Python Reporter: Hequn Cheng Failure log: https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/1311/logs/144 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17318) The comment is not right in `org.apache.flink.table.planner.delegation.PlannerBase`
Hequn Cheng created FLINK-17318: --- Summary: The comment is not right in `org.apache.flink.table.planner.delegation.PlannerBase` Key: FLINK-17318 URL: https://issues.apache.org/jira/browse/FLINK-17318 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.10.0 Reporter: Hequn Cheng The comment on `org.apache.flink.table.planner.delegation.PlannerBase` should describe it as an implementation for the Blink planner instead of the legacy Flink planner. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17215) Clean the build document for Pyflink
Hequn Cheng created FLINK-17215: --- Summary: Clean the build document for PyFlink Key: FLINK-17215 URL: https://issues.apache.org/jira/browse/FLINK-17215 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.9.4 Previously, the build document of PyFlink was adjusted in https://github.com/apache/flink/pull/11013. However, we can clean it up now since we have picked the fix of FLINK-15638 into 1.9. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16854) Correct dependency versions in the NOTICE file of module statefun-ridesharing-example-simulator
Hequn Cheng created FLINK-16854: --- Summary: Correct dependency versions in the NOTICE file of module statefun-ridesharing-example-simulator Key: FLINK-16854 URL: https://issues.apache.org/jira/browse/FLINK-16854 Project: Flink Issue Type: Bug Components: Stateful Functions Reporter: Hequn Cheng Fix For: statefun-2.0 There are some dependencies with a wrong version in the NOTICE file: {code} com.google.code.findbugs:jsr305:3.0.2:compile (Remove compile) org.hibernate.validator:hibernate-validator:6.0.17 (Version should be 6.0.17.Final) org.jboss.logging:jboss-logging:3.3.2 (Version should be 3.3.2.Final) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16853) Update the version of protobuf-java from 3.8.0 to 3.7.1 in the NOTICE file of module statefun-flink-distribution
Hequn Cheng created FLINK-16853: --- Summary: Update the version of protobuf-java from 3.8.0 to 3.7.1 in the NOTICE file of module statefun-flink-distribution Key: FLINK-16853 URL: https://issues.apache.org/jira/browse/FLINK-16853 Project: Flink Issue Type: Bug Components: Stateful Functions Reporter: Hequn Cheng Fix For: statefun-2.0 In the NOTICE file, the version of protobuf-java should be 3.7.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16786) Fix pyarrow version incompatible problem
Hequn Cheng created FLINK-16786: --- Summary: Fix pyarrow version incompatible problem Key: FLINK-16786 URL: https://issues.apache.org/jira/browse/FLINK-16786 Project: Flink Issue Type: Bug Components: API / Python Reporter: Hequn Cheng Fix For: 1.11.0 As reported in FLINK-16483, we should make the version of pyarrow consistent between pyflink and beam. Other dependencies should also be checked. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16766) Support create StreamTableEnvironment without passing StreamExecutionEnvironment
Hequn Cheng created FLINK-16766: --- Summary: Support create StreamTableEnvironment without passing StreamExecutionEnvironment Key: FLINK-16766 URL: https://issues.apache.org/jira/browse/FLINK-16766 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Hequn Cheng Fix For: 1.9.3, 1.10.1, 1.11.0 Currently, when we create a BatchTableEnvironment, the ExecutionEnvironment is an optional parameter, while for the StreamTableEnvironment, the StreamExecutionEnvironment is not optional. We should make them consistent. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16765) Replace all BatchTableEnvironment to StreamTableEnvironment in the document of PyFlink
Hequn Cheng created FLINK-16765: --- Summary: Replace all BatchTableEnvironment to StreamTableEnvironment in the document of PyFlink Key: FLINK-16765 URL: https://issues.apache.org/jira/browse/FLINK-16765 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Hequn Cheng Fix For: 1.10.1, 1.11.0 For example, in the [tutorial|https://ci.apache.org/projects/flink/flink-docs-master/getting-started/walkthroughs/python_table_api.html], replace BatchTableEnvironment with StreamTableEnvironment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16763) Should not use BatchTableEnvironment for Python UDF in the document of flink-1.10
Hequn Cheng created FLINK-16763: --- Summary: Should not use BatchTableEnvironment for Python UDF in the document of flink-1.10 Key: FLINK-16763 URL: https://issues.apache.org/jira/browse/FLINK-16763 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Hequn Cheng Fix For: 1.10.1 Currently, for flink-1.10, Python UDF is not supported in the old planner under batch mode, so we should not use BatchTableEnvironment as an example for Python UDF in the document[1] of flink-1.10. The problem may also exist in other places in the document and should be checked. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/python_udfs.html -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16761) Return JobExecutionResult for Python ExecutionEnvironment and TableEnvironment
Hequn Cheng created FLINK-16761: --- Summary: Return JobExecutionResult for Python ExecutionEnvironment and TableEnvironment Key: FLINK-16761 URL: https://issues.apache.org/jira/browse/FLINK-16761 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.10.0, 1.9.2 Reporter: Hequn Cheng Fix For: 1.9.3, 1.10.1, 1.11.0 For ExecutionEnvironment and TableEnvironment, the JobExecutionResult should be returned in order to make jobID or other information accessible for users. This is also the behavior of Java and we should make python consistent with it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16674) Add documentation about how to use user-defined metrics for Python UDF
Hequn Cheng created FLINK-16674: --- Summary: Add documentation about how to use user-defined metrics for Python UDF Key: FLINK-16674 URL: https://issues.apache.org/jira/browse/FLINK-16674 Project: Flink Issue Type: Sub-task Components: API / Python, Documentation Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16673) Support Counter, Gauge, Meter, Distribution metric type for Python UDTF
Hequn Cheng created FLINK-16673: --- Summary: Support Counter, Gauge, Meter, Distribution metric type for Python UDTF Key: FLINK-16673 URL: https://issues.apache.org/jira/browse/FLINK-16673 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.11.0 Support Counter, Gauge, Meter, Distribution metric type for Python UDTF -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16672) Support Counter, Gauge, Meter, Distribution metric type for Python UDF
Hequn Cheng created FLINK-16672: --- Summary: Support Counter, Gauge, Meter, Distribution metric type for Python UDF Key: FLINK-16672 URL: https://issues.apache.org/jira/browse/FLINK-16672 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.11.0 Support Counter, Gauge, Meter, Distribution metric type for Python UDF -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16671) Support define scopes and variables on Python metric group
Hequn Cheng created FLINK-16671: --- Summary: Support define scopes and variables on Python metric group Key: FLINK-16671 URL: https://issues.apache.org/jira/browse/FLINK-16671 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.11.0 In this ticket, we are going to support defining scopes and variables on the Python metric group, i.e., adding `get_metric_group()` to `FunctionContext`, and adding `add_group`, `get_scope_components`, `get_all_variables` and `get_metric_identifier` to `MetricGroup`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
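How these scope methods are expected to compose can be sketched in a few lines (a toy illustration, not PyFlink's real class; the method names follow the ticket, while the delimiter and inheritance semantics are assumptions):

```python
class MetricGroup:
    """Toy metric group: child groups extend the scope and may add
    variables; identifiers join the scope components with a metric name."""
    def __init__(self, name, parent=None, variables=None):
        self._name = name
        self._parent = parent
        self._variables = dict(variables or {})

    def add_group(self, name, extra=None):
        return MetricGroup(name, parent=self, variables=extra)

    def get_scope_components(self):
        parts = self._parent.get_scope_components() if self._parent else []
        return parts + [self._name]

    def get_all_variables(self):
        merged = dict(self._parent.get_all_variables()) if self._parent else {}
        merged.update(self._variables)
        return merged

    def get_metric_identifier(self, metric_name, delimiter="."):
        return delimiter.join(self.get_scope_components() + [metric_name])

root = MetricGroup("taskmanager")
g = root.add_group("MyUDF", extra={"<job_id>": "job-1"})
print(g.get_metric_identifier("numInvocations"))  # taskmanager.MyUDF.numInvocations
```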
[jira] [Created] (FLINK-16670) Support User-Defined Metrics in Python UDF
Hequn Cheng created FLINK-16670: --- Summary: Support User-Defined Metrics in Python UDF Key: FLINK-16670 URL: https://issues.apache.org/jira/browse/FLINK-16670 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.11.0 This is the umbrella Jira for FLIP-112, which intends to support User-Defined Metrics in Python UDF. FLIP wiki page: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-112%3A+Support+User-Defined+Metrics+in++Python+UDF] FLIP-58 adds the support for Python UDFs, but user-defined metrics have not been supported yet. With metrics, users can report and monitor the UDF status to get a deeper understanding of the execution. In this FLIP, we want to support metrics for Python UDFs. We propose to: * Support user-defined metrics including Counters, Gauges, Meters, Distributions in Python UDFs. (Note: Histogram is not supported in this FLIP; instead, Distribution is supported to report statistics about the distribution of values. See more in the Distribution section.) * Support defining user scopes. * Support defining user variables. -- This message was sent by Atlassian Jira (v8.3.4#803005)
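A Distribution, as described above, reports summary statistics of observed values rather than histogram buckets. A minimal sketch of that idea (illustration only; the exact statistic set — count/sum/min/max/mean — is an assumption):

```python
class Distribution:
    """Toy distribution metric: tracks summary statistics of reported
    values instead of maintaining histogram buckets."""
    def __init__(self):
        self.count = 0
        self.sum = 0
        self.min = None
        self.max = None

    def update(self, value):
        self.count += 1
        self.sum += value
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)

    @property
    def mean(self):
        return self.sum / self.count if self.count else None

d = Distribution()
for v in (3, 1, 4, 1, 5):
    d.update(v)
print(d.count, d.min, d.max, d.mean)  # 5 1 5 2.8
```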
[jira] [Created] (FLINK-16565) Make Pipeline JSON compatible between Java and Python if all PipelineStages are Java ones
Hequn Cheng created FLINK-16565: --- Summary: Make Pipeline JSON compatible between Java and Python if all PipelineStages are Java ones Key: FLINK-16565 URL: https://issues.apache.org/jira/browse/FLINK-16565 Project: Flink Issue Type: Sub-task Components: API / Python, Library / Machine Learning Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.11.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16273) Solve "sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available" problem for users
Hequn Cheng created FLINK-16273: --- Summary: Solve "sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available" problem for users Key: FLINK-16273 URL: https://issues.apache.org/jira/browse/FLINK-16273 Project: Flink Issue Type: Bug Components: API / Python Reporter: Hequn Cheng Currently, pandas UDF users have to set the JVM property "io.netty.tryReflectionSetAccessible=true" at startup. We should document this or solve it automatically. BTW, some other discussion about it: ARROW-7223 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16250) Add interfaces for PipelineStage and Pipeline
Hequn Cheng created FLINK-16250: --- Summary: Add interfaces for PipelineStage and Pipeline Key: FLINK-16250 URL: https://issues.apache.org/jira/browse/FLINK-16250 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng A pipeline is a linear workflow that chains some PipelineStages, e.g., Estimators and Transformers, to execute an algorithm. After this issue is addressed, Python users can write Python Pipelines. -- This message was sent by Atlassian Jira (v8.3.4#803005)
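The chaining behavior described above can be sketched in plain Python. These are hypothetical classes for illustration only; the real Flink ML stages operate on Tables, not Python lists.

```python
class Transformer:
    """Hypothetical PipelineStage: transforms its input and returns the result."""

    def __init__(self, fn):
        self._fn = fn

    def transform(self, data):
        return self._fn(data)


class Pipeline:
    """A linear workflow that chains stages and applies them in order."""

    def __init__(self, stages=None):
        self._stages = list(stages or [])

    def append_stage(self, stage):
        self._stages.append(stage)
        return self

    def transform(self, data):
        # Feed the output of each stage into the next one.
        for stage in self._stages:
            data = stage.transform(data)
        return data


p = (Pipeline()
     .append_stage(Transformer(lambda xs: [x * 2 for x in xs]))
     .append_stage(Transformer(lambda xs: [x + 1 for x in xs])))
```

Because a Pipeline is itself a stage-like object with `transform`, pipelines compose naturally into larger workflows.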
[jira] [Created] (FLINK-16249) Add interfaces for Params, ParamInfo and WithParams
Hequn Cheng created FLINK-16249: --- Summary: Add interfaces for Params, ParamInfo and WithParams Key: FLINK-16249 URL: https://issues.apache.org/jira/browse/FLINK-16249 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Parameters are widely used in the machine learning domain. These classes define common interfaces for interacting with classes that hold parameters. -- This message was sent by Atlassian Jira (v8.3.4#803005)
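A minimal plain-Python sketch of the ParamInfo/Params idea: a ParamInfo describes a parameter, and a Params container maps it to a value, falling back to the declared default. The class shapes and the `iterations` parameter are assumptions for illustration, not the actual interfaces.

```python
class ParamInfo:
    """Hypothetical sketch: describes a parameter (name, description, default)."""

    def __init__(self, name, description, default=None):
        self.name = name
        self.description = description
        self.default = default


class Params:
    """Hypothetical sketch: a container mapping ParamInfo to values."""

    def __init__(self):
        self._values = {}

    def set(self, info, value):
        self._values[info.name] = value
        return self

    def get(self, info):
        # Fall back to the declared default when no value was set.
        return self._values.get(info.name, info.default)


ITERATIONS = ParamInfo("iterations", "number of training iterations", default=10)
params = Params().set(ITERATIONS, 100)
```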
[jira] [Created] (FLINK-16248) Add interfaces for MLEnvironment and MLEnvironmentFactory
Hequn Cheng created FLINK-16248: --- Summary: Add interfaces for MLEnvironment and MLEnvironmentFactory Key: FLINK-16248 URL: https://issues.apache.org/jira/browse/FLINK-16248 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Align the interfaces for MLEnvironment and MLEnvironmentFactory, so that Python users can use a Python MLEnvironmentFactory to maintain the execution environment and table environment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
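One way such a factory could be sketched: a registry that hands out ids for environments so that user code can share one environment by id. The method names and the id scheme below are assumptions for illustration, not the actual PyFlink ML API.

```python
class MLEnvironmentFactory:
    """Hypothetical sketch: a registry mapping ids to environment objects."""

    _environments = {}
    _next_id = 0

    @classmethod
    def register_environment(cls, env):
        # Store the environment and hand back an id for later lookup.
        env_id = cls._next_id
        cls._environments[env_id] = env
        cls._next_id += 1
        return env_id

    @classmethod
    def get(cls, env_id):
        return cls._environments[env_id]


env_id = MLEnvironmentFactory.register_environment({"name": "default"})
```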
[jira] [Created] (FLINK-16241) Remove the license and notice file in flink-ml-lib module on release-1.10 branch
Hequn Cheng created FLINK-16241: --- Summary: Remove the license and notice file in flink-ml-lib module on release-1.10 branch Key: FLINK-16241 URL: https://issues.apache.org/jira/browse/FLINK-16241 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: 1.10.0 Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.10.1 The jar of flink-ml-lib should not contain the license and notice file, as it does not actually bundle the related dependencies. We should remove these files on branch release-1.10. BTW, the release-1.9 branch does not have this problem, since the license and notice were added in 1.10. On master (1.11) we will bundle the dependencies, so the license and notice file should be kept; see FLINK-15847. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16187) Support Python ML Pipeline API
Hequn Cheng created FLINK-16187: --- Summary: Support Python ML Pipeline API Key: FLINK-16187 URL: https://issues.apache.org/jira/browse/FLINK-16187 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng This is the umbrella Jira for FLIP-96, which intends to support a Python ML Pipeline API. FLIP wiki page: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-96%3A+Support+Python+ML+Pipeline+API] FLIP-39 rebuilds the Flink ML pipeline on top of the Table API and introduces a new set of Java APIs. As Python is widely used in ML areas, providing Python ML Pipeline APIs for Flink can not only make it easier for Python users to write ML jobs but also broaden the adoption of Flink ML. In this FLIP we propose to: * Add a Python pipeline API according to the Java pipeline API (we will adapt the Python pipeline API if the Java pipeline API changes). * Support native Python Transformer/Estimator/Model, i.e., users can write not only Python Transformer/Estimator/Model wrappers for calling Java ones but also native Python Transformer/Estimator/Models. * Ease of use: support keyword arguments when defining parameters. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16119) Port base RelNode classes from Scala to Java
Hequn Cheng created FLINK-16119: --- Summary: Port base RelNode classes from Scala to Java Key: FLINK-16119 URL: https://issues.apache.org/jira/browse/FLINK-16119 Project: Flink Issue Type: Improvement Components: Table SQL / Legacy Planner, Table SQL / Planner Reporter: Hequn Cheng Currently, when adding new Flink RelNodes, we have to write a Scala one due to the problem that we can't use the implemented methods of a Scala trait from Java ([see details|https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods]). Take DataStreamCorrelate as an example: it extends both CommonCorrelate and DataStreamRel, and we can't convert DataStreamCorrelate to Java directly. It would be great if we could convert these base RelNode classes (CommonCorrelate, DataStreamRel, etc.) from Scala to Java so that we can add new Java RelNodes and convert the existing RelNodes to Java. CC [~twalthr] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15847) Include flink-ml-api and flink-ml-lib in opt
Hequn Cheng created FLINK-15847: --- Summary: Include flink-ml-api and flink-ml-lib in opt Key: FLINK-15847 URL: https://issues.apache.org/jira/browse/FLINK-15847 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Reporter: Hequn Cheng Assignee: Hequn Cheng [FLIP-39|https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs] rebuilds the Flink ML pipeline on top of the Table API, which moves Flink ML a step further. Based on it, users can develop their ML jobs, and more and more machine learning platforms are providing ML services. However, the problem now is that the jars of flink-ml-api and flink-ml-lib only exist in the Maven repository. Whenever users want to submit ML jobs, they have to depend on the ML modules and package a fat jar. This is inconvenient, especially for machine learning platforms on which nearly all jobs depend on the Flink ML modules and have to package a fat jar. Given this, it would be better to include the jars of flink-ml-api and flink-ml-lib in the `opt` folder, so that users can directly use the jars with the binary release. For example, users can move the jars into the `lib` folder or use -j to upload the jars. We put the jars in the `opt` folder instead of the `lib` folder because the ML jars are currently still optional for the Flink project by default. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15675) Add documentation that Python UDF is not supported for Flink Planner under batch mode
Hequn Cheng created FLINK-15675: --- Summary: Add documentation that Python UDF is not supported for Flink Planner under batch mode Key: FLINK-15675 URL: https://issues.apache.org/jira/browse/FLINK-15675 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Hequn Cheng Assignee: Hequn Cheng Fix For: 1.10.0 We should add documentation to inform users that Python UDF is not supported for the Flink Planner under batch mode. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15636) Support Python UDF for flink planner under batch mode
Hequn Cheng created FLINK-15636: --- Summary: Support Python UDF for flink planner under batch mode Key: FLINK-15636 URL: https://issues.apache.org/jira/browse/FLINK-15636 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, Python UDF is supported under the flink planner (stream only) and the blink planner (stream). This jira is dedicated to adding Python UDF support for the flink planner under batch mode. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15616) Move boot error messages from python-udf-boot.log to taskmanager's log file
Hequn Cheng created FLINK-15616: --- Summary: Move boot error messages from python-udf-boot.log to taskmanager's log file Key: FLINK-15616 URL: https://issues.apache.org/jira/browse/FLINK-15616 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, the boot error messages are printed in a log file under FLINK_LOG_DIR, i.e., {{"$FLINK_LOG_DIR/flink-$USER-python-udf-boot-$HOSTNAME.log"}}. This additional file is very hard for users to locate, so it would be better to print the error messages directly into the taskmanager log file. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14684) Add Pinterest to Chinese Powered By page
Hequn Cheng created FLINK-14684: --- Summary: Add Pinterest to Chinese Powered By page Key: FLINK-14684 URL: https://issues.apache.org/jira/browse/FLINK-14684 Project: Flink Issue Type: New Feature Components: chinese-translation Reporter: Hequn Cheng Fix For: 1.10.0 Pinterest was added to the English Powered By page with commit: [51f7e3ced85b94dcbe3c051069379d22c88fbc5c|https://github.com/apache/flink-web/pull/281] It should be added to the Chinese Powered By (and index.html) page as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14497) Support primitive data types in Python user-defined functions
Hequn Cheng created FLINK-14497: --- Summary: Support primitive data types in Python user-defined functions Key: FLINK-14497 URL: https://issues.apache.org/jira/browse/FLINK-14497 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Hequn Cheng Assignee: Huang Xingbo Fix For: 1.10.0 This jira is a sub-task of FLINK-14388. In this jira, only primitive types will be supported for Python UDFs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14459) Python module build hangs
Hequn Cheng created FLINK-14459: --- Summary: Python module build hangs Key: FLINK-14459 URL: https://issues.apache.org/jira/browse/FLINK-14459 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.9.0, 1.10.0, 1.9.1 Reporter: Hequn Cheng The build of the python module hangs when installing conda. See the travis log: https://api.travis-ci.org/v3/job/599704570/log.txt It can't be reproduced either on my local mac or on my repo with travis. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14385) ClassNotFoundException is thrown when running the example in the Local Setup Tutorial
Hequn Cheng created FLINK-14385: --- Summary: ClassNotFoundException is thrown when running the example in the Local Setup Tutorial Key: FLINK-14385 URL: https://issues.apache.org/jira/browse/FLINK-14385 Project: Flink Issue Type: Improvement Components: Client / Job Submission Affects Versions: 1.9.1 Reporter: Hequn Cheng When doing the release check of release-1.9.1-rc1, a ClassNotFoundException was found when going through the wordcount example in the [Local Setup Tutorial|https://ci.apache.org/projects/flink/flink-docs-master/getting-started/tutorials/local_setup.html#run-the-example]. You can find the exception in the log file `flink-xxx-client-MacBook-Pro-2.local.log` {code:java} java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187) at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 5 more {code} We know that Hadoop is not pre-bundled in Flink anymore, but it would be nice to avoid this exception in order not to confuse new users when they run programs on the local cluster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14383) Support python UDFs with constant value of time interval types
Hequn Cheng created FLINK-14383: --- Summary: Support python UDFs with constant value of time interval types Key: FLINK-14383 URL: https://issues.apache.org/jira/browse/FLINK-14383 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Hequn Cheng Fix For: 1.10.0 As discussed [here|https://github.com/apache/flink/pull/9858#issuecomment-541312088], this issue is dedicated to adding support for python UDFs with constant values of time interval types. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-13637) Anchors not working in documents (building.md, common.md, queryable_state.md)
Hequn Cheng created FLINK-13637: --- Summary: Anchors not working in documents (building.md, common.md, queryable_state.md) Key: FLINK-13637 URL: https://issues.apache.org/jira/browse/FLINK-13637 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.9.0 Reporter: Hequn Cheng Fix For: 1.9.0 Anchors are not working in the documents (building.md, common.md, queryable_state.md). The format should be: {code:java} [create an anchor](#anchors-in-markdown) {code} - Add {{-}} characters between each word in the heading and wrap the value in parentheses. - All letters should be lowercase. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
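The anchor rule stated above (hyphens between words, all lowercase) can be expressed as a small helper. This is a simplified sketch; real markdown anchor generators typically also strip punctuation, which is omitted here.

```python
def heading_to_anchor(heading):
    # Lowercase the heading and join its words with '-' characters,
    # producing the "[create an anchor](#anchors-in-markdown)" target form.
    return "#" + "-".join(heading.lower().split())
```

For example, the heading "Anchors in Markdown" maps to the target `#anchors-in-markdown`.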
[jira] [Created] (FLINK-13446) Row count sliding window outputs incorrectly in blink planner
Hequn Cheng created FLINK-13446: --- Summary: Row count sliding window outputs incorrectly in blink planner Key: FLINK-13446 URL: https://issues.apache.org/jira/browse/FLINK-13446 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.9.0 Reporter: Hequn Cheng For the blink planner, the row count sliding window outputs incorrect results. The window assigner assigns fewer windows than expected, which means the window outputs less data. The bug can be reproduced by the following test: {code:java} @Test def testGroupWindowWithoutKeyInProjection(): Unit = { val data = List( (1L, 1, "Hi", 1, 1), (2L, 2, "Hello", 2, 2), (4L, 2, "Hello", 2, 2), (8L, 3, "Hello world", 3, 3), (16L, 3, "Hello world", 3, 3)) val stream = failingDataSource(data) val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime) val weightAvgFun = new WeightedAvg val countDistinct = new CountDistinct val windowedTable = table .window(Slide over 2.rows every 1.rows on 'proctime as 'w) .groupBy('w, 'int2, 'int3, 'string) .select(weightAvgFun('long, 'int), countDistinct('long)) val sink = new TestingAppendSink windowedTable.toAppendStream[Row].addSink(sink) env.execute() val expected = Seq("12,2", "8,1", "2,1", "3,2", "1,1") assertEquals(expected.sorted, sink.getAppendResults.sorted) } {code} The expected output is Seq("12,2", "8,1", "2,1", "3,2", "1,1") while the actual output is Seq("12,2", "3,2"). To fix the problem, we can correct the assign logic in CountSlidingWindowAssigner.assignWindows. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
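The intended assignment logic can be sketched in plain Python. For `Slide over 2.rows every 1.rows`, every element after the first should land in two windows; dropping assignments is exactly what loses output rows. This is an illustrative sketch, not Flink's actual CountSlidingWindowAssigner code.

```python
def assign_count_sliding_windows(index, size, slide):
    """Return every (start, end) row-count window containing the 0-based
    element `index`, where windows span `size` rows and start every `slide`
    rows (illustrative sketch of the intended assignment)."""
    windows = []
    start = (index // slide) * slide  # last window start at or before index
    while start >= 0 and start + size > index:
        windows.append((start, start + size - 1))
        start -= slide
    return windows
```

With size 2 and slide 1, element 0 falls only in window (0, 1), while element 2 falls in windows (1, 2) and (2, 3).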
[jira] [Created] (FLINK-13392) WindowAggregate inherited from Aggregate incorrectly
Hequn Cheng created FLINK-13392: --- Summary: WindowAggregate inherited from Aggregate incorrectly Key: FLINK-13392 URL: https://issues.apache.org/jira/browse/FLINK-13392 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Hequn Cheng As discussed in FLINK-12249, WindowAggregate inherits from Aggregate incorrectly. For WindowAggregate, the group keys are the window group and normal fields (which may be empty), while Aggregate only has the normal group keys and knows nothing about the window group key. Currently, many planner rules match and apply transformations on Aggregate; however, some of them are not applicable to WindowAggregate, e.g., AggregateJoinTransposeRule, AggregateProjectMergeRule, etc. Although FLINK-12249 fixes the type equivalence check problem, we should go a step further and correct the WindowAggregate behavior. There are three options now: # Make Aggregate's group key support expressions (such as RexCall), not field references only; the window group expression could then be part of Aggregate's group key. The disadvantage is that we must update all existing aggregate rules, metadata handlers, etc. # Make WindowAggregate extend SingleRel instead of Aggregate. The disadvantage is that we must implement the related planner rules for WindowAggregate. # In the logical phase, do not merge Aggregate and Project (with window group) into WindowAggregate; instead, convert the Project to a new kind of node named WindowAssigner, which prevents the Project from being pushed down/merged, and merge them into WindowAggregate in the physical phase. The advantage is that we could reuse the current aggregate rules; the disadvantage is that we must add new rules for WindowAssigner. We can have further discussions in this jira ticket. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13196) Fix Ambiguous column name exception bug for Table API
Hequn Cheng created FLINK-13196: --- Summary: Fix Ambiguous column name exception bug for Table API Key: FLINK-13196 URL: https://issues.apache.org/jira/browse/FLINK-13196 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.9.0 Reporter: Hequn Cheng Assignee: Hequn Cheng The following query should be valid; however, an ambiguous column name exception is thrown. {code:java} val util = streamTestUtil() val table = util.addTable[(Long, Int, String)]('a, 'b, 'c) val resultTable = table .groupBy('b) .select('b, 'a.sum, 'a.sum, 'a.sum) {code} {code:java} org.apache.flink.table.api.ValidationException: Ambiguous column name: EXPR$0 at org.apache.flink.table.operations.utils.factories.ProjectionOperationFactory.lambda$validateAndGetUniqueNames$4(ProjectionOperationFactory.java:103) {code} We should add some alias logic in {{AggregationAndPropertiesReplacer}} for the case where a generated name has already been used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
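One way such alias logic could work, sketched in plain Python: when a generated name repeats, append a counter suffix. The `_n` suffix scheme is an assumption for illustration; the actual fix chooses its own naming.

```python
def unique_names(names):
    """Assign unique aliases to a sequence of (possibly repeated) generated
    names, e.g. EXPR$0, EXPR$0_1, EXPR$0_2 (suffix scheme assumed)."""
    seen = {}
    result = []
    for name in names:
        if name in seen:
            seen[name] += 1
            result.append("{}_{}".format(name, seen[name]))
        else:
            seen[name] = 0
            result.append(name)
    return result
```

Applied to the failing query's projection, the three `'a.sum` expressions would no longer collide on `EXPR$0`.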
[jira] [Created] (FLINK-13087) Add group window Aggregate operator to Table API
Hequn Cheng created FLINK-13087: --- Summary: Add group window Aggregate operator to Table API Key: FLINK-13087 URL: https://issues.apache.org/jira/browse/FLINK-13087 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Add Group Window Aggregate operator to Table API as described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. The usage: {code:java} val res = tab .window(Tumble over 15.minute on 'rowtime as 'w) .groupBy('w, 'a) // leave out groupBy-clause to define global aggregates .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c .select('a, 'c, 'w.start) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13057) Correct comments in ListState class
Hequn Cheng created FLINK-13057: --- Summary: Correct comments in ListState class Key: FLINK-13057 URL: https://issues.apache.org/jira/browse/FLINK-13057 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Hequn Cheng Assignee: Hequn Cheng ListState can be a keyed state or an operator state, but the comment in ListState says it can only be a keyed state: {code:java} The state is only accessible by functions applied on a {@code KeyedStream}. {code} We can change the comment from {code:java} * The state is only accessible by functions applied on a {@code KeyedStream}. The key is * automatically supplied by the system, so the function always sees the value mapped to the * key of the current element. That way, the system can handle stream and state partitioning * consistently together.{code} to {code:java} * The state can be a keyed state or an operator state. When it is a keyed state, it is only * accessible by functions applied on a {@code KeyedStream}. The key is automatically supplied by * the system, so the function always sees the value mapped to the key of the current element. * That way, the system can handle stream and state partitioning consistently together.{code} Any suggestions are appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12906) Port OperationTreeBuilder to Java
Hequn Cheng created FLINK-12906: --- Summary: Port OperationTreeBuilder to Java Key: FLINK-12906 URL: https://issues.apache.org/jira/browse/FLINK-12906 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng As discussed with [~dawidwys], even though we can't move it to the API module yet, we could already migrate it to Java. This might be a good idea because some new features added there use Scala features, and any such additions make the future migration harder. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12797) Remove duplicate tests in GroupWindowTableAggregateValidationTest
Hequn Cheng created FLINK-12797: --- Summary: Remove duplicate tests in GroupWindowTableAggregateValidationTest Key: FLINK-12797 URL: https://issues.apache.org/jira/browse/FLINK-12797 Project: Flink Issue Type: Improvement Components: Table SQL / API Environment: Some tests in {{GroupWindowTableAggregateValidationTest}} are duplicates of tests in {{GroupWindowValidationTest}}. We can remove these duplicate tests directly. Reporter: Hequn Cheng Assignee: Hequn Cheng -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12789) Fix java docs in UserDefinedAggregateFunction
Hequn Cheng created FLINK-12789: --- Summary: Fix java docs in UserDefinedAggregateFunction Key: FLINK-12789 URL: https://issues.apache.org/jira/browse/FLINK-12789 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng We use {{UserDefinedAggregateFunction}} as the base class for {{TableAggregateFunction}} and {{AggregateFunction}}. However, the java docs in {{UserDefinedAggregateFunction}} are only dedicated to {{AggregateFunction}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12779) Avoid field conflicts when generating field names for non-composite TypeInformation
Hequn Cheng created FLINK-12779: --- Summary: Avoid field conflicts when generating field names for non-composite TypeInformation Key: FLINK-12779 URL: https://issues.apache.org/jira/browse/FLINK-12779 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng We use {{FieldInfoUtils.getFieldNames(resultType)}} to get the field names of the resultType. There is no problem for composite types. For non-composite types, we always set the field name to `f0`, but `f0` may conflict with the predefined field names. To make it more robust, we should generate a field name with no conflicts. For example, we can use `f0_0` as the field name if `f0` has been used. This is also consistent with the behavior of SQL. The following test can reproduce the problem. {code:java} @Test def testUserDefinedTableFunctionWithParameter(): Unit = { val tableFunc1 = new RichTableFunc1 StreamITCase.testResults = mutable.MutableList() val result = StreamTestData.getSmall3TupleDataStream(env) .toTable(tEnv, 'f0, 'f1, 'f2) .joinLateral(tableFunc1('f2)) val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink[Row]) env.execute() val expected = mutable.MutableList("3,Hello", "3,world") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } {code} Exception {code:java} org.apache.flink.table.api.ValidationException: join relations with ambiguous names: [f0] at org.apache.flink.table.operations.JoinOperationFactory.validateNamesAmbiguity(JoinOperationFactory.java:115) at org.apache.flink.table.operations.JoinOperationFactory.create(JoinOperationFactory.java:78) at org.apache.flink.table.operations.OperationTreeBuilder.join(OperationTreeBuilder.scala:358) at org.apache.flink.table.operations.OperationTreeBuilder.joinLateral(OperationTreeBuilder.scala:373) at org.apache.flink.table.api.TableImpl.joinLateralInternal(tableImpl.scala:256) at
org.apache.flink.table.api.TableImpl.joinLateral(tableImpl.scala:214) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
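The conflict-avoidance rule described in this ticket (use `f0_0` when `f0` is taken) can be sketched as a small helper. This is illustrative plain Python, not the actual FieldInfoUtils code; the numeric-suffix loop is an assumption about how far the scheme extends.

```python
def generate_field_name(base, existing):
    """Pick a non-conflicting field name: return `base` unless it is taken,
    otherwise append a numeric suffix, e.g. f0 -> f0_0."""
    if base not in existing:
        return base
    i = 0
    while "{}_{}".format(base, i) in existing:
        i += 1
    return "{}_{}".format(base, i)
```

In the failing test above, the non-composite result type would then be named `f0_0` instead of colliding with the predefined `f0`.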
[jira] [Created] (FLINK-12778) Fix deriveTableAggRowType bug for non-composite types
Hequn Cheng created FLINK-12778: --- Summary: Fix deriveTableAggRowType bug for non-composite types Key: FLINK-12778 URL: https://issues.apache.org/jira/browse/FLINK-12778 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, we call {{aggCalls.get(0).`type`.getFieldList.foreach(builder.add)}} when deriving the row type for table aggregates. However, for types that are not composite types, the field list would be null. Table aggregates should, of course, support non-composite types. To solve the problem, we should check whether the types are structured, because a composite type is converted to a RelDataType which contains a field list and is structured. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12562) Improve code in ExpandColumnFunctionsRule
Hequn Cheng created FLINK-12562: --- Summary: Improve code in ExpandColumnFunctionsRule Key: FLINK-12562 URL: https://issues.apache.org/jira/browse/FLINK-12562 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Enable most IntelliJ warnings and resolve these warnings. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12533) Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type
Hequn Cheng created FLINK-12533: --- Summary: Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type Key: FLINK-12533 URL: https://issues.apache.org/jira/browse/FLINK-12533 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, there are four kinds of {{FunctionDefinition.Type}}: {code:java} public enum Type { AGGREGATE_FUNCTION, SCALAR_FUNCTION, TABLE_FUNCTION, OTHER_FUNCTION } {code} The type AGGREGATE_FUNCTION is used to express both AggregateFunction and TableAggregateFunction. However, the two kinds of functions have different semantics, so it would be nice to separate them more clearly by introducing another type, TABLE_AGGREGATE_FUNCTION. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
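The proposed separation, sketched here as a Python enum for illustration (the real enum is the Java `FunctionDefinition.Type` shown above; the member ordering is an assumption):

```python
from enum import Enum


class FunctionDefinitionType(Enum):
    """Sketch of the enum after adding the proposed TABLE_AGGREGATE_FUNCTION."""
    AGGREGATE_FUNCTION = 1
    SCALAR_FUNCTION = 2
    TABLE_FUNCTION = 3
    TABLE_AGGREGATE_FUNCTION = 4
    OTHER_FUNCTION = 5
```

Code that previously branched on AGGREGATE_FUNCTION for both function kinds could then distinguish table aggregates explicitly.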
[jira] [Created] (FLINK-12402) Make validation error message for CallExpression more user friendly
Hequn Cheng created FLINK-12402: --- Summary: Make validation error message for CallExpression more user friendly Key: FLINK-12402 URL: https://issues.apache.org/jira/browse/FLINK-12402 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, the error message for CallExpression validation may not display the root cause, which may confuse our users. Take the following test as an example: {code:java} @Test def testSimpleSelectAllWithAs(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) .select('a, 'b.log as 'b, 'c) val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink[Row]) env.execute() val expected = mutable.MutableList( "1,1,Hi", "2,2,Hello", "3,2,Hello world") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } {code} The error message is: {code:java} org.apache.flink.table.api.ValidationException: Invalid arguments [log(b), 'b'] for function: as {code} From the error message, it seems there is something wrong with the `as` function. However, the root cause is that the log function can only accept a double parameter while b is a long number. To make it more user-friendly, it would be better to display the root cause error message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API
Hequn Cheng created FLINK-12401: --- Summary: Support incremental emit for non-window streaming FlatAggregate on Table API Key: FLINK-12401 URL: https://issues.apache.org/jira/browse/FLINK-12401 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng As described in [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739], there are two output modes for non-window streaming flatAggregate. One is emitting with full values, the other is emitting with incremental values. [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the former one, this jira is going to support the latter one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12357) Remove useless code in TableConfig
Hequn Cheng created FLINK-12357: --- Summary: Remove useless code in TableConfig Key: FLINK-12357 URL: https://issues.apache.org/jira/browse/FLINK-12357 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12271) Display input field name list when throw Cannot resolve field exception
Hequn Cheng created FLINK-12271: --- Summary: Display input field name list when throw Cannot resolve field exception Key: FLINK-12271 URL: https://issues.apache.org/jira/browse/FLINK-12271 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, when we select a field that does not exist, a "cannot resolve field" exception is thrown. For example, {code:java} org.apache.flink.table.api.ValidationException: Cannot resolve field [_4] {code} It would be better to also display the input field list to indicate the existing fields, such as: {code:java} org.apache.flink.table.api.ValidationException: Cannot resolve field [_4], input field list:[_1, _2, _3]. {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
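The improved message can be sketched in plain Python. `resolve_field` is a hypothetical helper for illustration, not the Flink resolver; only the message format comes from the ticket.

```python
def resolve_field(name, input_fields):
    """Sketch of the improved error: include the input field list
    so users can see which fields actually exist."""
    if name in input_fields:
        return name
    raise ValueError("Cannot resolve field [{}], input field list:[{}].".format(
        name, ", ".join(input_fields)))
```

Resolving `_4` against fields `_1`, `_2`, `_3` then produces exactly the suggested message.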
[jira] [Created] (FLINK-12217) OperationTreeBuilder.map() should perform ExpressionResolver.resolve()
Hequn Cheng created FLINK-12217: --- Summary: OperationTreeBuilder.map() should perform ExpressionResolver.resolve() Key: FLINK-12217 URL: https://issues.apache.org/jira/browse/FLINK-12217 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng In {{OperationTreeBuilder.map()}}, we should resolve all LookupCallExpressions for the Java string-expression case; otherwise, exceptions will be thrown. The following test case can reproduce the problem. {code:java} @Test def testMap(): Unit = { val util = streamTestUtil() val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) util.tableEnv.registerFunction("func", Func23) val t1 = t.map("func(a, b, c)") val t2 = t.map(Func23('a, 'b, 'c)) verifyTableEquals(t1, t2) } {code} {code:java} org.apache.flink.table.api.ValidationException: Only ScalarFunction can be used in the map operator. at org.apache.flink.table.operations.OperationTreeBuilder.map(OperationTreeBuilder.scala:355) at org.apache.flink.table.api.TableImpl.map(tableImpl.scala:461) at org.apache.flink.table.api.TableImpl.map(tableImpl.scala:457) at org.apache.flink.table.api.stream.table.stringexpr.CalcStringExpressionTest.testMap(CalcStringExpressionTest.scala:178) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12182) AggregateProjectMergeRule can not handle LogicalWindowAggregate
Hequn Cheng created FLINK-12182: --- Summary: AggregateProjectMergeRule can not handle LogicalWindowAggregate Key: FLINK-12182 URL: https://issues.apache.org/jira/browse/FLINK-12182 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, we add AggregateProjectMergeRule.INSTANCE to the logical rule sets and use it to remove projects for aggregates. However, there are bugs when this rule is applied to a LogicalWindowAggregate. When the project is removed, the input field names change, so the rule has to update all fields in the Aggregate; however, the field names in the LogicalWindow of a LogicalWindowAggregate are not taken into consideration by AggregateProjectMergeRule, as it is a Calcite rule. As a quick fix, I think we can change {code:java} AggregateProjectMergeRule.INSTANCE, {code} to {code:java} new AggregateProjectMergeRule( classOf[LogicalAggregate], classOf[Project], RelFactories.LOGICAL_BUILDER), {code} Of course, we need a complete solution for the LogicalWindowAggregate, but not in this jira. Any suggestions are welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12174) Introduce FlinkAggregateExtractProjectRule and remove extractFieldReferences
Hequn Cheng created FLINK-12174: --- Summary: Introduce FlinkAggregateExtractProjectRule and remove extractFieldReferences Key: FLINK-12174 URL: https://issues.apache.org/jira/browse/FLINK-12174 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, when parsing Table API expressions, an inner project is added to project fields for {{Aggregate}}s, as the code below shows: {code:java} if (!extracted.getAggregations.isEmpty) { val projectFields = extractFieldReferences(expressionsWithResolvedCalls) wrap( operationTreeBuilder.project(extracted.getProjections, operationTreeBuilder.aggregate(emptyList[Expression], extracted.getAggregations, operationTreeBuilder.project(projectFields, operationTree) ) ) ) } {code} This optimization is not well suited here; instead, we can achieve it with a rule. The `extractFieldReferences` method can also be removed if we use a rule, which also makes the expression parsing logic clearer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12016) Change package name in blink modules to avoid class conflict
Hequn Cheng created FLINK-12016: --- Summary: Change package name in blink modules to avoid class conflict Key: FLINK-12016 URL: https://issues.apache.org/jira/browse/FLINK-12016 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Hequn Cheng A lot of classes (table, table env, etc.) have been copied directly from flink into the blink modules. This brings a class conflict problem, as described in FLINK-11994. To solve the problem, it would be better to rename the packages in the blink modules. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11969) Add end2end stream sql test for unbounded operators
Hequn Cheng created FLINK-11969: --- Summary: Add end2end stream sql test for unbounded operators Key: FLINK-11969 URL: https://issues.apache.org/jira/browse/FLINK-11969 Project: Flink Issue Type: Sub-task Components: API / Table SQL, Tests Affects Versions: 1.9.0 Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, there is an end2end test for streaming SQL. This test covers bounded operators such as Group Window and Window join. I think it would be nice to add end2end tests for unbounded operators, so that unbounded group by and stream-stream join can be covered in end-to-end tests as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11934) Remove the deprecated methods in TableEnvironment
Hequn Cheng created FLINK-11934: --- Summary: Remove the deprecated methods in TableEnvironment Key: FLINK-11934 URL: https://issues.apache.org/jira/browse/FLINK-11934 Project: Flink Issue Type: New Feature Components: API / Table SQL Reporter: Hequn Cheng Assignee: Hequn Cheng There are several {{getTableEnvironment()}} methods in {{TableEnvironment}} which were deprecated during 1.8. As release-1.8 has been cut, we can remove these deprecated methods now. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11916) Join with a Temporal Table should throw exception for left join
Hequn Cheng created FLINK-11916: --- Summary: Join with a Temporal Table should throw exception for left join Key: FLINK-11916 URL: https://issues.apache.org/jira/browse/FLINK-11916 Project: Flink Issue Type: Bug Components: API / Table SQL Reporter: Hequn Cheng In {{TemporalJoinITCase.testProcessTimeInnerJoin}}, if we change the inner join to a left join, the test still works fine. We may need to throw an exception if we only support inner join. CC [~pnowojski] The problem can be reproduced with the following SQL: {code:java} @Test def testEventTimeInnerJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) env.setStateBackend(getStateBackend) StreamITCase.clear env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ |SELECT | o.amount * r.rate AS amount |FROM | Orders AS o left join | LATERAL TABLE (Rates(o.rowtime)) AS r on true |WHERE r.currency = o.currency |""".stripMargin val ordersData = new mutable.MutableList[(Long, String, Timestamp)] ordersData.+=((2L, "Euro", new Timestamp(2L))) ordersData.+=((1L, "US Dollar", new Timestamp(3L))) ordersData.+=((50L, "Yen", new Timestamp(4L))) ordersData.+=((3L, "Euro", new Timestamp(5L))) val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)] ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L))) ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L))) ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L))) ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L))) ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L))) var expectedOutput = new mutable.HashSet[String]() expectedOutput += (2 * 114).toString expectedOutput += (3 * 116).toString val orders = env .fromCollection(ordersData) .assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]()) .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime) val ratesHistory = env .fromCollection(ratesHistoryData) .assignTimestampsAndWatermarks(new TimestampExtractor[String, Long]()) .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime) tEnv.registerTable("Orders", orders) tEnv.registerTable("RatesHistory", ratesHistory) tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L)) tEnv.registerFunction( "Rates", tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency)) tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) // Scan from registered table to test for interplay between // LogicalCorrelateToTemporalTableJoinRule and TableScanRule val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() assertEquals(expectedOutput, StreamITCase.testResults.toSet) } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11908) Port window classes into flink-api-java
Hequn Cheng created FLINK-11908: --- Summary: Port window classes into flink-api-java Key: FLINK-11908 URL: https://issues.apache.org/jira/browse/FLINK-11908 Project: Flink Issue Type: Improvement Components: API / Table SQL Reporter: Hequn Cheng Assignee: Hequn Cheng As discussed in FLINK-11068, it is good to open a separate issue for porting the window classes before opening a PR for the {{Table}} classes. This keeps the PR smaller and thus easier to review. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11398) Add a dedicated phase to materialize time indicators for nodes that produce updates
Hequn Cheng created FLINK-11398: --- Summary: Add a dedicated phase to materialize time indicators for nodes that produce updates Key: FLINK-11398 URL: https://issues.apache.org/jira/browse/FLINK-11398 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng As discussed [here|https://github.com/apache/flink/pull/6787#discussion_r249056249], we need a dedicated phase to materialize time indicators for nodes that produce updates. Details: Currently, we materialize time indicators in `RelTimeIndicatorConverter`. We need to introduce another materialization phase that materializes all time attributes on nodes that produce updates. We cannot do it inside `RelTimeIndicatorConverter`, because only later, after the physical optimization phase, do we know whether it is a non-window outer join, which produces updates. There are a few other things we need to consider. - Whether we can unify the two converter phases. - Taking windows with early firing into consideration (not implemented yet). In this case, we don't need to materialize time indicators even if the window produces updates. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11342) Add example for every built-in TableAPI Function
Hequn Cheng created FLINK-11342: --- Summary: Add example for every built-in TableAPI Function Key: FLINK-11342 URL: https://issues.apache.org/jira/browse/FLINK-11342 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.7.1, 1.7.0 Reporter: Hequn Cheng There are a lot of built-in Table API functions: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions.html Take {{TEMPORAL.extract(TIMEINTERVALUNIT)}} as an example; we have an example for it. {code:java} E.g., '2006-06-05'.toDate.extract(DAY) returns 5; '2006-06-05'.toDate.extract(QUARTER) returns 2. {code} The example is very helpful for users who are not familiar with the function, and I think it would be great if we could add an example for every built-in function. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
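For illustration, the two documented results above can be reproduced with plain `java.time`; this is only a sketch of the semantics, not how Flink's `extract` is implemented (Flink generates its own code for it).

```java
import java.time.LocalDate;

public class ExtractDemo {

    // Quarter is not a direct field on LocalDate; derive it from the month.
    static int quarter(LocalDate date) {
        return (date.getMonthValue() - 1) / 3 + 1;
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.parse("2006-06-05");
        System.out.println(date.getDayOfMonth()); // 5, like '2006-06-05'.toDate.extract(DAY)
        System.out.println(quarter(date));        // 2, like '2006-06-05'.toDate.extract(QUARTER)
    }
}
```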
[jira] [Created] (FLINK-11331) Fix errors in tableApi.md and functions.md
Hequn Cheng created FLINK-11331: --- Summary: Fix errors in tableApi.md and functions.md Key: FLINK-11331 URL: https://issues.apache.org/jira/browse/FLINK-11331 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.7.1, 1.7.0 Reporter: Hequn Cheng Assignee: Hequn Cheng -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11310) Convert predicates to IN or NOT_IN for Project
Hequn Cheng created FLINK-11310: --- Summary: Convert predicates to IN or NOT_IN for Project Key: FLINK-11310 URL: https://issues.apache.org/jira/browse/FLINK-11310 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng In FLINK-10474, we forcibly translate IN into a predicate to avoid translating it to a JOIN. In addition, we add a rule to convert the predicates back to IN so that we can generate code using a HashSet for the IN. However, FLINK-10474 only takes Filter into consideration. It would be great to also convert predicates in Project to IN. This will not only improve performance for the Project but also address the problem raised in FLINK-11308, since all predicates will be converted into a single IN expression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
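The performance argument above can be illustrated with a plain-Java sketch: a HashSet gives constant-time membership checks, whereas translating IN into a chain of OR-ed equality predicates costs a comparison per element. The class and method names here are illustrative only and bear no relation to Flink's actual generated code.

```java
import java.util.Set;

public class InPredicateDemo {

    // Chain of equality checks, as a naive OR-based translation would do: O(n).
    static boolean inViaOr(int v, int[] values) {
        for (int candidate : values) {
            if (v == candidate) {
                return true;
            }
        }
        return false;
    }

    // Set-backed IN, as HashSet-based generated code can do: O(1) lookup.
    static final Set<Integer> IN_SET = Set.of(1, 3, 5, 7);

    static boolean inViaSet(int v) {
        return IN_SET.contains(v);
    }

    public static void main(String[] args) {
        System.out.println(inViaOr(5, new int[]{1, 3, 5, 7})); // true
        System.out.println(inViaSet(4));                       // false
    }
}
```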
[jira] [Created] (FLINK-11301) Travis failed: No output has been received in the last 10m0s
Hequn Cheng created FLINK-11301: --- Summary: Travis failed: No output has been received in the last 10m0s Key: FLINK-11301 URL: https://issues.apache.org/jira/browse/FLINK-11301 Project: Flink Issue Type: Bug Components: Travis Reporter: Hequn Cheng https://api.travis-ci.org/v3/job/47082/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11222) Change api.scala.DataStream to api.datastream.DataStream in HarnessTestBase
Hequn Cheng created FLINK-11222: --- Summary: Change api.scala.DataStream to api.datastream.DataStream in HarnessTestBase Key: FLINK-11222 URL: https://issues.apache.org/jira/browse/FLINK-11222 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Thanks to FLINK-11074, we can create a harness tester from a DataStream, which makes it easier to write harness tests. However, it would be better to change the parameter type from api.scala.DataStream to api.datastream.DataStream for the {{createHarnessTester()}} method, so that both java.DataStream and scala.DataStream can use this method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11221) Support delimited identifiers in TableAPI
Hequn Cheng created FLINK-11221: --- Summary: Support delimited identifiers in TableAPI Key: FLINK-11221 URL: https://issues.apache.org/jira/browse/FLINK-11221 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng There are two kinds of identifiers in SQL: regular identifiers and delimited identifiers. Different from regular identifiers, delimited identifiers are identifiers enclosed in backticks (``), double quotation marks (") or brackets ([ ]). Currently, delimited identifiers in backticks are supported in Flink SQL (thanks to Calcite). In Calcite, delimited identifiers may contain virtually any character, including spaces and other punctuation. With delimited identifiers, we can name special identifiers such as `a-a`, which may be an essential requirement. However, delimited identifiers are not supported in the Table API. It would be nice to support them there as well. Any suggestions are welcome! Thank you. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
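As a sketch of the quoting described above, the helper below wraps an identifier in backticks, doubling any embedded backtick (the MySQL-style convention; Calcite's exact escaping depends on its configured Lex/Quoting settings). The helper is hypothetical, not a Flink or Calcite API.

```java
public class DelimitedIdentifierDemo {

    // Hypothetical helper: quotes an identifier with backticks, escaping
    // embedded backticks by doubling them.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    public static void main(String[] args) {
        System.out.println(quote("a-a"));      // `a-a`
        System.out.println(quote("odd`name")); // `odd``name`
    }
}
```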
[jira] [Created] (FLINK-11210) Enable auto state cleanup strategy for RANGE OVER
Hequn Cheng created FLINK-11210: --- Summary: Enable auto state cleanup strategy for RANGE OVER Key: FLINK-11210 URL: https://issues.apache.org/jira/browse/FLINK-11210 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng As discussed in FLINK-11188, the OVER RANGE window should automatically clean up its state instead of relying on the state retention cleanup strategy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11203) FunctionContext of AggregateFunction will not be initialized for window GroupBy
Hequn Cheng created FLINK-11203: --- Summary: FunctionContext of AggregateFunction will not be initialized for window GroupBy Key: FLINK-11203 URL: https://issues.apache.org/jira/browse/FLINK-11203 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, in the Table API/SQL, the implementation of group window aggregations is based on WindowedStream and {{org.apache.flink.api.common.functions.AggregateFunction}}. Due to FLINK-11198, metrics cannot be accessed within {{org.apache.flink.table.functions.AggregateFunction}} either. It would be nice to support metrics for both of them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11197) Improve migration test comments about how to generate snapshots
Hequn Cheng created FLINK-11197: --- Summary: Improve migration test comments about how to generate snapshots Key: FLINK-11197 URL: https://issues.apache.org/jira/browse/FLINK-11197 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Hequn Cheng Assignee: Hequn Cheng We should generate snapshots based on the release branch instead of master. It would be better to add this notice to the comments. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11188) Bounded over should not enable state retention time
Hequn Cheng created FLINK-11188: --- Summary: Bounded over should not enable state retention time Key: FLINK-11188 URL: https://issues.apache.org/jira/browse/FLINK-11188 Project: Flink Issue Type: Bug Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng As discussed in FLINK-11172, time-based operations (GROUP BY windows, OVER windows, time-windowed join, etc.) are inherently bound by time and automatically clean up their state. We should not add state cleanup or TTL for these operators. If I understand correctly, we should not add the retention logic for rows-bounded operations either. I think we should disable state retention logic for: - ProcTimeBoundedRangeOver - ProcTimeBoundedRowsOver - RowTimeBoundedRangeOver - RowTimeBoundedRowsOver Any suggestions are appreciated! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11150) Check exception messages in ValidationTest of flink-table
Hequn Cheng created FLINK-11150: --- Summary: Check exception messages in ValidationTest of flink-table Key: FLINK-11150 URL: https://issues.apache.org/jira/browse/FLINK-11150 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng Problem Currently, there are a lot of {{ValidationTests}} in flink-table. These tests check whether exceptions are thrown correctly. However, the detailed messages of the exceptions are not checked, which makes the tests very fragile. Take the following test as an example: {code:java} class TableSinkValidationTest extends TableTestBase { @Test(expected = classOf[TableException]) def testAppendSinkOnUpdatingTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text) tEnv.registerTableSink("testSink", new TestAppendSink) t.groupBy('text) .select('text, 'id.count, 'num.sum) .insertInto("testSink") // must fail because table is not append-only env.execute() } } {code} The test checks the validation for an AppendSink on an updating table. The test passes without any exceptions. If we remove the {{(expected = classOf[TableException])}}, we can see the following exception: {code:java} org.apache.flink.table.api.TableException: Table sink is not configured. {code} However, the correct exception should be: {code:java} org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes. {code} Since the two exceptions share the same exception class, we also have to check the exception messages. Proposal To make our tests more rigorous, I think it is better to use {{ExpectedException}} to check both the exception class and the exception message. 
So the previous test would be: {code:java} @Test def testAppendSinkOnUpdatingTable(): Unit = { expectedException.expect(classOf[TableException]) expectedException.expectMessage("AppendStreamTableSink requires that Table has only insert changes.") val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text) tEnv.registerTableSink("testSink", new TestAppendSink() .configure(Array("text", "id", "num"), Array(Types.STRING, Types.LONG, Types.LONG))) t.groupBy('text) .select('text, 'id.count, 'num.sum) .insertInto("testSink") // must fail because table is not append-only env.execute() } {code} which adds two more lines to the test: {code:java} expectedException.expect(classOf[TableException]) expectedException.expectMessage("AppendStreamTableSink requires that Table has only insert changes.") {code} Any suggestions are greatly appreciated! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
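Independent of JUnit's {{ExpectedException}} rule, the same two-part check (exception class *and* message) can be sketched with a plain try/catch. The helper below is illustrative only, not a proposal for the Flink test base.

```java
public class ExpectExceptionDemo {

    // Asserts that the action throws the expected exception type and that
    // the exception message contains the expected fragment.
    static void expect(Runnable action, Class<? extends Throwable> type, String messagePart) {
        try {
            action.run();
        } catch (Throwable t) {
            if (!type.isInstance(t)) {
                throw new AssertionError("wrong exception type: " + t.getClass(), t);
            }
            if (t.getMessage() == null || !t.getMessage().contains(messagePart)) {
                throw new AssertionError("wrong message: " + t.getMessage(), t);
            }
            return; // both class and message matched
        }
        throw new AssertionError("expected " + type.getName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        expect(
            () -> { throw new IllegalStateException("requires that Table has only insert changes"); },
            IllegalStateException.class,
            "insert changes");
        System.out.println("check passed");
    }
}
```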
[jira] [Created] (FLINK-11147) Add document for TableAggregate Function
Hequn Cheng created FLINK-11147: --- Summary: Add document for TableAggregate Function Key: FLINK-11147 URL: https://issues.apache.org/jira/browse/FLINK-11147 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng Add documentation for {{TableAggregateFunction}}, similar to the documentation of {{AggregateFunction}}: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#aggregation-functions Most parts of {{TableAggregateFunction}} would be the same as {{AggregateFunction}}, except for the way outputs are handled: {{AggregateFunction}} outputs a scalar value, while {{TableAggregateFunction}} outputs a Table with multiple rows and columns. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11131) Enable unused import checkstyle on flink-core and flink-runtime tests
Hequn Cheng created FLINK-11131: --- Summary: Enable unused import checkstyle on flink-core and flink-runtime tests Key: FLINK-11131 URL: https://issues.apache.org/jira/browse/FLINK-11131 Project: Flink Issue Type: Improvement Components: Core, Local Runtime Reporter: Hequn Cheng Assignee: Hequn Cheng FLINK-11125 removes some unused imports. It would be better to enable this checkstyle rule on these modules so that we don't have to check manually. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11125) Remove useless import
Hequn Cheng created FLINK-11125: --- Summary: Remove useless import Key: FLINK-11125 URL: https://issues.apache.org/jira/browse/FLINK-11125 Project: Flink Issue Type: Improvement Components: Table API SQL, Tests Reporter: Hequn Cheng Assignee: Hequn Cheng -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()
Hequn Cheng created FLINK-11124: --- Summary: Add private[flink] to TemporalTableFunction.create() Key: FLINK-11124 URL: https://issues.apache.org/jira/browse/FLINK-11124 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng {{TemporalTableFunction}} is a user-oriented class. I think it would be better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in order to make it invisible to users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11090) Unused parameter in WindowedStream.aggregate()
Hequn Cheng created FLINK-11090: --- Summary: Unused parameter in WindowedStream.aggregate() Key: FLINK-11090 URL: https://issues.apache.org/jira/browse/FLINK-11090 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Hequn Cheng Assignee: Hequn Cheng The {{aggregateResultType}} parameter in {{WindowedStream.aggregate()}} seems unused. Or have I missed something? If it is unused, I prefer to remove the parameter by adding a new API and deprecating the current one. We can't remove it directly as it is PublicEvolving. {code:java} @PublicEvolving public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggregateFunction, ProcessWindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType, TypeInformation<V> aggregateResultType, TypeInformation<R> resultType) { } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)