[jira] [Created] (FLINK-35546) Elasticsearch 8 connector fails fast for non-retryable bulk request items
Mingliang Liu created FLINK-35546: - Summary: Elasticsearch 8 connector fails fast for non-retryable bulk request items Key: FLINK-35546 URL: https://issues.apache.org/jira/browse/FLINK-35546 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: Mingliang Liu Discussion thread: [https://lists.apache.org/thread/yrf0mmbch0lhk3rgkz94fr0x5qz2417l]
{quote}
Currently the Elasticsearch 8 connector retries all items if the request fails as a whole, and retries the failed items if the request has partial failures [1|https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170]. I think this infinite retrying might be problematic in cases where retrying can never succeed. For example, if the response is 400 (bad request) or 404 (not found), retries do not help. If too many failed items are non-retryable, new requests get processed less effectively. In extreme cases, it may stall the pipeline if in-flight requests are occupied by those failed items. FLIP-451 proposes a timeout for retrying, which helps with un-acknowledged requests, but it does not address the case where the request gets processed and its failed items keep failing no matter how many times we retry. Correct me if I'm wrong. One opinionated option is to fail fast for non-retryable errors like 400 / 404 and to drop items for 409. Alternatively, we could allow users to configure the "drop/fail" behavior for non-retryable errors; I prefer the latter. I checked how Logstash ingests data into Elasticsearch and it takes a similar approach for non-retryable errors [2|https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304]. In my day job, we have a dead-letter queue in the AsyncSinkWriter for failed entries that exhaust retries. I guess that is too specific to our setup and seems overkill here for the Elasticsearch connector.
{quote}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
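To make the proposal concrete, here is a minimal sketch of a status-code classification; the {{FailureAction}} enum and the per-status mappings below are illustrative, not the connector's actual API:
{code:java}
// Hedged sketch only: classify a bulk item's HTTP status into an action.
// The enum and the per-status choices are illustrative, following the
// 400/404 fail, 409 drop, otherwise retry idea from the discussion.
enum FailureAction { RETRY, DROP, FAIL }

static FailureAction classify(int status) {
    switch (status) {
        case 400: // bad request: retrying can never succeed
        case 404: // not found: retrying can never succeed
            return FailureAction.FAIL;
        case 409: // version conflict: dropping the item may be acceptable
            return FailureAction.DROP;
        case 429: // too many requests: backing off and retrying helps
        default:  // e.g. 5xx: treat as transient and retry
            return FailureAction.RETRY;
    }
}
{code}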
[jira] [Created] (FLINK-35504) Improve Elasticsearch 8 connector observability
Mingliang Liu created FLINK-35504: - Summary: Improve Elasticsearch 8 connector observability Key: FLINK-35504 URL: https://issues.apache.org/jira/browse/FLINK-35504 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Affects Versions: elasticsearch-3.1.0 Reporter: Mingliang Liu Currently all logs are at DEBUG level. Some of those messages are very helpful for tracking progress and errors, and can be promoted to INFO or WARN level. We can also include error details in the DEBUG-level messages so it's easier to debug with more context. Meanwhile, we can update the metrics to track {{numRecordsSend}}. FWIW, the base class already tracks the following metrics, so we don't need to implement them: the {{CurrentSendTime}} Gauge, and the {{NumBytesOut}} and {{NumRecordsOut}} Counters. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35472) Improve tests for Elasticsearch 8 connector
Mingliang Liu created FLINK-35472: - Summary: Improve tests for Elasticsearch 8 connector Key: FLINK-35472 URL: https://issues.apache.org/jira/browse/FLINK-35472 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch, Tests Reporter: Mingliang Liu Per discussion in [this PR|https://github.com/apache/flink-connector-elasticsearch/pull/104], the tests become more reusable if we parameterize them. This requires some changes to the existing tests:
# Make the base test class parameterized with a secure parameter. As JUnit 5 has limited support for parameterized tests with inheritance, we can use the {{ParameterizedTestExtension}} introduced in Flink, see this doc (a sketch also follows below)
# Manage the test container lifecycle instead of using the managed annotations {{@Testcontainers}} and {{@Container}}, so that the test containers can be used as a singleton for all tests in the suite
# Create and use common methods in the base class so that concrete test classes can be mostly parameter-agnostic

This JIRA intends to not change any logic or functionality. Instead it focuses on refactoring for more reusable and future-proof tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
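As a rough illustration of the first item, a base class parameterized on the secure flag might look like the following; the class and test names are made up, and I'm assuming the annotation shapes of Flink's {{ParameterizedTestExtension}} mirror JUnit 4's Parameterized runner:
{code:java}
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.Collection;

// Hedged sketch: a base test class parameterized on the secure flag.
// Class name and tested behavior are illustrative only.
@ExtendWith(ParameterizedTestExtension.class)
abstract class ElasticsearchSinkBaseITCase {

    @Parameter public boolean secure;

    @Parameters(name = "secure = {0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[] {false}, new Object[] {true});
    }

    @TestTemplate
    void testBulkWrite() {
        // Concrete subclasses exercise the sink against a container that
        // is started once per suite (item 2), using common helpers from
        // the base class (item 3).
    }
}
{code}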
[jira] [Created] (FLINK-35424) Elasticsearch connector 8 supports SSL context
Mingliang Liu created FLINK-35424: - Summary: Elasticsearch connector 8 supports SSL context Key: FLINK-35424 URL: https://issues.apache.org/jira/browse/FLINK-35424 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Affects Versions: 1.17.1 Reporter: Mingliang Liu Assignee: Mingliang Liu The current Flink Elasticsearch connector does not support an SSL option, causing issues connecting to secure ES clusters. As SSLContext is not serializable and possibly environment-aware, we can add a (serializable) provider of the SSL context to the {{NetworkClientConfig}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
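A minimal sketch of the idea, assuming a hypothetical provider interface (this is not the connector's actual API): the sink ships the serializable factory and defers creating the context to the task side.
{code:java}
import java.io.Serializable;

import javax.net.ssl.SSLContext;

// Hedged sketch: SSLContext itself is not serializable, so the
// NetworkClientConfig would hold a serializable factory instead and
// invoke it when the REST client is built on the task manager.
public interface SSLContextProvider extends Serializable {
    SSLContext createSSLContext() throws Exception;
}
{code}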
[jira] [Created] (FLINK-35287) Builder builds NetworkConfig for Elasticsearch connector 8
Mingliang Liu created FLINK-35287: - Summary: Builder builds NetworkConfig for Elasticsearch connector 8 Key: FLINK-35287 URL: https://issues.apache.org/jira/browse/FLINK-35287 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: Mingliang Liu In FLINK-26088 we added support for ElasticSearch 8.0. It is based on the Async sink API and does not use the base module {{flink-connector-elasticsearch-base}}. Regarding the config options (host, username, password, headers, ssl...), we pass all options from the builder to the AsyncSink, and finally to the AsyncWriter. This is less flexible when we add new options: the constructors keep getting longer and multiple places may validate options unnecessarily. It would be nicer to have the sink builder build the NetworkConfig once and pass it all the way to the writer. This is also how the base module for 6.x / 7.x is implemented. In my recent work adding new options to the network config, this approach was simpler. Let me create a PR to demonstrate the idea. No new features or major code refactoring other than the builder building the NetworkConfig (the code will be shorter). I have a few small fixes which I'll include in the incoming PR. -- This message was sent by Atlassian Jira (v8.20.10#820010)
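A rough sketch of the proposed flow; the constructor shapes below are assumptions for illustration, not the connector's actual signatures:
{code:java}
// Hedged sketch: the builder assembles the NetworkConfig once, and the
// sink and writer then receive one object instead of every individual
// option being threaded through each constructor.
public Elasticsearch8AsyncSink<T> build() {
    NetworkConfig networkConfig =
            new NetworkConfig(hosts, username, password, headers, sslContextProvider);
    return new Elasticsearch8AsyncSink<>(elementConverter, maxBatchSize, networkConfig);
}
{code}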
[jira] [Created] (FLINK-35221) Support SQL 2011 reserved keywords as identifiers in Flink HiveParser
Wencong Liu created FLINK-35221: --- Summary: Support SQL 2011 reserved keywords as identifiers in Flink HiveParser Key: FLINK-35221 URL: https://issues.apache.org/jira/browse/FLINK-35221 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.20.0 Reporter: Wencong Liu According to Hive user documentation[1], starting from version 0.13.0, Hive prohibits the use of reserved keywords as identifiers. Moreover, versions 2.1.0 and earlier allow using SQL11 reserved keywords as identifiers by setting {{hive.support.sql11.reserved.keywords=false}} in hive-site.xml. This compatibility feature facilitates jobs that utilize keywords as identifiers. HiveParser in Flink, relying on Hive version 2.3.9, lacks the option to treat SQL11 reserved keywords as identifiers. This poses a challenge for users migrating SQL from Hive 1.x to Flink SQL, as they might encounter scenarios where keywords are used as identifiers. Addressing this issue is necessary to support such cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35148) Improve InstantiationUtil for checking nullary public constructor
Mingliang Liu created FLINK-35148: - Summary: Improve InstantiationUtil for checking nullary public constructor Key: FLINK-35148 URL: https://issues.apache.org/jira/browse/FLINK-35148 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.18.1, 1.19.0 Reporter: Mingliang Liu {{InstantiationUtil#hasPublicNullaryConstructor}} checks whether the given class has a public nullary constructor. The implementation can be improved a bit: the {{Modifier#isPublic}} check within the for-loop can be skipped because {{Class#getConstructors()}} only returns public constructors. We can also add a negative unit test for this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
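A minimal sketch of the simplified check (illustrative; not necessarily the exact final patch):
{code:java}
import java.lang.reflect.Constructor;

// Hedged sketch: getConstructors() already returns only public
// constructors, so a per-constructor Modifier.isPublic check is redundant.
public static boolean hasPublicNullaryConstructor(Class<?> clazz) {
    for (Constructor<?> constructor : clazz.getConstructors()) {
        if (constructor.getParameterCount() == 0) {
            return true;
        }
    }
    return false;
}
{code}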
[jira] [Created] (FLINK-34632) Log checkpoint Id when logging checkpoint processing delay
Mingliang Liu created FLINK-34632: - Summary: Log checkpoint Id when logging checkpoint processing delay Key: FLINK-34632 URL: https://issues.apache.org/jira/browse/FLINK-34632 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.18.1 Reporter: Mingliang Liu Currently we log a warning message when the checkpoint barrier takes too long to start processing. The message includes the delay; debugging the respective checkpoint would be easier if the checkpoint id were also logged. -- This message was sent by Atlassian Jira (v8.20.10#820010)
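Something like the following, where the variable names are assumed for illustration:
{code:java}
// Hedged sketch: include the checkpoint id alongside the existing delay.
LOG.warn(
        "Checkpoint {} took {} ms from barrier receipt to start of processing.",
        checkpointId,
        processingDelayMillis);
{code}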
[jira] [Created] (FLINK-34543) Support Full Partition Processing On Non-keyed DataStream
Wencong Liu created FLINK-34543: --- Summary: Support Full Partition Processing On Non-keyed DataStream Key: FLINK-34543 URL: https://issues.apache.org/jira/browse/FLINK-34543 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.20.0 Reporter: Wencong Liu Fix For: 1.20.0 1. Introduce MapPartition, SortPartition, Aggregate, Reduce API in DataStream. 2. Introduce SortPartition API in KeyedStream. The related FLIP can be found in [FLIP-380|https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34376) FLINK SQL SUM() causes a precision error
Fangliang Liu created FLINK-34376: - Summary: FLINK SQL SUM() causes a precision error Key: FLINK-34376 URL: https://issues.apache.org/jira/browse/FLINK-34376 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.18.1, 1.14.3 Reporter: Fangliang Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34369) Elasticsearch connector supports SSL provider
Mingliang Liu created FLINK-34369: - Summary: Elasticsearch connector supports SSL provider Key: FLINK-34369 URL: https://issues.apache.org/jira/browse/FLINK-34369 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Affects Versions: 1.17.1 Reporter: Mingliang Liu The current Flink Elasticsearch connector does not support an SSL option, causing issues connecting to secure ES clusters. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34251) ClosureCleaner to include reference classes for non-serialization exception
Mingliang Liu created FLINK-34251: - Summary: ClosureCleaner to include reference classes for non-serialization exception Key: FLINK-34251 URL: https://issues.apache.org/jira/browse/FLINK-34251 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.18.2 Reporter: Mingliang Liu Currently the ClosureCleaner throws an exception if {{checkSerializable}} is enabled while some object is non-serializable. It includes the non-serializable (nested) object in the exception message. However, when the user job program gets more complex, pulling in multiple operators each of which pulls in multiple 3rd-party libraries, it is unclear how the non-serializable object is referenced, as some of those objects could be nested multiple levels deep. For example, the following exception gives no hint about where to check:
{code}
org.apache.flink.api.common.InvalidProgramException: java.lang.Object@528c868 is not serializable.
{code}
It would be nice to include the reference stack in the exception message, as follows:
{code}
org.apache.flink.api.common.InvalidProgramException: java.lang.Object@72437d8d is not serializable. Referenced via [class com.mycompany.myapp.ComplexMap, class com.mycompany.myapp.LocalMap, class com.yourcompany.yourapp.YourPojo, class com.hercompany.herapp.Random, class java.lang.Object]
...
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
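A minimal sketch of the idea, assuming a recursive field walk; cycle detection and JDK-internal module access are omitted for brevity, and this is not Flink's actual ClosureCleaner code:
{code:java}
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: carry the chain of enclosing classes while recursing
// through fields, and report it when a value is not serializable.
final class SerializabilityChecker {

    static void check(Object obj, List<Class<?>> referenceStack)
            throws IllegalAccessException {
        if (obj == null) {
            return;
        }
        if (!(obj instanceof Serializable)) {
            throw new RuntimeException(
                    obj + " is not serializable. Referenced via " + referenceStack);
        }
        List<Class<?>> nextStack = new ArrayList<>(referenceStack);
        nextStack.add(obj.getClass());
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers())
                    || field.getType().isPrimitive()) {
                continue; // statics and primitives cannot break serialization
            }
            field.setAccessible(true);
            check(field.get(obj), nextStack);
        }
    }
}
{code}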
[jira] [Created] (FLINK-33949) METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary compatible
Wencong Liu created FLINK-33949: --- Summary: METHOD_ABSTRACT_NOW_DEFAULT should be both source compatible and binary compatible Key: FLINK-33949 URL: https://issues.apache.org/jira/browse/FLINK-33949 Project: Flink Issue Type: Bug Components: Test Infrastructure Affects Versions: 1.19.0 Reporter: Wencong Liu Fix For: 1.19.0 Currently I'm trying to refactor some APIs annotated by @Public in [FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs - Apache Flink - Apache Software Foundation|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs]. When an abstract method is changed into a default method, the japicmp maven plugin names this change METHOD_ABSTRACT_NOW_DEFAULT and considers it both source incompatible and binary incompatible. The reason may be that if the abstract method becomes default, the logic in the default method will be ignored by previous implementations. I created a test case in which a job is compiled with the newly changed default method and submitted to the previous version. No exception was thrown. Therefore, METHOD_ABSTRACT_NOW_DEFAULT shouldn't be considered incompatible for either source or binary. By the way, currently the master branch checks both source compatibility and binary compatibility between minor versions. According to Flink's API compatibility constraints, the master branch shouldn't check binary compatibility. There is already a Jira (FLINK-33009: tools/release/update_japicmp_configuration.sh should only enable binary compatibility checks in the release branch) to track it and we should fix it as soon as possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
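To illustrate the change japicmp flags here, a before/after sketch with hypothetical interface and method names:
{code:java}
// Before: implementors must override the abstract method.
interface Context {
    String getMetadata();
}

// After: the method gains a default body. Sources compiled against the
// old interface still compile, and old binaries still link: the JVM
// resolves the call to a subclass override when one exists (which is why
// the new default logic is "ignored" by previous implementations), or to
// the new default body otherwise.
interface ContextAfter {
    default String getMetadata() {
        return "unknown";
    }
}
{code}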
[jira] [Created] (FLINK-33905) FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs
Wencong Liu created FLINK-33905: --- Summary: FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs Key: FLINK-33905 URL: https://issues.apache.org/jira/browse/FLINK-33905 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.19.0 Reporter: Wencong Liu This ticket is proposed for [FLIP-382|https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33785) TableJdbcUpsertOutputFormat could not deal with DELETE record correctly when primary keys were set
Bodong Liu created FLINK-33785: -- Summary: TableJdbcUpsertOutputFormat could not deal with DELETE record correctly when primary keys were set Key: FLINK-33785 URL: https://issues.apache.org/jira/browse/FLINK-33785 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: jdbc-3.1.1 Environment: Flink: 1.17.1 Jdbc connector: 3.1.1 Postgresql: 16.1 Reporter: Bodong Liu Attachments: image-2023-12-08-22-24-20-295.png, image-2023-12-08-22-24-26-493.png, image-2023-12-08-22-24-58-986.png, image-2023-12-08-22-28-44-948.png, image-2023-12-08-22-38-08-559.png, image-2023-12-08-22-40-35-530.png, image-2023-12-08-22-42-06-566.png

h1. Issue Description

When using the JDBC connector to DELETE records in a database, I found it CANNOT delete records correctly.

h1. Reproduction Steps

The steps are as follows:
* Create a table with 5 fields and a composite primary key. DDL in Postgres:
{code:java}
create table public.fake
(
    id       bigint not null default nextval('fake_id_seq'::regclass),
    name     character varying(128) not null,
    age      integer,
    location character varying(256),
    birthday timestamp without time zone default CURRENT_TIMESTAMP,
    primary key (id, name)
);
{code}
!image-2023-12-08-22-24-26-493.png!
* Insert some data into the table:
{code:java}
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.00');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
{code}
!image-2023-12-08-22-24-58-986.png!
* Run the Flink code (generic type arguments below are reconstructed, as the original formatting stripped them):
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
    final int[] fieldTypes = {
        Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
    };
    final String[] primaryKeys = {"id", "name"};
    InternalJdbcConnectionOptions internalJdbcConnectionOptions =
            InternalJdbcConnectionOptions.builder()
                    .setClassLoader(Thread.currentThread().getContextClassLoader())
                    .setDriverName(Driver.class.getName())
                    .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
                    .setUsername("postgres")
                    .setPassword("postgres")
                    .setTableName("fake")
                    .setParallelism(1)
                    .setConnectionCheckTimeoutSeconds(10)
                    .setDialect(new PostgresDialect())
                    .build();
    JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
            JdbcOutputFormat.builder()
                    .setFieldNames(fieldNames)
                    .setKeyFields(primaryKeys)
                    .setFieldTypes(fieldTypes)
                    .setOptions(internalJdbcConnectionOptions)
                    .setFlushIntervalMills(1000)
                    .setFlushMaxSize(10)
                    .setMaxRetryTimes(3)
                    .build();
    GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
            new GenericJdbcSinkFunction<>(jdbcOutputFormat);
    Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.00");
    // Row to delete
    Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
    Tuple2<Boolean, Row> element = Tuple2.of(false, row);
    env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
    env.execute();
}
{code}
When the code executed successfully, we can see that the record with id=1 and name=Jack was not deleted.

h1. Cause Analysis

In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option was set in the JdbcDmlOptions, the method returns an 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'. !image-2023-12-08-22-28-44-948.png! And in 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor', the method uses all the fieldNames instead of the keyFields to build the DELETE SQL statement, so the DELETE SQL may not execute correctly. !image-2023-12-08-22-38-08-559.png!

h1. How to Fix

* Use the real keyFields, falling back to fieldNames, to build the executor. !image-2023-12-08-22-42-06-566.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
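A hedged sketch of the fix described above; the variable and accessor names are assumptions for illustration, not the connector's exact internals:
{code:java}
// Hedged sketch: prefer the configured primary key fields for the DELETE
// condition, falling back to all field names when no keys are set.
String[] conditionFields =
        keyFields != null && keyFields.length > 0 ? keyFields : fieldNames;
String deleteSql = dialect.getDeleteStatement(tableName, conditionFields);
{code}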
[jira] [Created] (FLINK-33569) Could not deploy yarn-application when using yarn over s3a filesystem.
Bodong Liu created FLINK-33569: -- Summary: Could not deploy yarn-application when using yarn over s3a filesystem. Key: FLINK-33569 URL: https://issues.apache.org/jira/browse/FLINK-33569 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.17.1, 1.18.0 Environment:
h1. Env
* OS: ArchLinux, kernel 6.6.1 AMD64
* Flink: 1.17.1
* Hadoop: 3.3.6
* Minio: 2023-11-15

h1. Settings
h2. hadoop core-site.xml (property tags reconstructed from the flattened name/value pairs):
{code:java}
<property>
  <name>fs.defaultFS</name>
  <value>s3a://hadoop</value>
</property>
<property>
  <name>fs.s3a.path.style.access</name>
  <value>true</value>
</property>
<property>
  <name>fs.s3a.access.key</name>
  <value>admin</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>password</value>
</property>
<property>
  <name>fs.s3a.endpoint</name>
  <value>http://localhost:9000</value>
</property>
<property>
  <name>fs.s3a.connection.establish.timeout</name>
  <value>5000</value>
</property>
<property>
  <name>fs.s3a.multipart.size</name>
  <value>512M</value>
</property>
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.AbstractFileSystem.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3A</value>
</property>
{code}

h1. Flink run command:
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

Reporter: Bodong Liu Attachments: 2023-11-16_16-47.png, image-2023-11-16-16-46-21-684.png, image-2023-11-16-16-48-40-223.png

I now use the `yarn-application` mode to deploy Flink. I found that when I set Hadoop's storage to the s3a file system, Flink could not submit tasks to Yarn. The error is reported as follows:
{code:java}
The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:481)
    at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
    at org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:212)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1098)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for URI:file:///tmp/application_1700122774429_0001-flink-conf.yaml5526160496134930395.tmp': Input/output error
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$26(S3AFileSystem.java:3854)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2480)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2499)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3847)
    at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
    at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
    at org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
    at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1050)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:626)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:474)
    ... 10 more
{code}
I found by looking through the source code and debugging that when Hadoop uses the s3a file system, uploading and downloading files must use URIs with
[jira] [Created] (FLINK-33445) Translate DataSet migration guideline to Chinese
Wencong Liu created FLINK-33445: --- Summary: Translate DataSet migration guideline to Chinese Key: FLINK-33445 URL: https://issues.apache.org/jira/browse/FLINK-33445 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.19.0 Reporter: Wencong Liu Fix For: 1.19.0 The [FLINK-33041|https://issues.apache.org/jira/browse/FLINK-33041] change adding an introduction about how to migrate the DataSet API to DataStream has been merged into the master branch. Here is the link on the Flink website: [How to Migrate from DataSet to DataStream | Apache Flink|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/dataset_migration/] According to the [contribution guidelines|https://flink.apache.org/how-to-contribute/contribute-documentation/#chinese-documentation-translation], we should add an identical markdown file in {{content.zh/}} and translate it to Chinese. Any community volunteers are welcome to take this task. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33144) Deprecate Iteration API in DataStream
Wencong Liu created FLINK-33144: --- Summary: Deprecate Iteration API in DataStream Key: FLINK-33144 URL: https://issues.apache.org/jira/browse/FLINK-33144 Project: Flink Issue Type: Technical Debt Components: API / DataStream Affects Versions: 1.19.0 Reporter: Wencong Liu Fix For: 1.19.0 Currently, the Iteration API of DataStream is incomplete. For instance, it lacks support for iteration in sync mode and exactly-once semantics. Additionally, it does not offer the ability to set iteration termination conditions. As a result, it's hard for developers to build an iteration pipeline with DataStream in practical applications such as machine learning. [FLIP-176: Unified Iteration to Support Algorithms|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300] has introduced a unified iteration library in the Flink ML repository. This library addresses all the issues present in the Iteration API of DataStream and could provide a solution for all iteration use-cases. However, maintaining two separate implementations of iteration in both the Flink repository and the Flink ML repository would introduce unnecessary complexity and make it difficult to maintain the Iteration API. FLIP-357 has decided to deprecate the Iteration API of DataStream and remove it completely in the next major version. In the future, if other modules in the Flink repository require the use of the Iteration API, we can consider extracting all Iteration implementations from the Flink ML repository into an independent module. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33079) The gap between the checkpoint timeout and the interval settings is too large
Fangliang Liu created FLINK-33079: - Summary: The gap between the checkpoint timeout and the interval settings is too large Key: FLINK-33079 URL: https://issues.apache.org/jira/browse/FLINK-33079 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.19.0 Reporter: Fangliang Liu The gap between the checkpoint timeout and the interval settings in the documentation example is too large. Some users assume the documentation shows the optimal settings and copy this demo configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33041) Add an introduction about how to migrate DataSet API to DataStream
Wencong Liu created FLINK-33041: --- Summary: Add an introduction about how to migrate DataSet API to DataStream Key: FLINK-33041 URL: https://issues.apache.org/jira/browse/FLINK-33041 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.18.0 Reporter: Wencong Liu Fix For: 1.18.0 The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their data processing requirements. Most of the DataSet operators can be implemented using the DataStream API. However, we believe it would be beneficial to have an introductory article on the Flink website that guides users in migrating their DataSet jobs to DataStream. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32979) Deprecate WindowAssigner#getDefaultTrigger(StreamExecutionEnvironment env)
Wencong Liu created FLINK-32979: --- Summary: Deprecate WindowAssigner#getDefaultTrigger(StreamExecutionEnvironment env) Key: FLINK-32979 URL: https://issues.apache.org/jira/browse/FLINK-32979 Project: Flink Issue Type: Technical Debt Components: API / Core Affects Versions: 1.19.0 Reporter: Wencong Liu Fix For: 1.19.0 The [FLIP-343|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425229] has decided that the parameter in WindowAssigner#getDefaultTrigger() will be removed in the next major version. We should deprecate it now and remove it in Flink 2.0. The removal will be tracked in [FLINK-4675|https://issues.apache.org/jira/browse/FLINK-4675]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
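A hedged before/after sketch of the API change decided in the FLIP; the signature shapes are assumptions, not necessarily the actual migration code:
{code:java}
// Deprecated in 1.19: the environment parameter is unused.
@Deprecated
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

// Planned replacement, the only variant once the deprecated one is
// removed in Flink 2.0.
public abstract Trigger<T, W> getDefaultTrigger();
{code}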
[jira] [Created] (FLINK-32978) Deprecate RichFunction#open(Configuration parameters)
Wencong Liu created FLINK-32978: --- Summary: Deprecate RichFunction#open(Configuration parameters) Key: FLINK-32978 URL: https://issues.apache.org/jira/browse/FLINK-32978 Project: Flink Issue Type: Technical Debt Components: API / Core Affects Versions: 1.19.0 Reporter: Wencong Liu The [FLIP-344|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231] has decided that the parameter in RichFunction#open will be removed in the next major version. We should deprecate it now and remove it in Flink 2.0. The removal will be tracked in [FLINK-6912|https://issues.apache.org/jira/browse/FLINK-6912]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32770) Fix the inaccurate backlog number of Hybrid Shuffle
Wencong Liu created FLINK-32770: --- Summary: Fix the inaccurate backlog number of Hybrid Shuffle Key: FLINK-32770 URL: https://issues.apache.org/jira/browse/FLINK-32770 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.18.0 Reporter: Wencong Liu Fix For: 1.18.0 The backlog is inaccurate in both the memory and disk tiers. We should fix it to prevent redundant memory usage on the reader side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32742) Remove flink-examples-batch module
Wencong Liu created FLINK-32742: --- Summary: Remove flink-examples-batch module Key: FLINK-32742 URL: https://issues.apache.org/jira/browse/FLINK-32742 Project: Flink Issue Type: Technical Debt Components: Examples Affects Versions: 2.0.0 Reporter: Wencong Liu Fix For: 2.0.0 All DataSet APIs will be deprecated in [FLINK-32558], and the examples in the flink-examples-batch module should no longer be included in flink-dist. This change aims to prevent developers from continuing to use the DataSet API. However, it is important to note that for testing purposes, the module is still utilized by many end-to-end tests. Therefore, we should explore options to remove the examples from the flink-dist before removing the DataSet API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32741) Remove DataSet related descriptions in doc
Wencong Liu created FLINK-32741: --- Summary: Remove DataSet related descriptions in doc Key: FLINK-32741 URL: https://issues.apache.org/jira/browse/FLINK-32741 Project: Flink Issue Type: Technical Debt Components: Documentation Affects Versions: 2.0.0 Reporter: Wencong Liu Fix For: 2.0.0 Since all DataSet APIs will be deprecated in [FLINK-32558] and we don't recommend that developers use the DataSet API, the DataSet-related descriptions should be removed from the doc after [FLINK-32558]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32708) Fix the write logic in remote tier of hybrid shuffle
Wencong Liu created FLINK-32708: --- Summary: Fix the write logic in remote tier of hybrid shuffle Key: FLINK-32708 URL: https://issues.apache.org/jira/browse/FLINK-32708 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.18.0 Reporter: Wencong Liu Fix For: 1.18.0 Currently, on the writer side in the remote tier, the flag file indicating the latest segment id is updated first, followed by the creation of the data file. This results in an incorrect order of file creation and we should fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32593) DelimitedInputFormat will cause record loss for multi-byte delimiters when a delimiter is separated across two splits
Zhaofu Liu created FLINK-32593: -- Summary: DelimitedInputFormat will cause record loss for multi-byte delimiters when a delimiter is separated across two splits Key: FLINK-32593 URL: https://issues.apache.org/jira/browse/FLINK-32593 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.17.1, 1.16.2, 1.16.1 Reporter: Zhaofu Liu Attachments: 5parallel.dat, image-2023-07-15-10-30-03-740.png Run the following test to reproduce this bug (generic type arguments below are reconstructed, as the original formatting stripped them):
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.junit.Test;

import javax.xml.bind.DatatypeConverter;

import java.io.IOException;

public class MyTest {

    @Test
    public void myTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        String path = MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();
        final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
        // The delimiter is "B87E7E7E"
        inputFormat.setDelimiter(new byte[] {(byte) 184, (byte) 126, (byte) 126, (byte) 126});
        // Set buffer size less than the default value of 1M for easier debugging
        inputFormat.setBufferSize(128);
        DataStreamSource<byte[]> source = env.readFile(inputFormat, path);
        source.map(new MapFunction<byte[], Object>() {
            @Override
            public Object map(byte[] value) throws Exception {
                System.out.println(DatatypeConverter.printHexBinary(value));
                return value;
            }
        }).setParallelism(1);
        env.execute();
    }

    private class TestInputFormat extends DelimitedInputFormat<byte[]> {
        @Override
        public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int numBytes) throws IOException {
            final int delimiterLen = this.getDelimiter().length;
            if (numBytes > 0) {
                byte[] record = new byte[delimiterLen + numBytes];
                System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
                System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
                return record;
            }
            return new byte[0];
        }
    }
}
{code}
The actual output is:
{code}
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
B87E7E7E1A00EB900A4EDC6D5516
{code}
The expected output should be:
{code}
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
B87E7E7E1A00EB900A4EDC6D5516
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
{code}
View of a delimiter separated across two splits (the tail of line 2 and the head of line 3): !image-2023-07-15-10-30-03-740.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32133) Batch requests and remove requests in the end to reduce YarnResourceManager's excess containers
Liu created FLINK-32133: --- Summary: Batch requests and remove requests in the end to reduce YarnResourceManager's excess containers Key: FLINK-32133 URL: https://issues.apache.org/jira/browse/FLINK-32133 Project: Flink Issue Type: Improvement Components: Deployment / YARN Affects Versions: 1.18.0 Reporter: Liu

h1. Problem

When the initial job requests many containers from YARN, it easily gets more containers than needed because the YARN AM-RM protocol is not a delta protocol (see YARN-1902). For example, suppose we need 3000 containers. Consider the following cases.

*Case one:*
# The job requests 2000 containers, so the YARN client holds 2000 requests.
# A YARN heartbeat happens and the client requests 2000 containers from the YARN RM.
# The job requests another 1000 containers, so the client holds 3000 requests.
# A YARN heartbeat happens and the client requests 3000 containers from the YARN RM.
# On heartbeat finish, the YARN RM returns 2000 containers. After the onContainersAllocated callback and removeContainerRequest, the client holds 1000 requests.
# A YARN heartbeat happens and the client requests 1000 containers from the YARN RM.
# On heartbeat finish, the YARN RM returns 3000 containers. After the onContainersAllocated callback and removeContainerRequest, the client holds 0 requests.
# A YARN heartbeat happens.
# On heartbeat finish, the YARN RM returns 1000 containers, which are excess since the last client request number was 1000.

In the end, YARN may allocate 2000 + 3000 + 1000 = 6000 containers. But we only need 3000 containers and have to return 3000.

*Case two:*
# The job requests 3000 containers, so the YARN client holds 3000 requests.
# A YARN heartbeat happens and the client requests 3000 containers from the YARN RM.
# On heartbeat finish, the YARN RM returns 1000 containers (2000 still allocating). After the onContainersAllocated callback and removeContainerRequest, the client holds 2000 requests.
# A YARN heartbeat happens and the client requests 2000 containers from the YARN RM.
# On heartbeat finish, the YARN RM returns 2000 containers. After the onContainersAllocated callback and removeContainerRequest, the client holds 0 requests.
# A YARN heartbeat happens.
# On heartbeat finish, the YARN RM returns 2000 containers, which are excess since the last client request number was 2000.

In the end, YARN may allocate 1000 + 2000 + 2000 = 5000 containers. But we only need 3000 containers and have to return 2000.

The reason is that any update to the YARN client's requests may produce undesired behavior.

h1. Solution

In our internal Flink version, we resolve the problem in two ways:
# Compute the total resource requests at start and request them as one batch, to avoid being interrupted by the YARN heartbeat (see the sketch after this list). Note that we loop resourceManagerClient.addContainerRequest(containerRequest) to simulate a quick batch request.
# Remove the YARN client's container requests after receiving enough resources, to avoid request updates. -- This message was sent by Atlassian Jira (v8.20.10#820010)
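A hedged sketch of the batching idea using the Hadoop AMRMClient API; the resource values, variable names, and the {{matchingRequest}} helper are illustrative assumptions:
{code:java}
// Hedged sketch: add all outstanding container requests up front, so a
// heartbeat never observes a partially updated request count.
for (int i = 0; i < totalContainersNeeded; i++) {
    resourceManagerClient.addContainerRequest(
            new AMRMClient.ContainerRequest(containerResource, null, null, priority));
}

// Later, once enough containers have been allocated, remove the matched
// requests so subsequent heartbeats do not re-request them
// (matchingRequest is a hypothetical helper):
// for (Container container : allocatedContainers) {
//     resourceManagerClient.removeContainerRequest(matchingRequest(container));
// }
{code}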
[jira] [Created] (FLINK-32121) Avro Confluent Schema Registry nightly end-to-end test failed due to timeout
Wencong Liu created FLINK-32121: --- Summary: Avro Confluent Schema Registry nightly end-to-end test failed due to timeout Key: FLINK-32121 URL: https://issues.apache.org/jira/browse/FLINK-32121 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.18.0 Environment: !temp2.jpg! Reporter: Wencong Liu Attachments: temp1.jpg, temp2.jpg [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49102=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d#:~:text=%5BFAIL%5D%20%27Avro%20Confluent%20Schema%20Registry] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32066) Flink Ci service on Azure stops responding to pull requests
Wencong Liu created FLINK-32066: --- Summary: Flink Ci service on Azure stops responding to pull requests Key: FLINK-32066 URL: https://issues.apache.org/jira/browse/FLINK-32066 Project: Flink Issue Type: Bug Components: Build System / Azure Pipelines Affects Versions: 1.18.0 Reporter: Wencong Liu Attachments: 20230512152023.jpg As of the time when this issue was created, Flink's CI service on Azure could no longer be triggered by new pull requests. !20230512152023.jpg! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32023) execution.buffer-timeout cannot be set to -1 ms
Liu created FLINK-32023: --- Summary: execution.buffer-timeout cannot be set to -1 ms Key: FLINK-32023 URL: https://issues.apache.org/jira/browse/FLINK-32023 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Liu The description for execution.buffer-timeout is as follows:
{code:java}
public static final ConfigOption<Duration> BUFFER_TIMEOUT =
        ConfigOptions.key("execution.buffer-timeout")
                .durationType()
                .defaultValue(Duration.ofMillis(100))
                .withDescription(
                        Description.builder()
                                .text(
                                        "The maximum time frequency (milliseconds) for the flushing of the output buffers. By default "
                                                + "the output buffers flush frequently to provide low latency and to aid smooth developer "
                                                + "experience. Setting the parameter can result in three logical modes:")
                                .list(
                                        text("A positive value triggers flushing periodically by that interval"),
                                        text(FLUSH_AFTER_EVERY_RECORD
                                                + " triggers flushing after every record thus minimizing latency"),
                                        text(DISABLED_NETWORK_BUFFER_TIMEOUT
                                                + " ms triggers flushing only when the output buffer is full thus maximizing "
                                                + "throughput"))
                                .build());
{code}
When we set execution.buffer-timeout to -1 ms, the following error is reported:
{code:java}
Caused by: java.lang.IllegalArgumentException: Could not parse value '-1 ms' for key 'execution.buffer-timeout'.
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:856)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:822)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:224)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.<init>(StreamContextEnvironment.java:51)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createStreamExecutionEnvironment(StreamExecutionEnvironment.java:1996)
    at java.util.Optional.orElseGet(Optional.java:267)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1986)
    at com.kuaishou.flink.examples.api.WordCount.main(WordCount.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:327)
    ... 11 more
Caused by: java.lang.NumberFormatException: text does not start with a number
    at org.apache.flink.util.TimeUtils.parseDuration(TimeUtils.java:78)
    at org.apache.flink.configuration.Configuration.convertToDuration(Configuration.java:1058)
    at org.apache.flink.configuration.Configuration.convertValue(Configuration.java:996)
    at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:853)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:853)
    ... 23 more
{code}
The reason is that a Duration value cannot be negative. We should change the behavior or support triggering flushing only when the output buffer is full. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31984) Savepoint on S3 should be relocatable if entropy injection is not effective
Mingliang Liu created FLINK-31984: - Summary: Savepoint on S3 should be relocatable if entropy injection is not effective Key: FLINK-31984 URL: https://issues.apache.org/jira/browse/FLINK-31984 Project: Flink Issue Type: Improvement Components: FileSystems, Runtime / Checkpointing Affects Versions: 1.16.1 Reporter: Mingliang Liu We have a limitation that if we create savepoints with an injected entropy, they are not relocatable (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints). FLINK-25952 improves the check by inspecting both the FileSystem extending {{EntropyInjectingFileSystem}} and {{FlinkS3FileSystem#getEntropyInjectionKey}} not returning null. We can improve this further by checking that the checkpoint path is indeed using the entropy injection key. Without that, the savepoint is not relocatable even if {{state.savepoints.dir}} does not contain the entropy. In our setting, we enable entropy injection by setting {{s3.entropy.key}} to {{__ENTROPY_KEY__}} and use the entropy key in the checkpoint path (e.g. {{s3://mybucket/checkpoints/__ENTROPY_KEY__/myapp}}). However, in the savepoint path, we don't use the entropy key (e.g. {{s3://mybucket/savepoints/myapp}}) because we want the savepoint to be relocatable. But the current logic still generates a non-relocatable savepoint path just because the entropy injection key is non-null. -- This message was sent by Atlassian Jira (v8.20.10#820010)
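A minimal sketch of the proposed check, with variable names assumed for illustration:
{code:java}
// Hedged sketch: treat entropy injection as effective (and hence the
// savepoint as non-relocatable) only when the configured path actually
// contains the entropy injection key, not merely when the key is set.
boolean entropyEffective =
        entropyInjectionKey != null && savepointBasePath.contains(entropyInjectionKey);
{code}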
[jira] [Created] (FLINK-31841) Redundant local variables in AllWindowedStream#reduce
Wencong Liu created FLINK-31841: --- Summary: Redundant local variables in AllWindowedStream#reduce Key: FLINK-31841 URL: https://issues.apache.org/jira/browse/FLINK-31841 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.18.0 Reporter: Wencong Liu Fix For: 1.18.0 Currently, there are two redundant local variables in AllWindowedStream#reduce (generic type arguments below are reconstructed, as the original formatting stripped them):
{code:java}
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
    if (function instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "ReduceFunction of reduce can not be a RichFunction. "
                        + "Please use reduce(ReduceFunction, WindowFunction) instead.");
    }

    // clean the closure
    function = input.getExecutionEnvironment().clean(function);

    String callLocation = Utils.getCallLocationName();
    String udfName = "AllWindowedStream." + callLocation;

    return reduce(function, new PassThroughAllWindowFunction<>());
}
{code}
`callLocation` and `udfName` are not used. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31463) When I use Apache Flink 1.12.2, the following Akka error often occurs.
Zhuang Liu created FLINK-31463: --- Summary: When I use Apache Flink 1.12.2, the following Akka error often occurs. Key: FLINK-31463 URL: https://issues.apache.org/jira/browse/FLINK-31463 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.15.4 Reporter: Zhuang Liu When I use Apache Flink 1.12.2, the following Akka error often occurs:
{code}
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
    at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
I checked that 48 hours ago there was indeed a process hang inside Flink, and the Flink job was restarted. How should this be dealt with? Is this a bug in Akka or Flink? Thank you! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31357) If a record is deleted before being inserted, it will be deleted
Fangliang Liu created FLINK-31357: - Summary: If a record is deleted before being inserted, it will be deleted Key: FLINK-31357 URL: https://issues.apache.org/jira/browse/FLINK-31357 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: 1.14.3 Reporter: Fangliang Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31124) Add IT case for HiveTableSink speculative execution
Biao Liu created FLINK-31124: Summary: Add IT case for HiveTableSink speculative execution Key: FLINK-31124 URL: https://issues.apache.org/jira/browse/FLINK-31124 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Biao Liu Part of HiveTableSink gained support for speculative execution in https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some integration test cases for this feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31123) Add IT case for FileSink speculative execution
Biao Liu created FLINK-31123: Summary: Add IT case for FileSink speculative execution Key: FLINK-31123 URL: https://issues.apache.org/jira/browse/FLINK-31123 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Biao Liu The FileSink gained support for speculative execution in https://issues.apache.org/jira/browse/FLINK-30823. We would like to add some integration test cases for this feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31110) Web UI shows "User Configuration" preserving lines and whitespaces
Mingliang Liu created FLINK-31110: - Summary: Web UI shows "User Configuration" preserving lines and whitespaces Key: FLINK-31110 URL: https://issues.apache.org/jira/browse/FLINK-31110 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Affects Versions: 1.16.1 Reporter: Mingliang Liu Currently one can use {{env.getConfig().setGlobalJobParameters(...)}} to set user configurations. They will also show up in the Web UI > Running Jobs > Job Configuration > User Configuration section. This is nice, as users can confirm the user configuration (key/value pairs) gets populated. However, the UI prints the plain string, which does not work well with values that contain whitespaces and line breaks. For example, we have some prettified JSON configuration and sometimes formatted SQL statements in those configurations, and they show up in a compacted HTML format that is not human readable. I propose we keep the whitespaces and lines for this "User Configuration" section in the Web UI. The implementation can be as simple as adding {{style="white-space: pre-wrap;"}} to the rows in that section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30842) Add document for sink supports speculative execution
Biao Liu created FLINK-30842: Summary: Add document for sink supports speculative execution Key: FLINK-30842 URL: https://issues.apache.org/jira/browse/FLINK-30842 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Biao Liu Fix For: 1.17.0 Add document to describe how sink supports speculative execution. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30823) Enable speculative execution for some of the typical built-in sinks
Biao Liu created FLINK-30823: Summary: Enable speculative execution for some of the typical built-in sinks Key: FLINK-30823 URL: https://issues.apache.org/jira/browse/FLINK-30823 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Biao Liu Fix For: 1.17.0 Now that the Sink supports speculative execution, this ticket enables speculative execution for some built-in sinks. For each type of Sink (SinkV2, SinkFunction, OutputFormat), we picked some typical sinks that require no or only minor changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30799) Make SinkFunction support speculative execution for batch jobs
Biao Liu created FLINK-30799: Summary: Make SinkFunction support speculative execution for batch jobs Key: FLINK-30799 URL: https://issues.apache.org/jira/browse/FLINK-30799 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Biao Liu Fix For: 1.17.0 This ticket makes SinkFunction-based sinks run with speculative execution for batch jobs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30798) Make OutputFormat support speculative execution for batch jobs
Biao Liu created FLINK-30798: Summary: Make OutputFormat support speculative execution for batch jobs Key: FLINK-30798 URL: https://issues.apache.org/jira/browse/FLINK-30798 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Biao Liu Fix For: 1.17.0 This issue makes OutputFormat-based sinks run with speculative execution for batch jobs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30755) Make SinkV2 support speculative execution for batch jobs
Biao Liu created FLINK-30755: Summary: Make SinkV2 support speculative execution for batch jobs Key: FLINK-30755 URL: https://issues.apache.org/jira/browse/FLINK-30755 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Biao Liu Fix For: 1.17.0 This is the first part of the FLIP-281 implementation. In this ticket, we introduce some base abstractions for supporting speculative sinks and then make the SinkV2 API work with them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30725) FLIP-281: Sink Supports Speculative Execution For Batch Job
Biao Liu created FLINK-30725: Summary: FLIP-281: Sink Supports Speculative Execution For Batch Job Key: FLINK-30725 URL: https://issues.apache.org/jira/browse/FLINK-30725 Project: Flink Issue Type: New Feature Components: Runtime / Coordination Reporter: Biao Liu Fix For: 1.17.0 This is the umbrella issue of FLIP-281. More details can be found in https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30483) Make Avro format support for TIMESTAMP_LTZ
Mingliang Liu created FLINK-30483: - Summary: Make Avro format support for TIMESTAMP_LTZ Key: FLINK-30483 URL: https://issues.apache.org/jira/browse/FLINK-30483 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.16.0 Reporter: Mingliang Liu Currently the Avro format does not support the TIMESTAMP_LTZ (short for TIMESTAMP_WITH_LOCAL_TIME_ZONE) type. Avro 1.10+ introduces the local timestamp logical type (both milliseconds and microseconds), see the [spec|https://avro.apache.org/docs/1.10.2/spec.html#Local+timestamp+%28millisecond+precision%29]. As TIMESTAMP currently only supports milliseconds, we can make TIMESTAMP_LTZ support milliseconds first. A related work item is to support microseconds; there is already a work-in-progress Jira [FLINK-23589] for the TIMESTAMP type. We can consolidate the effort or track that separately for TIMESTAMP_LTZ. -- This message was sent by Atlassian Jira (v8.20.10#820010)
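For reference, a small sketch of building a schema with the Avro 1.10+ logical type that the issue refers to; the mapping into Flink's Avro format itself would live elsewhere:
{code:java}
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Hedged sketch: Avro 1.10+ models local timestamps with the
// local-timestamp-millis logical type annotating a long.
Schema localTsMillis =
        LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
{code}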
[jira] [Created] (FLINK-30479) Document flink-connector-files for local execution
Mingliang Liu created FLINK-30479: - Summary: Document flink-connector-files for local execution Key: FLINK-30479 URL: https://issues.apache.org/jira/browse/FLINK-30479 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.16.0 Reporter: Mingliang Liu The file system SQL connector itself is included in Flink and does not require an additional dependency. However, if a user uses the filesystem connector for local execution, e.g. running a Flink job in the IDE, they will need to add the dependency. Otherwise, the user will get a validation exception: {{Cannot discover a connector using option: 'connector'='filesystem'}}. This is confusing and should be documented. The scope of the files connector dependency should be {{provided}}, because it should not be packaged into the JAR file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30459) SQL Client supports "SET 'property'"
Mingliang Liu created FLINK-30459: - Summary: SQL Client supports "SET 'property'" Key: FLINK-30459 URL: https://issues.apache.org/jira/browse/FLINK-30459 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.16.0 Reporter: Mingliang Liu Currently the SET command in the SQL client has two forms:
- {{SET;}} for listing all config properties
- {{SET 'key'='value';}} for setting a property to a (new) value

As in Spark SQL and Flink SQL with the Hive dialect, it would be convenient to show a single config property using the syntax {{SET 'key';}}. Without this, users need to scan all config properties to find the one they want. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30179) Stop stringifying exception for logging
Mingliang Liu created FLINK-30179: - Summary: Stop stringifying exception for logging Key: FLINK-30179 URL: https://issues.apache.org/jira/browse/FLINK-30179 Project: Flink Issue Type: Improvement Components: Runtime / Task Affects Versions: 1.16.0 Reporter: Mingliang Liu In {{org.apache.flink.runtime.taskmanager.Task}} there are multiple places where we stringify an exception before logging it. With Slf4j, we can simply pass the exception (or throwable) as the last argument instead of fitting it into the log message string with a placeholder. So essentially:
{code}
LOG.debug(
        "{} ({}) switched from {} to {} due to CancelTaskException: {}",
        taskNameWithSubtask,
        executionId,
        currentState,
        newState,
        ExceptionUtils.stringifyException(cause));
{code}
should be
{code}
LOG.debug(
        "{} ({}) switched from {} to {} due to CancelTaskException:",
        taskNameWithSubtask,
        executionId,
        currentState,
        newState,
        cause);
{code}
Interestingly there was a [hot fix|https://github.com/apache/flink/commit/7eac5c62a10158ef210906deb161ac791f18d3ae] that deliberately changed the code from the latter to the former. This JIRA can also track the discussion of why that was needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30081) Local executor cannot accept different jvm-overhead.min/max values
Mingliang Liu created FLINK-30081: - Summary: Local executor cannot accept different jvm-overhead.min/max values Key: FLINK-30081 URL: https://issues.apache.org/jira/browse/FLINK-30081 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Affects Versions: 1.16.0 Reporter: Mingliang Liu In the local executor, it is not possible to set different values for {{taskmanager.memory.jvm-overhead.max}} and {{taskmanager.memory.jvm-overhead.min}}. The same problem exists for {{taskmanager.memory.network.max}} and {{taskmanager.memory.network.min}}. Sample code to reproduce:
{code:java}
Configuration conf = new Configuration();
conf.setString(TaskManagerOptions.JVM_OVERHEAD_MAX.key(), "1GB");
conf.setString(TaskManagerOptions.JVM_OVERHEAD_MIN.key(), "2GB");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
env.fromElements("Hello", "World")
        .executeAndCollect()
        .forEachRemaining(System.out::println);
{code}
The failing exception is something like:
{code}
Exception in thread "main" java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.calculateTotalProcessMemoryFromComponents(TaskExecutorResourceUtils.java:182)
    at org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration.create(TaskExecutorMemoryConfiguration.java:119)
{code}
I think the problem is that we expect max and min to be equal, but the local executor does not reset them correctly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30079) Stop using deprecated TM options in doc
Mingliang Liu created FLINK-30079: - Summary: Stop using deprecated TM options in doc Key: FLINK-30079 URL: https://issues.apache.org/jira/browse/FLINK-30079 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Mingliang Liu The option {{ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY}} was deprecated, and configuring it should have no effect now. However, in the [documentation|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/dataset/local_execution/#local-environment] we still reference it and show it in example code. It can be replaced with {{TaskManagerOptions.MANAGED_MEMORY_FRACTION}}, as sketched below. -- This message was sent by Atlassian Jira (v8.20.10#820010)
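A minimal sketch of what the updated example from FLINK-30079 could look like, assuming the local-environment setup the page already shows:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Replacement for the deprecated ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY.
conf.set(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.4f);
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(conf);
{code}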
[jira] [Created] (FLINK-29961) Make referencing custom image clearer for Docker
Mingliang Liu created FLINK-29961: - Summary: Make referencing custom image clearer for Docker Key: FLINK-29961 URL: https://issues.apache.org/jira/browse/FLINK-29961 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Mingliang Liu Make it clearer in the Docker documentation how to reference a custom image. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29940) ExecutionGraph logs job state change at ERROR level when job fails
Mingliang Liu created FLINK-29940: - Summary: ExecutionGraph logs job state change at ERROR level when job fails Key: FLINK-29940 URL: https://issues.apache.org/jira/browse/FLINK-29940 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.16.0 Reporter: Mingliang Liu When a job switches to the FAILED state, the log is very useful for understanding why it failed, along with the root cause exception stack. However, the current log level is INFO, which is inconvenient for users to find among so many surrounding log lines. We can log at ERROR level when the job switches to the FAILED state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
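A hedged sketch of the change proposed in FLINK-29940; the variable names are illustrative, not the actual ExecutionGraph code:
{code:java}
// Log state transitions at ERROR when the job reaches FAILED, INFO otherwise.
if (newState == JobStatus.FAILED) {
    LOG.error(
            "Job {} ({}) switched from state {} to {}.",
            jobName, jobId, currentState, newState, error);
} else {
    LOG.info(
            "Job {} ({}) switched from state {} to {}.",
            jobName, jobId, currentState, newState);
}
{code}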
[jira] [Created] (FLINK-29920) Minor reformat Kafka connector documentation
Mingliang Liu created FLINK-29920: - Summary: Minor reformat Kafka connector documentation Key: FLINK-29920 URL: https://issues.apache.org/jira/browse/FLINK-29920 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.16.0 Reporter: Mingliang Liu Attachments: Screenshot 2022-11-07 at 2.55.00 PM.png, Screenshot 2022-11-07 at 2.55.08 PM.png We used some HTML tags in the documentation that do not render the Markdown format nicely. This can be fixed by replacing them with Markdown tags. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29750) Improve PostgresCatalog#listTables() by reusing resources
Mingliang Liu created FLINK-29750: - Summary: Improve PostgresCatalog#listTables() by reusing resources Key: FLINK-29750 URL: https://issues.apache.org/jira/browse/FLINK-29750 Project: Flink Issue Type: Bug Components: Connectors / JDBC, Table SQL / Ecosystem Reporter: Mingliang Liu Currently {{PostgresCatalog#listTables()}} creates a new connection and prepared statement for every schema and table when listing tables. This can be optimized by reusing those resources, as sketched below. -- This message was sent by Atlassian Jira (v8.20.10#820010)
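A minimal sketch of the idea from FLINK-29750 in plain JDBC (not the actual PostgresCatalog code; the query and variable names are illustrative):
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

List<String> tables = new ArrayList<>();
// One connection and one statement for the whole listing, instead of one per schema/table.
try (Connection conn = DriverManager.getConnection(baseUrl, username, password);
        PreparedStatement ps = conn.prepareStatement(
                "SELECT tablename FROM pg_tables WHERE schemaname = ?")) {
    for (String schema : schemas) {
        ps.setString(1, schema); // re-bind and re-execute the same statement
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                tables.add(schema + "." + rs.getString(1));
            }
        }
    }
}
{code}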
[jira] [Created] (FLINK-29640) Enhance the function configured by execution.shutdown-on-attached-exit by heartbeat between client and dispatcher
Liu created FLINK-29640: --- Summary: Enhance the function configured by execution.shutdown-on-attached-exit by heartbeat between client and dispatcher Key: FLINK-29640 URL: https://issues.apache.org/jira/browse/FLINK-29640 Project: Flink Issue Type: Improvement Reporter: Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29639) Add ResourceId in TransportException for debugging
Liu created FLINK-29639: --- Summary: Add ResourceId in TransportException for debugging Key: FLINK-29639 URL: https://issues.apache.org/jira/browse/FLINK-29639 Project: Flink Issue Type: Improvement Reporter: Liu When a TaskManager is lost, only its host and port are shown in the exception, which makes it hard to find the exact TaskManager by ResourceId. Adding the ResourceId to the exception will help a lot in debugging the job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29164) There is a problem when compiling Flink 1.15.rc-2 on Windows
mindest liu created FLINK-29164: --- Summary: There is a problem when compiling Flink 1.15.rc-2 on Windows Key: FLINK-29164 URL: https://issues.apache.org/jira/browse/FLINK-29164 Project: Flink Issue Type: Bug Components: flink-contrib Affects Versions: 1.15.2 Reporter: mindest liu Compiling Flink 1.15.2 on the Windows platform fails with the following error:
{code}
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project flink-avro-confluent-registry: Compilation failure
[D:/xxx/XX/CachedSchemaCoderProviderTest.java] unable to access org.apache.kafka.common.Configurable
  class file for org.apache.kafka.common.Configurable not found
{code}
The JDK version used for compiling is 11 and the Maven version is 3.2.5. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29163) There is a problem when compiling Flink 1.15.rc-2 and flink-1.14.3-rc1 on Windows
mindest liu created FLINK-29163: --- Summary: There is a problem when compiling Flink 1.15.rc-2 and flink-1.14.3-rc1 on Windows Key: FLINK-29163 URL: https://issues.apache.org/jira/browse/FLINK-29163 Project: Flink Issue Type: Bug Components: flink-contrib Affects Versions: 1.15.2 Reporter: mindest liu Compiling Flink 1.15.2 on the Windows platform fails when the build reaches flink-clients:
{code}
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.4:single (create-test-dependency-user-jar-depend) on project flink-clients_2.11: Failed to create assembly: Error creating assembly archive test-user-classloader-job-lib-jar: You must set at least one file. -> [Help 1]
{code}
The JDK version used for compiling is 11 and the Maven version is 3.2.5. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29160) Problem when compiling Flink 1.15.rc-2 on Windows
mindest liu created FLINK-29160: --- Summary: Problem when compiling Flink 1.15.rc-2 on Windows Key: FLINK-29160 URL: https://issues.apache.org/jira/browse/FLINK-29160 Project: Flink Issue Type: Bug Components: flink-contrib Affects Versions: 1.15.2 Environment: OS: Windows, JDK: 11, Maven: 3.2.5 Reporter: mindest liu Compiling Flink 1.15.2 on the Windows platform fails when the build reaches flink-clients with the error "Error creating assembly archive pack: You must set at least one file." The JDK version used for compiling is 11 and the Maven version is 3.2.5. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29097) Moving json se/deserializers from sql-gateway-api to sql-gateway
Wencong Liu created FLINK-29097: --- Summary: Moving json se/deserializers from sql-gateway-api to sql-gateway Key: FLINK-29097 URL: https://issues.apache.org/jira/browse/FLINK-29097 Project: Flink Issue Type: Technical Debt Components: Table SQL / Gateway Affects Versions: 1.16.0 Reporter: Wencong Liu Considering that the current JSON serialization/deserialization rules for results returned by the SqlGateway are only used in the REST endpoint, we should migrate the serialization-related tools from the flink-sql-gateway-api module to the flink-sql-gateway module. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28974) Add doc for the API and Option of sql gateway rest endpoint
Wencong Liu created FLINK-28974: --- Summary: Add doc for the API and Option of sql gateway rest endpoint Key: FLINK-28974 URL: https://issues.apache.org/jira/browse/FLINK-28974 Project: Flink Issue Type: Technical Debt Components: Table SQL / Gateway Affects Versions: 1.16.0 Reporter: Wencong Liu Add documentation for the APIs and options of the SQL Gateway REST endpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28963) Add API compatibility test for Sql Gateway Rest Endpoint
Wencong Liu created FLINK-28963: --- Summary: Add API compatibility test for Sql Gateway Rest Endpoint Key: FLINK-28963 URL: https://issues.apache.org/jira/browse/FLINK-28963 Project: Flink Issue Type: Technical Debt Components: Table SQL / Gateway Affects Versions: 1.16.0 Reporter: Wencong Liu Under the package _flink-runtime-web_, RestAPIStabilityTest performs compatibility checks on the Rest API based on a series of CompatibilityRoutines. For the Sql Gateway, its Rest Endpoint also needs to reuse the same rules to verify API compatibility, so as to ensure that all modifications to the Sql Gateway Rest API are compatible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28924) Translate "LIMIT clause" page into Chinese
Zhuang Liu created FLINK-28924: --- Summary: Translate "LIMIT clause" page into Chinese Key: FLINK-28924 URL: https://issues.apache.org/jira/browse/FLINK-28924 Project: Flink Issue Type: Improvement Components: chinese-translation Affects Versions: 1.15.1 Reporter: Zhuang Liu Fix For: 1.16.0 Translate the "LIMIT clause" page into Chinese: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/limit/ The markdown file is located in "docs/content.zh/docs/dev/table/sql/queries/limit.md". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28922) Translate "ORDER BY clause" page into Chinese
Zhuang Liu created FLINK-28922: --- Summary: Translate "ORDER BY clause" page into Chinese Key: FLINK-28922 URL: https://issues.apache.org/jira/browse/FLINK-28922 Project: Flink Issue Type: Improvement Components: chinese-translation Affects Versions: 1.15.1 Reporter: Zhuang Liu Fix For: 1.16.0 Translate "ORDER BY clause" page into Chinese. The markdown file is located in "docs/content.zh/docs/dev/table/sql/queries/orderby.md". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28897) Fail to use udf in added jar when enabling checkpoint
Liu created FLINK-28897: --- Summary: Fail to use udf in added jar when enabling checkpoint Key: FLINK-28897 URL: https://issues.apache.org/jira/browse/FLINK-28897 Project: Flink Issue Type: Bug Affects Versions: 1.16.0 Reporter: Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28883) Report hive tasks' file number, total number and total size to metastore
Liu created FLINK-28883: --- Summary: Report hive tasks' file number, total number and total size to metastore Key: FLINK-28883 URL: https://issues.apache.org/jira/browse/FLINK-28883 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.16.0 Reporter: Liu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28796) Add Statement Completion API for sql gateway rest endpoint
Wencong Liu created FLINK-28796: --- Summary: Add Statement Completion API for sql gateway rest endpoint Key: FLINK-28796 URL: https://issues.apache.org/jira/browse/FLINK-28796 Project: Flink Issue Type: Improvement Components: Table SQL / Gateway Reporter: Wencong Liu SQL Gateway supports various clients: sql client, rest, hiveserver2, etc. Given the 1.16 feature freeze date, we won't be able to finish all the endpoints. Thus, we'd exclude one of the REST APIs (tracked by this ticket) from FLINK-28163, as it is only needed by the sql client, and still try to complete the remaining ones. In other words, we expect the sql gateway to support the REST & HiveServer2 APIs in 1.16, and the sql client in 1.17. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28777) Add configure session API for sql gateway rest endpoint
Wencong Liu created FLINK-28777: --- Summary: Add configure session API for sql gateway rest endpoint Key: FLINK-28777 URL: https://issues.apache.org/jira/browse/FLINK-28777 Project: Flink Issue Type: Improvement Components: Table SQL / Gateway Reporter: Wencong Liu For the 1.16 release, we will temporarily skip the development of the configure-session API in the SQL Gateway REST endpoint. Compatibility between the SQL Client and the SQL Gateway is deferred for now, so the related work will be carried out in the 1.17 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28548) The commit partition base path is not created when no data is sent which may cause FileNotFoundException
Liu created FLINK-28548: --- Summary: The commit partition base path is not created when no data is sent which may cause FileNotFoundException Key: FLINK-28548 URL: https://issues.apache.org/jira/browse/FLINK-28548 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.15.1, 1.14.5, 1.16.0 Reporter: Liu The commit partition base path is not created when no data is sent which may cause FileNotFoundException. The exception is as following: {code:java} Caused by: java.io.FileNotFoundException: File /home/ljg/test_sql.db/flink_batch_test/.staging_1657697612169 does not exist. at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:771) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:120) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:828) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:824) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics$1.doCall(FileSystemLinkResolverWithStatistics.java:37) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.perflog.PerfProxy.call(PerfProxy.java:49) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics.resolve(FileSystemLinkResolverWithStatistics.java:39) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:835) ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.listStatus(ChRootedFileSystem.java:241) ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] at org.apache.hadoop.fs.viewfs.ViewFileSystem.listStatus(ViewFileSystem.java:376) ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?] at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0] at org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths(PartitionTempFileManager.java:87) ~[flink-connector-files-1.15.0.jar:1.15.0] at org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitions(FileSystemCommitter.java:78) ~[flink-connector-files-1.15.0.jar:1.15.0] at org.apache.flink.connector.file.table.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:89) ~[flink-connector-files-1.15.0.jar:1.15.0] at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:153) ~[flink-dist-1.15.0.jar:1.15.0] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.jobFinished(DefaultExecutionGraph.java:1190) ~[flink-dist-1.15.0.jar:1.15.0] ... 43 more {code} We should check whether the base path exists before listStatus for the path. -- This message was sent by Atlassian Jira (v8.20.10#820010)
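A hedged sketch of the guard proposed in FLINK-28548, using Flink's FileSystem API; the surrounding variable names are assumptions, not the actual PartitionTempFileManager code:
{code:java}
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

Path stagingBasePath = new Path(stagingDir);
FileSystem fs = stagingBasePath.getFileSystem();
// Guard against the base path never having been created because no data was written.
if (!fs.exists(stagingBasePath)) {
    return java.util.Collections.emptyList(); // nothing was written, nothing to commit
}
FileStatus[] taskDirs = fs.listStatus(stagingBasePath);
{code}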
[jira] [Created] (FLINK-28208) The method createBatchSink in class HiveTableSink should setParallelism for map operator
Liu created FLINK-28208: --- Summary: The method createBatchSink in class HiveTableSink should setParallelism for map operator Key: FLINK-28208 URL: https://issues.apache.org/jira/browse/FLINK-28208 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.16.0 Reporter: Liu The problem is found when using the Adaptive Batch Scheduler. There, a simple SQL query like "select * from ... where ..." generates three operators: source, map, and sink. The map's parallelism is set to -1 by default, which differs from the source and sink, so the three operators cannot be chained together. The reason is that the map operator is added in the method createBatchSink without calling setParallelism. The changed code is as follows:
{code:java}
private DataStreamSink<Row> createBatchSink(
        DataStream<RowData> dataStream,
        DataStructureConverter converter,
        StorageDescriptor sd,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig fileNaming,
        final int parallelism)
        throws IOException {
    ...
    return dataStream
            .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
            .setParallelism(parallelism) // newly added to ensure the right parallelism
            .writeUsingOutputFormat(builder.build())
            .setParallelism(parallelism);
}
{code}
-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28156) Flink KafkaSource with Bounded Throw Exception
Jiangfei Liu created FLINK-28156: Summary: Flink KafkaSource with Bounded Throw Exception Key: FLINK-28156 URL: https://issues.apache.org/jira/browse/FLINK-28156 Project: Flink Issue Type: Bug Components: API / DataStream, Connectors / Kafka Affects Versions: 1.14.3 Reporter: Jiangfei Liu I want to use KafkaSource to consume a topic between the committed offset and the latest offset, but it throws an exception. The source is built like this:
{code:java}
KafkaSource.<String>builder()
        .setBootstrapServers("10.18.34.43:9092,10.18.34.44:9092,10.18.34.45:9092")
        .setTopics(topic)
        .setGroupId(groupId)
        // .setStartingOffsets(OffsetsInitializer.timestamp(165571776L))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();
{code}
The exception:
{code}
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:99)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
    ... 6 more
{code}
-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28148) Unable to load jar connector to a Python Table API app
Zichen Liu created FLINK-28148: -- Summary: Unable to load jar connector to a Python Table API app Key: FLINK-28148 URL: https://issues.apache.org/jira/browse/FLINK-28148 Project: Flink Issue Type: Bug Components: API / Python, Connectors / Common, Table SQL / API Affects Versions: 1.16.0 Reporter: Zichen Liu Reproduction steps: # Clone the latest Flink from the master branch. # Follow the Flink [recommended steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/] to build Flink & install PyFlink. Notes: the tutorial recommends Maven 3.2.x and Python 3.6-3.9; actual: Maven 3.2.5, Python 3.7. # Create a new Python Table API app that loads in a jar, similar to:
{code:python}
from pyflink.table import TableEnvironment, StreamTableEnvironment, EnvironmentSettings

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar")
{code}
The jar loaded here can be any jar, and the following message will appear:
{code:python}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in <module>
    log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
    t_env.get_config().set("pipeline.classpaths", "file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py", line 109, in set
    add_jars_to_context_class_loader(value.split(";"))
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py", line 169, in add_jars_to_context_class_loader
    addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring class
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
  at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
  at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.base/java.lang.Thread.run(Thread.java:829)
{code}
Reproduced on Mac and Amazon Linux 2. To downgrade to the 1.15 release, run:
{code}
pip uninstall apache-flink
pip install apache-flink
{code}
Loading the jar then succeeds, even when loading the same connector built from master (reproduced with Kafka and Kinesis Firehose). -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28123) When a flink job is restarted, the metaspace size of the taskmanager does not decrease but keeps increasing. After several restarts, the flink job hits a metaspace OOM.
Zhuang Liu created FLINK-28123: --- Summary: When a flink job is restarted, the metaspace size of the taskmanager does not decrease but keeps increasing. After several restarts, the flink job hits a metaspace OOM. Key: FLINK-28123 URL: https://issues.apache.org/jira/browse/FLINK-28123 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.12.0 Reporter: Zhuang Liu Fix For: 1.16.0 When I use the Flink standalone deployment mode and the Flink job restarts, the metaspace size of the taskmanager does not decrease but keeps increasing. After restarting several times, the Flink job hits a metaspace OOM. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28093) Flink SQL Kafka Source Can Not Support Recovery On Checkpoint
Jiangfei Liu created FLINK-28093: Summary: Flink SQL Kafka Source Can Not Support Recovery On Checkpoint Key: FLINK-28093 URL: https://issues.apache.org/jira/browse/FLINK-28093 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Table SQL / API Affects Versions: 1.14.4, 1.14.3, 1.13.5 Reporter: Jiangfei Liu Attachments: s.png Steps to reproduce: # A Flink SQL Kafka source consumes messages from a topic and checkpoints are taken. # Stop the Flink task. # Write more data to the topic. # Restore the job from the checkpoint. # The Flink job does not recover from the checkpoint and does not continue consuming from the checkpointed state. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28062) Flink SQL Upsert-Kafka can not support Flink1.14.x With Sink Buffer
Jiangfei Liu created FLINK-28062: Summary: Flink SQL Upsert-Kafka can not support Flink1.14.x With Sink Buffer Key: FLINK-28062 URL: https://issues.apache.org/jira/browse/FLINK-28062 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Table SQL / API Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.14.0 Reporter: Jiangfei Liu Attachments: 1.13-upsert-kafka.png, 1.14-upsert-kafka.png In Flink 1.14.x, the Table API Upsert-Kafka sink does not work with the sink buffer; in Flink 1.13.x it does. Looking at the Flink 1.13.x and 1.14.x source code: in 1.13.x, Upsert-Kafka uses the class org.apache.flink.streaming.connectors.kafka.table.BufferedUpsertSinkFunction, while in 1.14.x it uses org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter. There are some differences between the two classes; please see the attached pictures. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28037) Flink SQL Upsert-Kafka can not support Flink1.14.x
Jiangfei Liu created FLINK-28037: Summary: Flink SQL Upsert-Kafka can not support Flink1.14.x Key: FLINK-28037 URL: https://issues.apache.org/jira/browse/FLINK-28037 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.14.0 Environment: Flink Version: 1.14.0 1.14.2 1.14.3 1.14.4 Reporter: Jiangfei Liu Attachments: kafka-sql.png, kafka-sql2.png In Flink 1.14.x, the Flink SQL upsert-kafka sink cannot write data into a Kafka topic when the sink buffer flush config is set, e.g. {{sink.buffer-flush.max-rows}} and {{sink.buffer-flush.interval}}. In Flink 1.13.x, the upsert-kafka sink can write data into a Kafka topic with the sink buffer flush config. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28027) Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy
Zichen Liu created FLINK-28027: -- Summary: Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy Key: FLINK-28027 URL: https://issues.apache.org/jira/browse/FLINK-28027 Project: Flink Issue Type: Bug Components: Connectors / Common, Connectors / Kinesis Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.16.0 *Background* In the AsyncSinkWriter, we implement a rate limiting strategy. The initial value for the maximum number of in flight messages is set extremely high ({{maxBatchSize * maxInFlightRequests}}). However, in accordance with the AIMD strategy, the TCP implementation for congestion control has found that starting with a small value [is better|https://en.wikipedia.org/wiki/TCP_congestion_control#Slow_start]. *Suggestion* A better default might be: * maxBatchSize * maxBatchSize / parallelism -- This message was sent by Atlassian Jira (v8.20.7#820007)
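To illustrate the AIMD idea described in FLINK-28027, a toy sketch; all names are made up for illustration and this is not the AsyncSinkWriter API:
{code:java}
public class AimdBudget {
    private final int increaseStep;
    private int inFlightBudget;

    public AimdBudget(int smallInitialBudget, int increaseStep) {
        // Start small (TCP slow-start style) instead of maxBatchSize * maxInFlightRequests.
        this.inFlightBudget = Math.max(1, smallInitialBudget);
        this.increaseStep = increaseStep;
    }

    public void onRequestCompleted(boolean throughputExceeded) {
        if (throughputExceeded) {
            inFlightBudget = Math.max(1, inFlightBudget / 2); // multiplicative decrease
        } else {
            inFlightBudget += increaseStep; // additive increase
        }
    }

    public int budget() {
        return inFlightBudget;
    }
}
{code}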
[jira] [Created] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
Zichen Liu created FLINK-28007: -- Summary: Tests for AWS Connectors Using SDK v2 to use Synchronous Clients Key: FLINK-28007 URL: https://issues.apache.org/jira/browse/FLINK-28007 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.16.0 h3. Background AWS SDK v2 async clients use a Netty async client for the Kinesis Data Streams/Firehose sinks and the Kinesis Data Streams EFO consumer. The SDK creates a shared thread pool for Netty to use for network operations when one is not configured. The thread pool is managed by a shared ELG (event loop group), and this is stored in a static field. We do not configure this for the AWS connectors in the Flink codebase. When threads are spawned within the ELG, they inherit the context classloader from the current thread. If the ELG is created from a shared classloader, for instance the Flink parent classloader or the MiniCluster parent classloader, multiple Flink jobs can share the same ELG. When an ELG thread is spawned from a Flink job, it will inherit the Flink user classloader. When this job completes/fails, the classloader is destroyed; however, the Netty thread still references it, which leads to the exception below. h3. Impact This issue *does not* impact jobs deployed to a TM when the AWS SDK v2 is loaded via the Flink user classloader. It is expected that this is the standard deployment configuration. This issue is known to impact: - Flink mini cluster, for example in integration tests (FLINK-26064) - Flink cluster loading AWS SDK v2 via the parent classloader h3. Suggested solution There are a few possible solutions, as discussed in https://github.com/apache/flink/pull/18733 1. Create a separate ELG per Flink job 2. Create a separate ELG per subtask 3. Attach the correct classloader to ELG spawned threads h3. Error Stack (shortened stack trace, as full is too large) {noformat} Feb 09 20:05:04 java.util.concurrent.ExecutionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 09 20:05:04 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
(...)
Feb 09 20:05:04 Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) Feb 09 20:05:04 at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) Feb 09 20:05:04 at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204) Feb 09 20:05:04 at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200) Feb 09 20:05:04 at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179) Feb 09 20:05:04 at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159) (...) Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'. Feb 09 20:05:04 at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164) Feb 09 20:05:04 at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188) Feb 09 20:05:04 at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348) Feb 09 20:05:04 at
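For the suggested solutions (1) and (2) in FLINK-28007 above, a hedged sketch using the AWS SDK v2 public API: give the Netty HTTP client its own event loop group instead of relying on the SDK's shared static default. The Kinesis client here is just an example:
{code:java}
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

// A dedicated event loop group per client avoids the SDK's shared static default,
// whose threads may capture (and outlive) a job's user classloader.
SdkAsyncHttpClient httpClient =
        NettyNioAsyncHttpClient.builder()
                .eventLoopGroupBuilder(SdkEventLoopGroup.builder())
                .build();

KinesisAsyncClient kinesis =
        KinesisAsyncClient.builder().httpClient(httpClient).build();
{code}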
[jira] [Created] (FLINK-27828) FlinkKafkaProducer VS KafkaSink
Jiangfei Liu created FLINK-27828: Summary: FlinkKafkaProducer VS KafkaSink Key: FLINK-27828 URL: https://issues.apache.org/jira/browse/FLINK-27828 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.14.3 Reporter: Jiangfei Liu In Flink 1.14.3, writing the same data to Kafka completes in 7s with FlinkKafkaProducer but takes 1m40s with KafkaSink. Why is KafkaSink so much slower? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27805) Bump ORC version to 1.7.2
jia liu created FLINK-27805: --- Summary: Bump ORC version to 1.7.2 Key: FLINK-27805 URL: https://issues.apache.org/jira/browse/FLINK-27805 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: jia liu Flink's current ORC dependency version is 1.5.6, while the ORC 1.7.x line has been available for a long time. In order to use its new features (zstd compression, column encryption, etc.), we should upgrade the ORC version. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27775) FlinkKafkaProducer VS KafkaSink
Jiangfei Liu created FLINK-27775: Summary: FlinkKafkaProducer VS KafkaSink Key: FLINK-27775 URL: https://issues.apache.org/jira/browse/FLINK-27775 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.14.3 Reporter: Jiangfei Liu Attachments: Snipaste_2022-05-25_19-52-11.png In Flink 1.14.3, writing the same data to Kafka completes in 7s with FlinkKafkaProducer but takes 1m40s with KafkaSink. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27670) Python wrappers for Kinesis Sinks
Zichen Liu created FLINK-27670: -- Summary: Python wrappers for Kinesis Sinks Key: FLINK-27670 URL: https://issues.apache.org/jira/browse/FLINK-27670 Project: Flink Issue Type: Improvement Components: Connectors / Kinesis Reporter: Zichen Liu Fix For: 1.15.1 Create Python Wrappers for the new Kinesis Streams sink and the Kinesis Firehose sink. An example implementation may be found here [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27579) The param client.timeout can not be set by dynamic properties when stopping the job
Liu created FLINK-27579: --- Summary: The param client.timeout can not be set by dynamic properties when stopping the job Key: FLINK-27579 URL: https://issues.apache.org/jira/browse/FLINK-27579 Project: Flink Issue Type: Improvement Components: Client / Job Submission Affects Versions: 1.16.0 Reporter: Liu The default client.timeout value is one minute, which may be too short for stop-with-savepoint on jobs with large state. When we stop the job with dynamic properties (-D, or -yD for YARN), client.timeout does not take effect. From the code, we can see that dynamic properties only take effect for the run command. We should support them for the stop command as well. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27537) Remove requirement for Async Sink's RequestEntryT to be serializable
Zichen Liu created FLINK-27537: -- Summary: Remove requirement for Async Sink's RequestEntryT to be serializable Key: FLINK-27537 URL: https://issues.apache.org/jira/browse/FLINK-27537 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Zichen Liu Currently, in `AsyncSinkBase` and its dependent classes, e.g. the sink writer, element converter, etc., the `RequestEntryT` generic type is required to be `Serializable`. However, this requirement no longer holds and nothing actually relies on it. Proposed approach: * Remove the `extends Serializable` bound from the generic type `RequestEntryT` -- This message was sent by Atlassian Jira (v8.20.7#820007)
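Sketched as minimal class skeletons (the class names are illustrative and the real generics are simplified), the change proposed in FLINK-27537 looks like this:
{code:java}
import java.io.Serializable;

// Before: the bound forces every request entry type to implement Serializable.
abstract class AsyncSinkWriterBefore<InputT, RequestEntryT extends Serializable> {}

// After: the bound is dropped, since nothing actually serializes the entries.
abstract class AsyncSinkWriterAfter<InputT, RequestEntryT> {}
{code}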
[jira] [Created] (FLINK-27536) Rename method parameter in AsyncSinkWriter
Zichen Liu created FLINK-27536: -- Summary: Rename method parameter in AsyncSinkWriter Key: FLINK-27536 URL: https://issues.apache.org/jira/browse/FLINK-27536 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Zichen Liu Change the abstract method's parameter name in AsyncSinkWriter from {{Consumer<List<RequestEntryT>> requestResult}} to {{Consumer<List<RequestEntryT>> requestToRetry}} or something similar, because the consumer here is supposed to accept the list of requests that need to be retried. -- This message was sent by Atlassian Jira (v8.20.7#820007)
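Assuming the 1.15-era shape of the method from FLINK-27536 (a sketch; the enclosing class name is made up), the renamed signature would read:
{code:java}
import java.util.List;
import java.util.function.Consumer;

public abstract class AsyncSinkWriterSketch<RequestEntryT> {
    // Renamed from "requestResult": the consumer accepts the entries to retry.
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            Consumer<List<RequestEntryT>> requestToRetry);
}
{code}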
[jira] [Created] (FLINK-27516) The config execution.attached doesn't take effect because it is overridden by CLI param
Liu created FLINK-27516: --- Summary: The config execution.attached doesn't take effect because it is overridden by CLI param Key: FLINK-27516 URL: https://issues.apache.org/jira/browse/FLINK-27516 Project: Flink Issue Type: Improvement Components: Client / Job Submission Affects Versions: 1.16.0 Reporter: Liu The default value of the config execution.attached is false, but whatever value we set takes no effect. After digging in, we find it is only driven by the CLI params, as follows: # If we don't specify -d or -yd, the member detachedMode in ProgramOptions is set to false. # In the method applyToConfiguration, execution.attached is then set to true. # Hence, whatever value is set for execution.attached, it takes no effect. If -d or -yd is not set, we should use the configured value of execution.attached. Since attached mode has effectively been in use for a long time, we may need to change the default value of execution.attached to true after the modification. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27505) Add javadoc comments to AsyncSinkBase
Zichen Liu created FLINK-27505: -- Summary: Add javadoc comments to AsyncSinkBase Key: FLINK-27505 URL: https://issues.apache.org/jira/browse/FLINK-27505 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Zichen Liu Currently the javadocs describing each of the parameters on the constructor for AsyncSinkBase exist in AsyncSinkBaseBuilder. Since we are not enforcing the use of the builder, it makes more sense to have these descriptions in the AsyncSinkBase. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27278) Fix wrong indentation in doc dev/datastream/operators/windows.md
Mingliang Liu created FLINK-27278: - Summary: Fix wrong indentation in doc dev/datastream/operators/windows.md Key: FLINK-27278 URL: https://issues.apache.org/jira/browse/FLINK-27278 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.15.0 Reporter: Mingliang Liu Attachments: after.png, before.png Due to a special character, the current indentation of {{ProcessWindowFunction}} in the doc {{dev/datastream/operators/windows.md}} is wrong. It is not very visible in the source code; however, if you visit the formatted doc, you will see the wrong indentation. It can be fixed by using plain spaces in the example code. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27275) Support null value not update in flink-connector-jdbc
Fangliang Liu created FLINK-27275: - Summary: Support null value not update in flink-connector-jdbc Key: FLINK-27275 URL: https://issues.apache.org/jira/browse/FLINK-27275 Project: Flink Issue Type: New Feature Components: Connectors / JDBC Affects Versions: 1.14.3 Reporter: Fangliang Liu The following DDL:
{code:java}
CREATE TABLE IF NOT EXISTS `db`.`tablea` (
    `user_id` bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256),
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
    'table-name' = 'user',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.interval' = '1s',
    'sink.buffer-flush.max-rows' = '50',
    'sink.parallelism' = '2'
);
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27205) Translate "Concepts -> Glossary" page into Chinese
Zhuang Liu created FLINK-27205: --- Summary: Translate "Concepts -> Glossary" page into Chinese Key: FLINK-27205 URL: https://issues.apache.org/jira/browse/FLINK-27205 Project: Flink Issue Type: Bug Components: chinese-translation, Documentation Affects Versions: 1.14.4 Reporter: Zhuang Liu Fix For: 1.15.0 Translate the Glossary page into Chinese: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/glossary/. The markdown file is located in docs/concepts/glossary.md. On that page, most of the content has already been translated, but a small part has not been translated into Chinese. See FLINK-13037 for details. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26909) Support AdaptiveBatch when parallelism is -1 from Cli
Liu created FLINK-26909: --- Summary: Support AdaptiveBatch when parallelism is -1 from Cli Key: FLINK-26909 URL: https://issues.apache.org/jira/browse/FLINK-26909 Project: Flink Issue Type: Improvement Reporter: Liu When we start a job from the command line with args "-p $parallelism" where the parallelism is -1, an error is thrown: "The parallelism must be a positive number: -1". Since we can use AdaptiveBatch with the config parallelism.default: -1, we should support it from the CLI as well. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26906) Spelling error for AdaptiveBatch in elastic_scaling.md
Liu created FLINK-26906: --- Summary: Spelling error for AdaptiveBatch in elastic_scaling.md Key: FLINK-26906 URL: https://issues.apache.org/jira/browse/FLINK-26906 Project: Flink Issue Type: Improvement Reporter: Liu In the sentence "- 配置 `jobmanager.scheduler: AdpaptiveBatch`", the word AdaptiveBatch is misspelled, which may confuse users. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26868) In JobLeaderIdService.java#L36 the annotation may be " Start the service with the given job leader id actions" instead of "Start the service with the given job lead
Zhuang Liu created FLINK-26868: --- Summary: In JobLeaderIdService.java#L36 the annotation may be "Start the service with the given job leader id actions" instead of "Start the service with the given job leader actions." Key: FLINK-26868 URL: https://issues.apache.org/jira/browse/FLINK-26868 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.14.4 Reporter: Zhuang Liu Fix For: 1.16.0 In https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java#L36 the comment should probably be "Start the service with the given job leader id actions" instead of "Start the service with the given job leader actions." -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26693) Modify the grammar mistakes in savepoints.md
Liu created FLINK-26693: --- Summary: Modify the grammar mistakes in savepoints.md Key: FLINK-26693 URL: https://issues.apache.org/jira/browse/FLINK-26693 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.15.0 Reporter: Liu Change the sentence "The legacy is mode is how Flink worked until 1.15." to "The legacy mode is how Flink worked until 1.15." -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26683) Terminate the job anyway if savepoint finished when stop-with-savepoint
Liu created FLINK-26683: --- Summary: Terminate the job anyway if savepoint finished when stop-with-savepoint Key: FLINK-26683 URL: https://issues.apache.org/jira/browse/FLINK-26683 Project: Flink Issue Type: Improvement Affects Versions: 1.15.0 Reporter: Liu When we stop with a savepoint, the savepoint finishes, but some tasks then fail over for some reason and go back to running. In the end, some tasks are finished and some are running. In this case, I think we should terminate all the tasks anyway instead of restarting them, since the savepoint is finished and the job has stopped consuming data. What do you think? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26589) Update the logging level of Kinesis Streams and Firehose sinks
Zichen Liu created FLINK-26589: -- Summary: Update the logging level of Kinesis Streams and Firehose sinks Key: FLINK-26589 URL: https://issues.apache.org/jira/browse/FLINK-26589 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Bug: The Async Sink Base sink is not limiting throughput to the destination and is therefore exceeding rate limits *Cause:* We are not throttling our requests downstream at all. We should monitor for requests that have failed with ThroughputExceeded exceptions and reduce the throughput accordingly. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26491) In org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L173 the annotation should be "if" instead of "iff"
Zhuang Liu created FLINK-26491: --- Summary: In org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L173 the annotation should be "if" instead of "iff" Key: FLINK-26491 URL: https://issues.apache.org/jira/browse/FLINK-26491 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.14.3, 1.13.6, 1.12.7, 1.11.6 Reporter: Zhuang Liu In org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L173 the annotation should be "if" instead of "iff" -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26380) Pending record count must be zero at this point: 5592
Fangliang Liu created FLINK-26380: - Summary: Pending record count must be zero at this point: 5592 Key: FLINK-26380 URL: https://issues.apache.org/jira/browse/FLINK-26380 Project: Flink Issue Type: Bug Reporter: Fangliang Liu Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 5592 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26309) Add a polling strategy to determine whether Localstack test container has started
Zichen Liu created FLINK-26309: -- Summary: Add a polling strategy to determine whether Localstack test container has started Key: FLINK-26309 URL: https://issues.apache.org/jira/browse/FLINK-26309 Project: Flink Issue Type: Technical Debt Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu The firehose sink is an at least once sink. But we only expect there to be duplicates during failures and reload from save/checkpoints. During `KinesisFirehoseSinkITCase` there is no such action, and yet, we occasionally get duplicates in the test result. The test was originally asserting exactly once erroneously and this has been fixed in #18876 to assert at least once. However, a curiosity still remains: why were there duplicates? That is the purpose of this investigation. {code:java} Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 83.215 s <<< FAILURE! - in org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase Feb 22 02:47:37 [ERROR] org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test Time elapsed: 50.712 s <<< FAILURE! Feb 22 02:47:37 org.opentest4j.AssertionFailedError: Feb 22 02:47:37 Feb 22 02:47:37 expected: 92 Feb 22 02:47:37 but was: 93 Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) Feb 22 02:47:37 at org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Feb 22 02:47:37 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Feb 22 02:47:37 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) Feb 22 02:47:37 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Feb 22 02:47:37 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Feb 22 02:47:37 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Feb 22 02:47:37 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Feb 22 02:47:37 at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30) Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Feb 22 02:47:37 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Feb 22 02:47:37 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) {code} [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26305) KinesisFirehoseSinkITCase writes duplicates to Localstack
Zichen Liu created FLINK-26305: -- Summary: KinesisFirehoseSinkITCase writes duplicates to Localstack Key: FLINK-26305 URL: https://issues.apache.org/jira/browse/FLINK-26305 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Affects Versions: 1.15.0 Reporter: Zichen Liu Assignee: Zichen Liu {code:java} Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 83.215 s <<< FAILURE! - in org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase Feb 22 02:47:37 [ERROR] org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test Time elapsed: 50.712 s <<< FAILURE! Feb 22 02:47:37 org.opentest4j.AssertionFailedError: Feb 22 02:47:37 Feb 22 02:47:37 expected: 92 Feb 22 02:47:37 but was: 93 Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) Feb 22 02:47:37 at org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Feb 22 02:47:37 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Feb 22 02:47:37 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) Feb 22 02:47:37 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Feb 22 02:47:37 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Feb 22 02:47:37 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Feb 22 02:47:37 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Feb 22 02:47:37 at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30) Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Feb 
22 02:47:37 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Feb 22 02:47:37 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249 -- This message was sent by Atlassian Jira (v8.20.1#820001)