[jira] [Updated] (FLINK-31335) Using sql-gateway to submit a job to a YARN cluster, sql-gateway doesn't support Kerberos
[ https://issues.apache.org/jira/browse/FLINK-31335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] felixzh updated FLINK-31335: Affects Version/s: 1.16.0 (was: 1.16.1)
> Using sql-gateway to submit a job to a YARN cluster, sql-gateway doesn't support
> Kerberos
> ---
>
> Key: FLINK-31335
> URL: https://issues.apache.org/jira/browse/FLINK-31335
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Gateway
> Affects Versions: 1.16.0
> Reporter: felixzh
> Priority: Major
> Labels: pull-request-available
>
> When submitting a job to a YARN cluster, sql-gateway doesn't support Kerberos.
> 1. yarn-per-job mode: -Dexecution.target=yarn-per-job
> 2. yarn-session mode: -Dexecution.target=yarn-session -Dyarn.application.id=yarnSessionAppID (e.g. application_1677479737242_0052)
> sql-gateway needs to use the SecurityUtils modules.
> By default it uses flink-conf.yaml; it should also support -Dkey=value (e.g. modes 1/2 above).
-- This message was sent by Atlassian Jira (v8.20.10#820010)
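[Editor's note: for context, the Kerberos settings that Flink's SecurityUtils reads from flink-conf.yaml look like the following sketch; the keytab path and principal are placeholders, and the same keys can be overridden as -Dkey=value when starting the gateway.]

```yaml
# flink-conf.yaml -- Kerberos login options consumed by Flink's security modules;
# keytab path and principal below are illustrative placeholders.
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
```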
[GitHub] [flink-connector-kafka] chucheng92 commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not qui
chucheng92 commented on code in PR #8: URL: https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139572887 ## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java: ## @@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable { } } +@Test +public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +KafkaSourceEnumerator enumerator = +createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) { + +// Start the enumerator and it should schedule a one time task to discover and assign +// partitions. +enumerator.start(); + +// Run the partition discovery callable and check the partition assignment. +runOneTimePartitionDiscovery(context); + +// noMoreNewPartitionSplits is initially false; it is set to true once +// handlePartitionSplitChanges has executed +assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) +.isTrue(); +} +} + +@Test +public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +KafkaSourceEnumerator enumerator = +createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + +// Start the enumerator and it should schedule a one time task to discover and assign +// partitions. 
+enumerator.start(); +assertThat(context.getOneTimeCallables()).isEmpty(); +assertThat(context.getPeriodicCallables()) +.as("A periodic partition discovery callable should have been scheduled") +.hasSize(1); + +// noMoreNewPartitionSplits is initially false and remains false even after +// handlePartitionSplitChanges executes +assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) +.isFalse(); +} +} + +@Test +public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +// set partitionDiscoveryIntervalMs = 0 +KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) { + +// Start the enumerator, and it should schedule a one time task to discover and assign +// partitions. +enumerator.start(); +runOneTimePartitionDiscovery(context); Review Comment: Yes, good advice. I have fixed it. Please check it, thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
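[Editor's note: the behavior the tests above pin down can be summarized with a small model. This is a hypothetical stand-in, not Flink's actual `KafkaSourceEnumerator`; only the `noMoreNewPartitionSplits` flag and the discovery interval are mirrored from the discussion — a non-positive interval means one-shot discovery, after which the bounded source can signal it has no more splits, while periodic discovery never sets the flag.]

```java
// Simplified model of the flag under discussion: a one-shot discovery
// (interval <= 0) marks the source as having no more splits once partition
// changes are handled; periodic discovery (interval > 0) never does.
public class EnumeratorModel {
    private final long discoveryIntervalMs;
    private boolean noMoreNewPartitionSplits = false;

    public EnumeratorModel(long discoveryIntervalMs) {
        this.discoveryIntervalMs = discoveryIntervalMs;
    }

    // Invoked after a discovery pass, mirroring handlePartitionSplitChanges.
    public void handlePartitionSplitChanges() {
        if (discoveryIntervalMs <= 0) {
            // One-time discovery is done: no more new splits will ever arrive.
            noMoreNewPartitionSplits = true;
        }
    }

    public boolean noMoreNewPartitionSplits() {
        return noMoreNewPartitionSplits;
    }
}
```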
[jira] [Commented] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701508#comment-17701508 ] Jiang Xin commented on FLINK-31486: --- [~zhangzp] I'm afraid not. This issue is most likely caused by an incorrect class loader, but FLINK-31255 seems not to be. > Using KeySelector in IterationBody causes ClassNotFoundException > > > Key: FLINK-31486 > URL: https://issues.apache.org/jira/browse/FLINK-31486 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > When we use CoGroup along with KeySelector in an IterationBody, the following > exception occurs. > {code:java} > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Could not instantiate state partitioner. at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662) > at > org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146) > at > org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at > java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > java.lang.invoke.SerializedLambda to field > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 > of type org.apache.flink.api.java.functions.KeySelector in instance of > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302) > at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at > 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659) > ... 17 more {code}
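[Editor's note: for background, a Java lambda serializes as `java.lang.invoke.SerializedLambda`, and deserialization can only reconstruct the lambda when the capturing class is visible to the deserializing class loader — when it is not, the raw `SerializedLambda` can end up assigned to the target field, which is the `ClassCastException` in the trace above. A self-contained illustration of the round trip in a single class loader; the `KeyFn` interface is a hypothetical stand-in for Flink's `KeySelector`:]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerDemo {
    // Serializable functional interface standing in for Flink's KeySelector.
    interface KeyFn<I, K> extends Function<I, K>, Serializable {}

    // Serialize a lambda and read it back in the same class loader.
    static int roundTrip() throws Exception {
        KeyFn<String, Integer> keyFn = String::length;

        // The JVM writes the lambda as a java.lang.invoke.SerializedLambda
        // that records the capturing class (LambdaSerDemo).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(keyFn);
        }

        // Reading it back works here because the capturing class is visible
        // to the deserializing class loader; with a mismatched class loader
        // the reconstruction fails, as in the stack trace above.
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            KeyFn<String, Integer> restored = (KeyFn<String, Integer>) in.readObject();
            return restored.apply("flink");
        }
    }
}
```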
[GitHub] [flink-connector-jdbc] lightzhao commented on pull request #26: [Improve]Increase the valid of judging connection.
lightzhao commented on PR #26: URL: https://github.com/apache/flink-connector-jdbc/pull/26#issuecomment-1473130167 @kezhuw @leonardBang @wanglijie95 PTAL.
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139726153 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java: ## @@ -503,16 +522,16 @@ public void testViewSchema() throws Exception { CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1")); -Schema viewSchema = catalogView.getUnresolvedSchema(); +ResolvedSchema viewSchema = Review Comment: The main problem is `UnresolvedDataType`: it does not provide `equals` for comparison. That's reasonable, because it's hard to say whether two `UnresolvedDataType`s are equal. IMO, the resolved versions can be compared.
[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
luoyuxia commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139717230 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java: ## @@ -503,16 +522,16 @@ public void testViewSchema() throws Exception { CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1")); -Schema viewSchema = catalogView.getUnresolvedSchema(); +ResolvedSchema viewSchema = Review Comment: I don't understand why comparing the Schema directly is not feasible/easy. Is there any problem if we don't change this test?
[GitHub] [flink] flinkbot commented on pull request #22199: [FLINK-31489][sql-gateway] Fix instable OperationManagerTest.testCloseOperation
flinkbot commented on PR #22199: URL: https://github.com/apache/flink/pull/22199#issuecomment-1473119783 ## CI report: * 698b344967b261ebf458a6a678680a78a9a206fe UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139716355 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java: ## @@ -96,11 +106,104 @@ public class HiveTableUtil { private HiveTableUtil() {} -public static TableSchema createTableSchema( +/** Create a Flink's Schema by hive client. */ +public static org.apache.flink.table.api.Schema createSchema( +HiveConf hiveConf, +Table hiveTable, +HiveMetastoreClientWrapper client, +HiveShim hiveShim) { + +Tuple4, List, Set, Optional> +hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, client, hiveShim); + +return createSchema( +hiveTableInfo.f0, +hiveTableInfo.f1, +hiveTableInfo.f2, +hiveTableInfo.f3.orElse(null)); +} + +/** Create a Flink's Schema from Hive table's columns and partition keys. */ +public static org.apache.flink.table.api.Schema createSchema( +List nonPartCols, +List partitionKeys, +Set notNullColumns, +@Nullable UniqueConstraint primaryKey) { +return Schema.newBuilder() +.fromResolvedSchema( +createResolvedSchema( Review Comment: Makes sense, will apply your commits :)
[jira] [Commented] (FLINK-31489) OperationManagerTest.testCloseOperation failed because it couldn't find a submitted operation
[ https://issues.apache.org/jira/browse/FLINK-31489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701505#comment-17701505 ] lincoln lee commented on FLINK-31489: - [~mapohl] seems it's an unstable test (confirmed with [~shengkai] offline, he's on vacation); the test code can be improved. I've submitted a fix and deprioritized this issue to Critical. > OperationManagerTest.testCloseOperation failed because it couldn't find a > submitted operation > - > > Key: FLINK-31489 > URL: https://issues.apache.org/jira/browse/FLINK-31489 > Project: Flink > Issue Type: Bug > Components: Table SQL / Gateway > Affects Versions: 1.17.0 > Reporter: Matthias Pohl > Priority: Blocker > Labels: pull-request-available, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47218=logs=ce3801ad-3bd5-5f06-d165-34d37e757d90=5e4d9387-1dcc-5885-a901-90469b7e6d2f=13386 > {code} > Mar 16 02:49:52 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 30.433 s <<< FAILURE! - in > org.apache.flink.table.gateway.service.operation.OperationManagerTest > Mar 16 02:49:52 [ERROR] > org.apache.flink.table.gateway.service.operation.OperationManagerTest.testCloseOperation > Time elapsed: 0.042 s <<< ERROR! > Mar 16 02:49:52 org.apache.flink.table.gateway.api.utils.SqlGatewayException: > Can not find the submitted operation in the OperationManager with the > 1734d6cf-cf52-40c5-804f-809e48a9818a. 
> Mar 16 02:49:52 at > org.apache.flink.table.gateway.service.operation.OperationManager.lambda$getOperation$3(OperationManager.java:487) > Mar 16 02:49:52 at > org.apache.flink.table.gateway.service.operation.OperationManager.readLock(OperationManager.java:518) > Mar 16 02:49:52 at > org.apache.flink.table.gateway.service.operation.OperationManager.getOperation(OperationManager.java:482) > Mar 16 02:49:52 at > org.apache.flink.table.gateway.service.operation.OperationManager.awaitOperationTermination(OperationManager.java:149) > Mar 16 02:49:52 at > org.apache.flink.table.gateway.service.operation.OperationManagerTest.testCloseOperation(OperationManagerTest.java:199) > Mar 16 02:49:52 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > [...] > {code}
[jira] [Updated] (FLINK-31489) OperationManagerTest.testCloseOperation failed because it couldn't find a submitted operation
[ https://issues.apache.org/jira/browse/FLINK-31489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-31489: Priority: Critical (was: Blocker)
[jira] [Updated] (FLINK-31489) OperationManagerTest.testCloseOperation failed because it couldn't find a submitted operation
[ https://issues.apache.org/jira/browse/FLINK-31489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31489: --- Labels: pull-request-available test-stability (was: test-stability)
[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
luoyuxia commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139712015 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java: ## @@ -96,11 +106,104 @@ public class HiveTableUtil { private HiveTableUtil() {} -public static TableSchema createTableSchema( +/** Create a Flink's Schema by hive client. */ +public static org.apache.flink.table.api.Schema createSchema( +HiveConf hiveConf, +Table hiveTable, +HiveMetastoreClientWrapper client, +HiveShim hiveShim) { + +Tuple4, List, Set, Optional> +hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, client, hiveShim); + +return createSchema( +hiveTableInfo.f0, +hiveTableInfo.f1, +hiveTableInfo.f2, +hiveTableInfo.f3.orElse(null)); +} + +/** Create a Flink's Schema from Hive table's columns and partition keys. */ +public static org.apache.flink.table.api.Schema createSchema( +List nonPartCols, +List partitionKeys, +Set notNullColumns, +@Nullable UniqueConstraint primaryKey) { +return Schema.newBuilder() +.fromResolvedSchema( +createResolvedSchema( Review Comment: I agree it's not harmful to do an extra schema conversion, although it creates an extra object and `createSchema` is called fairly frequently. But if we can both save code and reduce schema conversions with minor effort, why not do that?
[GitHub] [flink] lincoln-lil opened a new pull request, #22199: [FLINK-31489][sql-gateway] Fix instable OperationManagerTest.testCloseOperation
lincoln-lil opened a new pull request, #22199: URL: https://github.com/apache/flink/pull/22199
## What is the purpose of the change
This is a hotfix for the unstable test `OperationManagerTest.testCloseOperation`. The reason is that the concurrent closeOperation may finish before awaitOperationTermination(operationHandle), which can then hit the expected error when doing getOperation inside it.
## Brief change log
Move awaitOperationTermination into the assert-thrown block.
## Verifying this change
The existing testCloseOperation case.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
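[Editor's note: the race described above can be modeled in a few lines with hypothetical names (this is not the actual `OperationManager`): once the concurrent close removes the operation, a later await cannot find the handle and throws — so the test must expect the exception around the await call itself.]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the race: close() removes the operation, so an await()
// that starts after close() has finished cannot find the handle and throws.
public class OperationRegistry {
    private final Map<String, Object> operations = new ConcurrentHashMap<>();

    public void submit(String handle) {
        operations.put(handle, new Object());
    }

    public void close(String handle) {
        operations.remove(handle);
    }

    public void await(String handle) {
        if (!operations.containsKey(handle)) {
            // Mirrors "Can not find the submitted operation" in the report.
            throw new IllegalStateException(
                    "Can not find the submitted operation: " + handle);
        }
    }
}
```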
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139710984 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java: ## @@ -503,16 +522,16 @@ public void testViewSchema() throws Exception { CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1")); -Schema viewSchema = catalogView.getUnresolvedSchema(); +ResolvedSchema viewSchema = Review Comment: Yes, you are right. But as in my comments above, comparing the Schema directly is not feasible/easy, so we compare the resolved version with the expected one. I think it can cover the original requirements.
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139706415 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.UnresolvedDataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; + +/** Testing implementation of {@link SchemaResolver}. 
*/ +public class TestSchemaResolver implements SchemaResolver { + +private final DataTypeFactory dataTypeFactory = new TestingDataTypeFactory(); +private final Map resolveExpressionTable = new HashMap<>(); + +@Override +public ResolvedSchema resolve(Schema schema) { +final List columns = resolveColumns(schema.getColumns()); + +final List watermarkSpecs = +schema.getWatermarkSpecs().stream() +.map(this::resolveWatermarkSpecs) +.collect(Collectors.toList()); + +final UniqueConstraint primaryKey = resolvePrimaryKey(schema.getPrimaryKey().orElse(null)); + +return new ResolvedSchema(columns, watermarkSpecs, primaryKey); +} + +private List resolveColumns(List unresolvedColumns) { +final Column[] resolvedColumns = new Column[unresolvedColumns.size()]; + +int i = 0; +for (Schema.UnresolvedColumn unresolvedColumn : unresolvedColumns) { +if (unresolvedColumn instanceof Schema.UnresolvedPhysicalColumn) { +resolvedColumns[i] = + resolvePhysicalColumn((Schema.UnresolvedPhysicalColumn) unresolvedColumn); +} else if (unresolvedColumn instanceof Schema.UnresolvedMetadataColumn) { +resolvedColumns[i] = + resolveMetadataColumn((Schema.UnresolvedMetadataColumn) unresolvedColumn); +} else if (unresolvedColumn instanceof Schema.UnresolvedComputedColumn) { +resolvedColumns[i] = +Column.computed( +unresolvedColumn.getName(), +resolveExpression( + ((Schema.UnresolvedComputedColumn) unresolvedColumn) +.getExpression())) + .withComment(unresolvedColumn.getComment().orElse(null)); +} +i++; +} +return Arrays.asList(resolvedColumns); +} + +private Column.PhysicalColumn resolvePhysicalColumn( +Schema.UnresolvedPhysicalColumn unresolvedColumn) { +return Column.physical( +unresolvedColumn.getName(), + dataTypeFactory.createDataType(unresolvedColumn.getDataType())) +.withComment(unresolvedColumn.getComment().orElse(null)); +} + +private Column.MetadataColumn resolveMetadataColumn( +Schema.UnresolvedMetadataColumn unresolvedColumn) { +return Column.metadata( +unresolvedColumn.getName(), + 
dataTypeFactory.createDataType(unresolvedColumn.getDataType()), +unresolvedColumn.getMetadataKey(), +unresolvedColumn.isVirtual()) +
[jira] [Commented] (FLINK-31081) Update Jira links in How To Contribute guide
[ https://issues.apache.org/jira/browse/FLINK-31081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701502#comment-17701502 ] Eric Brzezenski commented on FLINK-31081: - Do you mind assigning this to me? I'll update these locations and any others that I may find. > Update Jira links in How To Contribute guide > > > Key: FLINK-31081 > URL: https://issues.apache.org/jira/browse/FLINK-31081 > Project: Flink > Issue Type: Bug > Components: Project Website > Affects Versions: 1.17.0 > Reporter: Matthias Pohl > Priority: Major > Labels: starter > > FLINK-30007 added a description in [the community > docs|https://flink.apache.org/community.html#issue-tracker] on how to get > access to the Flink Jira. But several other locations mention and link Jira > as well (especially in the how-to-contribute sections). Newcomers might > miss the paragraph in the community docs and wonder how they could get a working Jira > account. > This issue is about replacing all the Jira links (where it's useful) by a > reference to the issue tracker section in the community docs > (https://flink.apache.org/community.html#issue-tracker). That way, they are > redirected to the information on how they can gain access to the Flink Jira > board. > Locations that need to be updated are (not exclusively!): > * https://flink.apache.org/contributing/contribute-code.html > * https://flink.apache.org/gettinghelp.html > * https://flink.apache.org/contributing/how-to-contribute.html
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139701322 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java: ## @@ -96,11 +106,104 @@ public class HiveTableUtil { private HiveTableUtil() {} -public static TableSchema createTableSchema( +/** Create a Flink's Schema by hive client. */ +public static org.apache.flink.table.api.Schema createSchema( +HiveConf hiveConf, +Table hiveTable, +HiveMetastoreClientWrapper client, +HiveShim hiveShim) { + +Tuple4, List, Set, Optional> +hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, client, hiveShim); + +return createSchema( +hiveTableInfo.f0, +hiveTableInfo.f1, +hiveTableInfo.f2, +hiveTableInfo.f3.orElse(null)); +} + +/** Create a Flink's Schema from Hive table's columns and partition keys. */ +public static org.apache.flink.table.api.Schema createSchema( +List nonPartCols, +List partitionKeys, +Set notNullColumns, +@Nullable UniqueConstraint primaryKey) { +return Schema.newBuilder() +.fromResolvedSchema( +createResolvedSchema( Review Comment: > In here, we will fisrt convert to ResolvedSchema and then convert to Schema. So why not convert to Schema directly? It's intend to save the code, because creating `ResolvedSchema` and `Schema` actually doing the same thing. IMO, we do not have to avoid using `Schema.newBuilder().fromResolvedSchema().build()` as much as possible, I think its harmless. Since it is a bridge to covert the ResolvedSchema back to Schema when needed. In this case, we can create `ResolvedSchema` directly from the Hive table information. In my original thought, we even do not have to add the dedicated method as below, the caller just can do it depend on the requirements. 
```
Schema createSchema(
        List<FieldSchema> nonPartCols,
        List<FieldSchema> partitionKeys,
        Set<String> notNullColumns,
        @Nullable UniqueConstraint primaryKey) {
    return Schema.newBuilder()
            .fromResolvedSchema(
                    createResolvedSchema(
                            nonPartCols, partitionKeys, notNullColumns, primaryKey))
            .build();
}
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
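As a standalone illustration of the pattern being debated (assemble the resolved schema once, then bridge back to the unresolved form), here is a minimal sketch. The classes below are simplified hypothetical stand-ins, not Flink's real `Schema`/`ResolvedSchema` APIs; they only show why deriving the unresolved schema from the resolved one avoids duplicating the column-assembly logic:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for Flink's ResolvedSchema: columns are fully typed.
final class MiniResolvedSchema {
    final LinkedHashMap<String, String> columns; // column name -> type string

    MiniResolvedSchema(LinkedHashMap<String, String> columns) {
        this.columns = columns;
    }
}

// Simplified stand-in for Flink's Schema with a fromResolvedSchema bridge.
final class MiniSchema {
    final List<String> columnDecls;

    private MiniSchema(List<String> columnDecls) {
        this.columnDecls = columnDecls;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private final List<String> decls = new ArrayList<>();

        // The bridge: derive the unresolved declarations from the resolved form.
        Builder fromResolvedSchema(MiniResolvedSchema resolved) {
            resolved.columns.forEach((name, type) -> decls.add(name + " " + type));
            return this;
        }

        MiniSchema build() {
            return new MiniSchema(decls);
        }
    }
}

public class SchemaBridgeSketch {
    // Single assembly path: partition keys are appended after regular columns,
    // mirroring the shape of createResolvedSchema in the PR under review.
    static MiniResolvedSchema createResolvedSchema(
            Map<String, String> nonPartCols, Map<String, String> partitionKeys) {
        LinkedHashMap<String, String> all = new LinkedHashMap<>(nonPartCols);
        all.putAll(partitionKeys);
        return new MiniResolvedSchema(all);
    }

    // createSchema reuses the resolved assembly instead of duplicating it.
    static MiniSchema createSchema(
            Map<String, String> nonPartCols, Map<String, String> partitionKeys) {
        return MiniSchema.newBuilder()
                .fromResolvedSchema(createResolvedSchema(nonPartCols, partitionKeys))
                .build();
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("x", "INT");
        Map<String, String> parts = new LinkedHashMap<>();
        parts.put("dt", "STRING");
        System.out.println(createSchema(cols, parts).columnDecls); // [x INT, dt STRING]
    }
}
```

The design point the sketch captures: there is exactly one place where columns and partition keys are merged, so the resolved and unresolved views of a table can never drift apart.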
[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted
[ https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701501#comment-17701501 ] Leonard Xu commented on FLINK-30719: [~junhan] Could you take a look at this ticket? > flink-runtime-web failed due to a corrupted > > > Key: FLINK-30719 > URL: https://issues.apache.org/jira/browse/FLINK-30719 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend, Test Infrastructure, Tests >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=12550 > The build failed due to a corrupted nodejs dependency: > {code} > [ERROR] The archive file > /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz > is corrupted and will be deleted. Please try the build again. > {code}
[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
luoyuxia commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139691598 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.UnresolvedDataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; + +/** Testing implementation of {@link SchemaResolver}. 
*/ +public class TestSchemaResolver implements SchemaResolver { + +private final DataTypeFactory dataTypeFactory = new TestingDataTypeFactory(); +private final Map resolveExpressionTable = new HashMap<>(); + +@Override +public ResolvedSchema resolve(Schema schema) { +final List columns = resolveColumns(schema.getColumns()); + +final List watermarkSpecs = +schema.getWatermarkSpecs().stream() +.map(this::resolveWatermarkSpecs) +.collect(Collectors.toList()); + +final UniqueConstraint primaryKey = resolvePrimaryKey(schema.getPrimaryKey().orElse(null)); + +return new ResolvedSchema(columns, watermarkSpecs, primaryKey); +} + +private List resolveColumns(List unresolvedColumns) { +final Column[] resolvedColumns = new Column[unresolvedColumns.size()]; + +int i = 0; +for (Schema.UnresolvedColumn unresolvedColumn : unresolvedColumns) { +if (unresolvedColumn instanceof Schema.UnresolvedPhysicalColumn) { +resolvedColumns[i] = + resolvePhysicalColumn((Schema.UnresolvedPhysicalColumn) unresolvedColumn); +} else if (unresolvedColumn instanceof Schema.UnresolvedMetadataColumn) { +resolvedColumns[i] = + resolveMetadataColumn((Schema.UnresolvedMetadataColumn) unresolvedColumn); +} else if (unresolvedColumn instanceof Schema.UnresolvedComputedColumn) { +resolvedColumns[i] = +Column.computed( +unresolvedColumn.getName(), +resolveExpression( + ((Schema.UnresolvedComputedColumn) unresolvedColumn) +.getExpression())) + .withComment(unresolvedColumn.getComment().orElse(null)); +} +i++; +} +return Arrays.asList(resolvedColumns); +} + +private Column.PhysicalColumn resolvePhysicalColumn( +Schema.UnresolvedPhysicalColumn unresolvedColumn) { +return Column.physical( +unresolvedColumn.getName(), + dataTypeFactory.createDataType(unresolvedColumn.getDataType())) +.withComment(unresolvedColumn.getComment().orElse(null)); +} + +private Column.MetadataColumn resolveMetadataColumn( +Schema.UnresolvedMetadataColumn unresolvedColumn) { +return Column.metadata( +unresolvedColumn.getName(), + 
dataTypeFactory.createDataType(unresolvedColumn.getDataType()), +unresolvedColumn.getMetadataKey(), +unresolvedColumn.isVirtual()) +
[jira] [Assigned] (FLINK-31443) FineGrainedSlotManager maintain some redundant task managers
[ https://issues.apache.org/jira/browse/FLINK-31443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-31443: -- Assignee: Weihua Hu > FineGrainedSlotManager maintain some redundant task managers > > > Key: FLINK-31443 > URL: https://issues.apache.org/jira/browse/FLINK-31443 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Weihua Hu >Assignee: Weihua Hu >Priority: Major > > implementation of > [FLINK-18625|https://issues.apache.org/jira/browse/FLINK-18625] in > FineGrainedSlotManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31447) Aligning unit tests of FineGrainedSlotManager with DeclarativeSlotManager
[ https://issues.apache.org/jira/browse/FLINK-31447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-31447: -- Assignee: Weihua Hu > Aligning unit tests of FineGrainedSlotManager with DeclarativeSlotManager > - > > Key: FLINK-31447 > URL: https://issues.apache.org/jira/browse/FLINK-31447 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Weihua Hu >Assignee: Weihua Hu >Priority: Major > > There's the DeclarativeSlotManagerTest that covers some specific issues that > should be ported to the fine grained slot manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
luoyuxia commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139688985 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java: ## @@ -503,16 +522,16 @@ public void testViewSchema() throws Exception { CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1")); -Schema viewSchema = catalogView.getUnresolvedSchema(); +ResolvedSchema viewSchema = Review Comment: IIUC, this test method is meant to check the `CatalogView` returned by `hiveCatalog.getTable`. Since the schema it returns is an unresolved `Schema`, why compare it against a `ResolvedSchema`? That would extend the scope of the test.
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139685134 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.UnresolvedDataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.utils.LogicalTypeParser; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; + +/** Testing implementation of {@link SchemaResolver}. 
*/ +public class TestSchemaResolver implements SchemaResolver { + +private final DataTypeFactory dataTypeFactory = new TestingDataTypeFactory(); +private final Map resolveExpressionTable = new HashMap<>(); + +@Override +public ResolvedSchema resolve(Schema schema) { +final List columns = resolveColumns(schema.getColumns()); + +final List watermarkSpecs = +schema.getWatermarkSpecs().stream() +.map(this::resolveWatermarkSpecs) +.collect(Collectors.toList()); + +final UniqueConstraint primaryKey = resolvePrimaryKey(schema.getPrimaryKey().orElse(null)); + +return new ResolvedSchema(columns, watermarkSpecs, primaryKey); +} + +private List resolveColumns(List unresolvedColumns) { +final Column[] resolvedColumns = new Column[unresolvedColumns.size()]; + +int i = 0; +for (Schema.UnresolvedColumn unresolvedColumn : unresolvedColumns) { +if (unresolvedColumn instanceof Schema.UnresolvedPhysicalColumn) { +resolvedColumns[i] = + resolvePhysicalColumn((Schema.UnresolvedPhysicalColumn) unresolvedColumn); +} else if (unresolvedColumn instanceof Schema.UnresolvedMetadataColumn) { +resolvedColumns[i] = + resolveMetadataColumn((Schema.UnresolvedMetadataColumn) unresolvedColumn); +} else if (unresolvedColumn instanceof Schema.UnresolvedComputedColumn) { +resolvedColumns[i] = +Column.computed( +unresolvedColumn.getName(), +resolveExpression( + ((Schema.UnresolvedComputedColumn) unresolvedColumn) +.getExpression())) + .withComment(unresolvedColumn.getComment().orElse(null)); +} +i++; +} +return Arrays.asList(resolvedColumns); +} + +private Column.PhysicalColumn resolvePhysicalColumn( +Schema.UnresolvedPhysicalColumn unresolvedColumn) { +return Column.physical( +unresolvedColumn.getName(), + dataTypeFactory.createDataType(unresolvedColumn.getDataType())) +.withComment(unresolvedColumn.getComment().orElse(null)); +} + +private Column.MetadataColumn resolveMetadataColumn( +Schema.UnresolvedMetadataColumn unresolvedColumn) { +return Column.metadata( +unresolvedColumn.getName(), + 
dataTypeFactory.createDataType(unresolvedColumn.getDataType()), +unresolvedColumn.getMetadataKey(), +unresolvedColumn.isVirtual()) +
[GitHub] [flink] chucheng92 commented on pull request #22179: [FLINK-31380][table] FLIP-297: Support enhanced show catalogs syntax
chucheng92 commented on PR #22179: URL: https://github.com/apache/flink/pull/22179#issuecomment-1473088287 > @wuchong hi, jark. I added a lot of test cases and did integration testing for a while. Can u help me to review this feature? cc @wuchong PTAL~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139678740 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java: ## @@ -91,16 +91,50 @@ public static Map serializeCatalogTable(ResolvedCatalogTable res } } +/** Serializes the given {@link ResolvedCatalogView} into a map of string properties. */ +public static Map serializeCatalogView(ResolvedCatalogView resolvedView) { +try { +final Map properties = new HashMap<>(); + +serializeResolvedSchema(properties, resolvedView.getResolvedSchema()); + +final String comment = resolvedView.getComment(); +if (comment != null && comment.length() > 0) { +properties.put(COMMENT, comment); +} + +properties.putAll(resolvedView.getOptions()); + +properties.remove(IS_GENERIC); // reserved option + +return properties; +} catch (Exception e) { +throw new CatalogException("Error in serializing catalog view.", e); +} +} + /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */ public static CatalogTable deserializeCatalogTable(Map properties) { +return deserializeCatalogTable(properties, null); +} + +/** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */ +public static CatalogTable deserializeCatalogTable( Review Comment: updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup
[ https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701494#comment-17701494 ] Jark Wu commented on FLINK-31259: - [~lintingbin] I see. What do you think about this? [~fsk119] > Gateway supports initialization of catalog at startup > - > > Key: FLINK-31259 > URL: https://issues.apache.org/jira/browse/FLINK-31259 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > Support initializing catalogs in the gateway when it starts
[jira] [Updated] (FLINK-31380) Support enhanced show catalogs syntax
[ https://issues.apache.org/jira/browse/FLINK-31380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31380: --- Labels: pull-request-available (was: ) > Support enhanced show catalogs syntax > - > > Key: FLINK-31380 > URL: https://issues.apache.org/jira/browse/FLINK-31380 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Reporter: Ran Tao >Assignee: Ran Tao >Priority: Major > Labels: pull-request-available > > As discussed in the FLIP, we will support new syntax for some SHOW operations. > To avoid bloat, this ticket covers only SHOW CATALOGS.
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139673570 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java: ## @@ -503,16 +522,16 @@ public void testViewSchema() throws Exception { CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1")); -Schema viewSchema = catalogView.getUnresolvedSchema(); +ResolvedSchema viewSchema = Review Comment: Why? We need to compare against the ResolvedSchema
[GitHub] [flink] flinkbot commented on pull request #22198: [FLINK-30923][documentation] Provide single script for installing Hugo
flinkbot commented on PR #22198: URL: https://github.com/apache/flink/pull/22198#issuecomment-1473083751 ## CI report: * 68d441a953af8352554af6e55455aab097fcf386 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-30011) HiveCatalogGenericMetadataTest azure CI failed due to catalog does not exist
[ https://issues.apache.org/jira/browse/FLINK-30011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-30011. -- Resolution: Duplicate > HiveCatalogGenericMetadataTest azure CI failed due to catalog does not exist > > > Key: FLINK-30011 > URL: https://issues.apache.org/jira/browse/FLINK-30011 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.16.1 >Reporter: Leonard Xu >Priority: Major > > {noformat} > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testGetPartitionStats:1212 » Catalog > F... > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_PartitionNotExist:1160 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_PartitionSpecInvalid_invalidPartitionSpec:1124 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_PartitionSpecInvalid_sizeNotEqual:1139 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_TableNotPartitioned:1110 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testGetTableStats_TableNotExistException:1201 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testGetTable_TableNotExistException:323 > » Catalog > Nov 13 01:55:18 [ERROR] HiveCatalogHiveMetadataTest.testHiveStatistics:251 > » Catalog Failed to create ... > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testListFunctions:749 » Catalog > Failed... > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testListPartitionPartialSpec:1188 » > Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testListTables:498 » Catalog Failed > to... > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testListView:620 » Catalog Failed to > c... 
> Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testPartitionExists:1174 » Catalog > Fai... > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_TableAlreadyExistException:483 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_TableNotExistException:465 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_TableNotExistException_ignored:477 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_nonPartitionedTable:451 > » Catalog > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testRenameView:637 » Catalog Failed > to... > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest>CatalogTest.testTableExists:510 » Catalog Failed > t... > Nov 13 01:55:18 [ERROR] > HiveCatalogHiveMetadataTest.testViewCompatibility:115 » Catalog Failed to > crea... > Nov 13 01:55:18 [INFO] > Nov 13 01:55:18 [ERROR] Tests run: 361, Failures: 0, Errors: 132, Skipped: 0 > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43104=logs=245e1f2e-ba5b-5570-d689-25ae21e5302f=d04c9862-880c-52f5-574b-a7a79fef8e0f -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30923) Provide single script for installing Hugo
[ https://issues.apache.org/jira/browse/FLINK-30923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701489#comment-17701489 ] Eric Brzezenski commented on FLINK-30923: - Would you mind assigning this ticket to me, please? I think I understood the description and pushed out a change: https://github.com/apache/flink/pull/22198 > Provide single script for installing Hugo > - > > Key: FLINK-30923 > URL: https://issues.apache.org/jira/browse/FLINK-30923 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available, starter > > Currently, we install Hugo from multiple locations. In the past this > caused different Hugo versions to be used for building the docs depending on which > script triggered the docs build (see > [ci/docs.sh|https://github.com/apache/flink/blob/de368acf0038328a751507a2fa7cb0989d6312e7/tools/ci/docs.sh#L20] > and > [.github/workflows/docs.sh:26|https://github.com/apache/flink/blob/278642219ebf4b68a02a3901fe06a7cb006d105b/.github/workflows/docs.sh#L26]). > We could have a {{docs/setup_hugo.sh}} next to {{docs/setup_docs.sh}} that > provides this functionality.
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139668108 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java: ## @@ -682,11 +682,11 @@ private RelNode handleDestSchema( return queryRelNode; } +ResolvedCatalogTable resolvedCatalogTable = catalogManager.resolveCatalogTable(destTable); Review Comment: forgot it, updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30923) Provide single script for installing Hugo
[ https://issues.apache.org/jira/browse/FLINK-30923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30923: --- Labels: pull-request-available starter (was: starter) > Provide single script for installing Hugo > - > > Key: FLINK-30923 > URL: https://issues.apache.org/jira/browse/FLINK-30923 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available, starter > > Currently, we have multiple locations to install hugo. In the past this > caused using different Hugo version for building the docs depending which > script triggered the docs build (see > [ci/docs.sh|https://github.com/apache/flink/blob/de368acf0038328a751507a2fa7cb0989d6312e7/tools/ci/docs.sh#L20] > and > [.github/workflows/docs.sh:26|https://github.com/apache/flink/blob/278642219ebf4b68a02a3901fe06a7cb006d105b/.github/workflows/docs.sh#L26]). > We could have a {{docs/setup_hugo.sh}} next to {{docs/setup_docs.sh}} that > provides this functionality. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] EricBrzezenski opened a new pull request, #22198: [FLINK-30923][documentation] Provide single script for installing Hugo
EricBrzezenski opened a new pull request, #22198: URL: https://github.com/apache/flink/pull/22198 ## What is the purpose of the change To add a single source for downloading Hugo artifacts for building the documentation. ## Brief change log - Consolidating Hugo artifact downloads to a single location ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): No - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: No - The serializers: No - The runtime per-record code paths (performance sensitive): No - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No - The S3 file system connector: No ## Documentation - Does this pull request introduce a new feature? No - If yes, how is the feature documented? N/A
[jira] [Updated] (FLINK-31432) Introduce a special StoreWriteOperator to deal with schema changes
[ https://issues.apache.org/jira/browse/FLINK-31432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31432: --- Labels: pull-request-available (was: ) > Introduce a special StoreWriteOperator to deal with schema changes > -- > > Key: FLINK-31432 > URL: https://issues.apache.org/jira/browse/FLINK-31432 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Currently {{StoreWriteOperator}} is not able to deal with schema changes. We > need to introduce a special {{StoreWriteOperator}} to deal with schema > changes.
[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
luoyuxia commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139647818 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java: ## @@ -91,16 +91,50 @@ public static Map serializeCatalogTable(ResolvedCatalogTable res } } +/** Serializes the given {@link ResolvedCatalogView} into a map of string properties. */ +public static Map serializeCatalogView(ResolvedCatalogView resolvedView) { +try { +final Map properties = new HashMap<>(); + +serializeResolvedSchema(properties, resolvedView.getResolvedSchema()); + +final String comment = resolvedView.getComment(); +if (comment != null && comment.length() > 0) { +properties.put(COMMENT, comment); +} + +properties.putAll(resolvedView.getOptions()); + +properties.remove(IS_GENERIC); // reserved option + +return properties; +} catch (Exception e) { +throw new CatalogException("Error in serializing catalog view.", e); +} +} + /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */ public static CatalogTable deserializeCatalogTable(Map properties) { +return deserializeCatalogTable(properties, null); +} + +/** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */ +public static CatalogTable deserializeCatalogTable( Review Comment: Please add comment for the parameter `fallbackKey`. I do spend some time to understand it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
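To make the `fallbackKey` intent discussed above concrete, here is a hedged standalone sketch of the lookup pattern. The property keys and the helper name are hypothetical illustrations, not Flink's actual serialization keys or API:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class FallbackKeyLookup {
    // Hypothetical helper: read a property under its current key, falling back
    // to a legacy key so properties serialized by an older version still resolve.
    static Optional<String> getWithFallback(
            Map<String, String> properties, String key, String fallbackKey) {
        String value = properties.get(key);
        if (value == null && fallbackKey != null) {
            value = properties.get(fallbackKey);
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        // A map written under a legacy key layout (illustrative keys only).
        Map<String, String> legacyProps = Collections.singletonMap("schema.0.name", "x");
        System.out.println(
                getWithFallback(legacyProps, "columns.0.name", "schema.0.name").orElse("missing"));
    }
}
```

This is why documenting the `fallbackKey` parameter matters: without a comment, a reader cannot tell whether it is a default value or, as here, an alternative lookup key for backward compatibility.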
[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
luoyuxia commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1139614246 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java: ## @@ -96,11 +106,104 @@ public class HiveTableUtil { private HiveTableUtil() {} -public static TableSchema createTableSchema( +/** Create a Flink's Schema by hive client. */ +public static org.apache.flink.table.api.Schema createSchema( +HiveConf hiveConf, +Table hiveTable, +HiveMetastoreClientWrapper client, +HiveShim hiveShim) { + +Tuple4, List, Set, Optional> +hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, client, hiveShim); + +return createSchema( +hiveTableInfo.f0, +hiveTableInfo.f1, +hiveTableInfo.f2, +hiveTableInfo.f3.orElse(null)); +} + +/** Create a Flink's Schema from Hive table's columns and partition keys. */ +public static org.apache.flink.table.api.Schema createSchema( +List nonPartCols, +List partitionKeys, +Set notNullColumns, +@Nullable UniqueConstraint primaryKey) { +return Schema.newBuilder() +.fromResolvedSchema( +createResolvedSchema( +nonPartCols, partitionKeys, notNullColumns, primaryKey)) +.build(); +} + +/** Create a Flink's ResolvedSchema from Hive table's columns and partition keys. 
*/ +public static ResolvedSchema createResolvedSchema( +List nonPartCols, +List partitionKeys, +Set notNullColumns, +@Nullable UniqueConstraint primaryKey) { +List allCols = new ArrayList<>(nonPartCols); +allCols.addAll(partitionKeys); + +// PK columns cannot be null +if (primaryKey != null) { +notNullColumns.addAll(primaryKey.getColumns()); +} + +Tuple2 columnInformation = +extractColumnInformation(allCols, notNullColumns); + +org.apache.flink.table.catalog.UniqueConstraint pk = null; +if (primaryKey != null) { +pk = +org.apache.flink.table.catalog.UniqueConstraint.primaryKey( +primaryKey.getName(), primaryKey.getColumns()); +} +return new ResolvedSchema( +IntStream.range(0, columnInformation.f0.length) +.mapToObj( +i -> +Column.physical( +columnInformation.f0[i], columnInformation.f1[i])) +.collect(Collectors.toList()), +new ArrayList<>(), Review Comment: nit: ` Collections.emptyList();` ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java: ## @@ -259,27 +261,55 @@ void testCreateTableWithConstraints() throws Exception { assumeThat(HiveVersionTestUtil.HIVE_310_OR_LATER).isTrue(); HiveCatalog hiveCatalog = (HiveCatalog) catalog; hiveCatalog.createDatabase(db1, createDb(), false); -TableSchema.Builder builder = TableSchema.builder(); -builder.fields( -new String[] {"x", "y", "z"}, -new DataType[] { -DataTypes.INT().notNull(), DataTypes.TIMESTAMP(9).notNull(), DataTypes.BIGINT() -}); -builder.primaryKey("pk_name", new String[] {"x"}); +ResolvedSchema resolvedSchema = +new ResolvedSchema( +Arrays.asList( +Column.physical("x", DataTypes.INT().notNull()), +Column.physical("y", DataTypes.TIMESTAMP(9).notNull()), +Column.physical("z", DataTypes.BIGINT().notNull())), +new ArrayList<>(), + org.apache.flink.table.catalog.UniqueConstraint.primaryKey( +"pk_name", Collections.singletonList("x"))); + hiveCatalog.createTable( path1, -new CatalogTableImpl(builder.build(), getBatchTableProperties(), null), +new 
ResolvedCatalogTable( +CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), +null, +new ArrayList<>(), +getBatchTableProperties()), +resolvedSchema), false); CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(path1); -
[jira] [Assigned] (FLINK-31445) Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker
[ https://issues.apache.org/jira/browse/FLINK-31445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-31445: -- Assignee: Weihua Hu > Split resource allocate/release related logic from FineGrainedSlotManager to > TaskManagerTracker > --- > > Key: FLINK-31445 > URL: https://issues.apache.org/jira/browse/FLINK-31445 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Weihua Hu >Assignee: Weihua Hu >Priority: Major > Labels: pull-request-available > > Currently the FineGrainedSlotManager is responsible for slot allocation as well as resource request/release. This makes the logic of FineGrainedSlotManager complicated, so we will move task-manager-related work from FineGrainedSlotManager to TaskManagerTracker, which already tracks task managers but does not yet handle request/release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31444) FineGrainedSlotManager reclaims slots when job is finished
[ https://issues.apache.org/jira/browse/FLINK-31444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-31444: -- Assignee: Weihua Hu > FineGrainedSlotManager reclaims slots when job is finished > -- > > Key: FLINK-31444 > URL: https://issues.apache.org/jira/browse/FLINK-31444 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Weihua Hu >Assignee: Weihua Hu >Priority: Major > Labels: pull-request-available > > implementation of > [FLINK-21751|https://issues.apache.org/jira/browse/FLINK-21751] in > FineGrainedSlotManager -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31441) FineGrainedSlotManager support select slot evenly
[ https://issues.apache.org/jira/browse/FLINK-31441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-31441: -- Assignee: Weihua Hu > FineGrainedSlotManager support select slot evenly > - > > Key: FLINK-31441 > URL: https://issues.apache.org/jira/browse/FLINK-31441 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Weihua Hu >Assignee: Weihua Hu >Priority: Major > Labels: pull-request-available > > With [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122], we support spreading tasks out evenly across available task managers. FineGrainedSlotManager should support this as well.
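The "select slot evenly" behavior referenced above can be sketched as a least-utilized-first choice among task managers. This is a hypothetical illustration only, not the actual FineGrainedSlotManager code; the `pick_task_manager` helper and its data shapes are assumptions:

```python
# Hypothetical sketch of "spread slots evenly" (FLINK-12122-style spreading):
# among task managers that still have free slots, pick the one with the
# highest free-slot ratio, i.e. the lowest utilization. This is NOT the
# actual FineGrainedSlotManager implementation; names are assumptions.
def pick_task_manager(task_managers):
    """task_managers maps tm_id -> (free_slots, total_slots)."""
    candidates = [(free / total, tm)
                  for tm, (free, total) in sorted(task_managers.items())
                  if free > 0]
    if not candidates:
        return None  # no task manager can host another slot
    # max() with a key keeps the first maximum, so ties break by tm id.
    ratio, tm = max(candidates, key=lambda c: c[0])
    return tm

# tm-2 has the most spare capacity, so a new slot lands there instead of
# filling up tm-1 first.
print(pick_task_manager({"tm-1": (1, 4), "tm-2": (3, 4), "tm-3": (0, 4)}))  # tm-2
```

The point of the strategy is exactly the spreading shown here: without it, a slot manager tends to pack slots onto the first task manager with capacity.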
[jira] [Assigned] (FLINK-31439) FLIP-298: Unifying the Implementation of SlotManager
[ https://issues.apache.org/jira/browse/FLINK-31439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-31439: -- Assignee: Weihua Hu > FLIP-298: Unifying the Implementation of SlotManager > > > Key: FLINK-31439 > URL: https://issues.apache.org/jira/browse/FLINK-31439 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Weihua Hu >Assignee: Weihua Hu >Priority: Major > Fix For: 1.18.0 > > > This is an umbrella ticket for > [FLIP-298|https://cwiki.apache.org/confluence/display/FLINK/FLIP-298%3A+Unifying+the+Implementation+of+SlotManager]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31430) Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite
[ https://issues.apache.org/jira/browse/FLINK-31430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31430. Resolution: Fixed master: 1af16a5a1cb27b6b72bbcef9e5862fda84d8c996 > Support migrating states between different instances of TableWriteImpl and > AbstractFileStoreWrite > - > > Key: FLINK-31430 > URL: https://issues.apache.org/jira/browse/FLINK-31430 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Currently {{Table}} and {{TableWrite}} in Flink Table Store have a fixed schema. However, to consume schema changes, Flink Table Store CDC sinks should have the ability to change their schema during a streaming job. > This requires us to pause and store the states of a {{TableWrite}}, then create a {{TableWrite}} with the newer schema and recover the states in the new {{TableWrite}}.
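The pause/store/recover flow described in the ticket can be sketched roughly as follows. This is an illustrative toy, not the Table Store API; the `TableWrite` stand-in and its `extract_state`/`recover_state` methods are assumptions made for this sketch:

```python
class TableWrite:
    """Toy stand-in for a schema-bound writer; NOT the real Table Store class."""
    def __init__(self, schema):
        self.schema = schema          # list of column names
        self.buffered = []            # in-memory state to be migrated

    def write(self, row):
        # Bind the incoming tuple to this writer's fixed schema.
        self.buffered.append(dict(zip(self.schema, row)))

    def extract_state(self):
        # Pause: hand the buffered state over to a successor writer.
        state, self.buffered = self.buffered, []
        return state

    def recover_state(self, state):
        # Recover: pad old rows with None for columns added by the new schema.
        for row in state:
            self.buffered.append({col: row.get(col) for col in self.schema})

old = TableWrite(["k", "v"])
old.write((1, "a"))

new = TableWrite(["k", "v", "ts"])      # schema evolved: column "ts" added
new.recover_state(old.extract_state())
print(new.buffered)  # [{'k': 1, 'v': 'a', 'ts': None}]
```

A real implementation must also persist the extracted state and handle serializers, but the shape of the migration is the same: drain the old writer, rebuild against the new schema, re-ingest.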
[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
chucheng92 commented on PR #22192: URL: https://github.com/apache/flink/pull/22192#issuecomment-1473068221 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup
[ https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701476#comment-17701476 ] Darcy Lin commented on FLINK-31259: --- [~jark] [FLIP-295|https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Introduce+Pluggable+Catalog+Management] may indeed be able to solve this problem, but I feel it is more focused on catalog storage. If sql-gateway could support a " -i, --init " parameter similar to sql-client's, or a "sql-gateway-defaults.yaml" configuration file similar to [ververica-flink-sql-gateway|https://github.com/ververica/flink-sql-gateway/blob/master/conf]'s, that would solve this problem very well, because it is not just about catalogs; it may also involve some UDF initialization and so on. > Gateway supports initialization of catalog at startup > - > > Key: FLINK-31259 > URL: https://issues.apache.org/jira/browse/FLINK-31259 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > Support initializing catalogs in the gateway when it starts
[GitHub] [flink-web] Myasuka commented on a diff in pull request #618: Announcement blogpost for the 1.17 release
Myasuka commented on code in PR #618: URL: https://github.com/apache/flink-web/pull/618#discussion_r1135085057 ## docs/content/posts/2023-03-09-release-1.17.0.md: ## @@ -0,0 +1,487 @@ +--- +authors: +- LeonardXu: + name: "Leonard Xu" + twitter: Leonardxbj +date: "2023-03-09T08:00:00Z" #FIXME: Change to the actual release date, also the date in the filename, and the directory name of linked images +subtitle: "" +title: Announcing the Release of Apache Flink 1.17 +aliases: +- /news/2023/03/09/release-1.17.0.html #FIXME: Change to the actual release date +--- + +The Apache Flink PMC is pleased to announce Apache Flink release 1.17.0. Apache +Flink is the leading stream processing standard, and the concept of unified +stream and batch data processing is being successfully adopted in more and more +companies. Thanks to our excellent community and contributors, Apache Flink +continues to grow as a technology and remains one of the most active projects in +the Apache Software Foundation. Flink 1.17 had 173 contributors enthusiastically +participating and saw the completion of 7 FLIPs and 600+ issues, bringing many +exciting new features and improvements to the community. + + +# Towards Streaming Warehouses + +In order to achieve greater efficiency in the realm of [streaming +warehouse](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821), +Flink 1.17 contains substantial improvements to both the performance of batch +processing and the semantics of streaming processing. These improvements +represent a significant stride towards the creation of a more efficient and +streamlined data warehouse, capable of processing large quantities of data in +real-time. 
+ +For batch processing, this release includes several new features and +improvements: + +* **Streaming Warehouse API:** + [FLIP-282](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235838061) + introduces the new Delete and Update API in Flink SQL, which works only in batch + mode. External storage systems like Flink Table Store can implement row-level + updates via this new API. The ALTER TABLE syntax is enhanced by including the + ability to ADD/MODIFY/DROP columns, primary keys, and watermarks, making it + easier for users to maintain their table schema. +* **Batch Execution Improvements:** Execution of batch workloads has been + significantly improved in Flink 1.17 in terms of performance, stability and + usability. Performance-wise, a 26% TPC-DS improvement is achieved with + strategy and operator optimizations, such as new join reordering and adaptive + local hash aggregation, Hive aggregate function improvements, and hybrid + shuffle mode enhancements. Stability-wise, speculative execution now supports + all operators, and the Adaptive Batch Scheduler is more robust against data + skew. Usability-wise, the tuning effort required for batch workloads has been + reduced. The Adaptive Batch Scheduler is now the default scheduler in batch mode, + and the hybrid shuffle is compatible with speculative execution and the Adaptive + Batch Scheduler, alongside various configuration simplifications. +* **SQL Client/Gateway:** Apache Flink 1.17 introduces the "gateway mode" for + SQL Client, allowing users to submit SQL queries to a SQL Gateway for enhanced + functionality. Users can use SQL statements to manage job lifecycles, + including displaying job information and stopping running jobs. This provides + a powerful tool for managing Flink jobs.
+ +For stream processing, the following features and improvements are realized: + +* **Streaming SQL Semantics:** Non-deterministic operations may produce incorrect + results or exceptions, which is a challenging topic in streaming SQL. Incorrect + optimization plans and functional issues have been fixed, and the experimental + feature of [PLAN_ADVICE](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/explain/#explaindetails) + is introduced to inform SQL users of potential correctness risks and optimization + suggestions. +* **Checkpoint Improvements:** The generic incremental checkpoint improvements + enhance the speed and stability of the checkpoint procedure, and the unaligned + checkpoint has improved stability under backpressure and is production-ready + in Flink 1.17. Users can manually trigger checkpoints with self-defined + checkpoint types while a job is running via the newly introduced REST + interface for triggering checkpoints. +* **Watermark Alignment Enhancement:** Efficient watermark processing directly + affects the execution efficiency of event-time applications. In Flink 1.17, + [FLIP-217](https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits) + introduces an improvement to watermark alignment by aligning data emission + across splits within a source operator. This
[jira] [Updated] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xin updated FLINK-31486: -- Description: When we use CoGroup along with KeySelector in an IterationBody, the following exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
    at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
    ... 17 more
{code}
was: When we use CoGroup along with KeySelector in an IterationBody, the following exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
    at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
    at
[GitHub] [flink] ruanhang1993 commented on pull request #21589: [FLINK-25509][connector-base] Add RecordEvaluator to dynamically stop source based on de-serialized records
ruanhang1993 commented on PR #21589: URL: https://github.com/apache/flink/pull/21589#issuecomment-1473030088 @flinkbot run azure
[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup
[ https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701465#comment-17701465 ] Jark Wu commented on FLINK-31259: - [~lintingbin] yes, I think this is a valuable feature, and I think [FLIP-295|https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Introduce+Pluggable+Catalog+Management] can address this. (Note that FLIP-295 is still being drafted and I haven't reviewed it yet.) > Gateway supports initialization of catalog at startup > - > > Key: FLINK-31259 > URL: https://issues.apache.org/jira/browse/FLINK-31259 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > Support initializing catalogs in the gateway when it starts
[GitHub] [flink] JunRuiLee commented on a diff in pull request #21672: [FLINK-30683][runtime] Make adaptive batch scheduler as the default batch scheduler
JunRuiLee commented on code in PR #21672: URL: https://github.com/apache/flink/pull/21672#discussion_r1090132553 ## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml: ## @@ -71,17 +75,21 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS
[jira] [Closed] (FLINK-31459) add UPDATE COLUMN POSITION for flink table store
[ https://issues.apache.org/jira/browse/FLINK-31459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31459. Assignee: Jun Zhang Resolution: Fixed master: 14d87e4a85e716771c59a5e2fbb840fd1852a0f6 > add UPDATE COLUMN POSITION for flink table store > > > Key: FLINK-31459 > URL: https://issues.apache.org/jira/browse/FLINK-31459 > Project: Flink > Issue Type: Improvement > Components: Table Store >Affects Versions: table-store-0.3.1 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
chucheng92 commented on PR #22192: URL: https://github.com/apache/flink/pull/22192#issuecomment-1473019407 @flinkbot run azure
[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
chucheng92 commented on PR #22192: URL: https://github.com/apache/flink/pull/22192#issuecomment-1473019274 The BP-1.16 CI failure is caused by [FLINK-31477](https://issues.apache.org/jira/browse/FLINK-31477)
[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
chucheng92 commented on PR #22192: URL: https://github.com/apache/flink/pull/22192#issuecomment-1473018637 > @chucheng92 please see my comments in the original PR [apache/flink-connector-kafka#8](https://github.com/apache/flink-connector-kafka/pull/8) before merging this one. yes, i have added it. thanks @RamanVerma
[jira] [Commented] (FLINK-31319) Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
[ https://issues.apache.org/jira/browse/FLINK-31319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701462#comment-17701462 ] Ran Tao commented on FLINK-31319: - [~ramanverma] thanks. BP-1.16: [https://github.com/apache/flink/pull/22192] BP-1.17: [https://github.com/apache/flink/pull/22193] original: [https://github.com/apache/flink-connector-kafka/pull/8] > Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not > quit > - > > Key: FLINK-31319 > URL: https://issues.apache.org/jira/browse/FLINK-31319 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0, 1.16.1 >Reporter: Ran Tao >Assignee: Ran Tao >Priority: Critical > Labels: pull-request-available > Attachments: image-2023-03-04-01-37-29-360.png, > image-2023-03-04-01-39-20-352.png, image-2023-03-04-01-40-44-124.png, > image-2023-03-04-01-41-55-664.png > > > As the Kafka option description states, partitionDiscoveryIntervalMs <= 0 means disabled: > !image-2023-03-04-01-37-29-360.png|width=781,height=147! > and the startup logic follows this when starting the Kafka enumerator: > !image-2023-03-04-01-39-20-352.png|width=465,height=311! > but the inner handlePartitionSplitChanges uses the wrong if condition (< 0): > !image-2023-03-04-01-40-44-124.png|width=576,height=237! > > With an interval of exactly 0, noMoreNewPartitionSplits can therefore never be set to true: > !image-2023-03-04-01-41-55-664.png|width=522,height=610! > As a result, a bounded source never calls signalNoMoreSplits, so it will not quit. Besides, both branches of the if condition should be mutually exclusive.
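The boundary mismatch described in the ticket can be illustrated with a minimal sketch (not the actual KafkaSourceEnumerator code; function and field names here are assumptions): the enumerator enables periodic discovery when the interval is > 0, but the split-change handler only marked "no more splits" when the interval was < 0, so an interval of exactly 0 satisfied neither branch.

```python
# Minimal sketch of the boundary bug (names are assumptions, not the real
# KafkaSourceEnumerator fields). The option doc says interval <= 0 disables
# periodic discovery, and start() enables it only when interval > 0.

def handle_partition_split_changes(interval_ms: int, buggy: bool) -> bool:
    """Returns whether noMoreNewPartitionSplits gets set after one-time discovery."""
    if buggy:
        # Buggy condition: only `< 0` counts as "discovery disabled",
        # so interval == 0 matches neither this nor the `> 0` periodic branch.
        return interval_ms < 0
    # Fixed condition: `<= 0` is mutually exclusive with the `> 0` branch.
    return interval_ms <= 0

# interval == 0: the buggy check never flags "no more splits", so a bounded
# source waits forever; the fixed check lets it signalNoMoreSplits and finish.
print(handle_partition_split_changes(0, buggy=True))   # False
print(handle_partition_split_changes(0, buggy=False))  # True
```

This is also what the new enumerator tests in the PR assert: with one-shot discovery the flag ends up true, and with periodic discovery enabled it stays false.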
[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted
[ https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701461#comment-17701461 ] Leonard Xu commented on FLINK-30719: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47149=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5] > flink-runtime-web failed due to a corrupted > > > Key: FLINK-30719 > URL: https://issues.apache.org/jira/browse/FLINK-30719 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend, Test Infrastructure, Tests >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=12550 > The build failed due to a corrupted nodejs dependency: > {code} > [ERROR] The archive file > /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz > is corrupted and will be deleted. Please try the build again. > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] chucheng92 commented on pull request #22193: [BP-1.17][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
chucheng92 commented on PR #22193: URL: https://github.com/apache/flink/pull/22193#issuecomment-1473015869 > @chucheng92 please see my comments in the original PR [apache/flink-connector-kafka#8](https://github.com/apache/flink-connector-kafka/pull/8) before merging this one. thanks~ @RamanVerma, already fixed.
[jira] [Resolved] (FLINK-31477) NestedLoopJoinTest.testLeftOuterJoinWithFilter failed on azure
[ https://issues.apache.org/jira/browse/FLINK-31477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-31477. Resolution: Fixed Fixed in release-1.16 : 1bd25a48fa444390971149515810d057324b642b > NestedLoopJoinTest.testLeftOuterJoinWithFilter failed on azure > --- > > Key: FLINK-31477 > URL: https://issues.apache.org/jira/browse/FLINK-31477 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: Leonard Xu >Assignee: Yunhong Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.16.2 > > > {noformat} > Failures: > Mar 15 15:52:32 [ERROR] NestedLoopJoinTest.testLeftOuterJoinWithFilter1:37 > optimized exec plan expected:<...[InnerJoin], where=[[true], select=[a, e, > f], build=[left]) > Mar 15 15:52:32:- Exchange(distribution=[broadcast]) > Mar 15 15:52:32: +- Calc(select=[a], where=[(a = 10)]) > Mar 15 15:52:32: +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > Mar 15 15:52:32+- Calc(select=[e, f], where=[(d = 10])]) > Mar 15 15:52:32 +- LegacyT...> but was:<...[InnerJoin], where=[[(a = > d)], select=[a, d, e, f], build=[left]) > Mar 15 15:52:32:- Exchange(distribution=[broadcast]) > Mar 15 15:52:32: +- Calc(select=[a], where=[SEARCH(a, Sarg[10])]) > Mar 15 15:52:32: +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > Mar 15 15:52:32+- Calc(select=[d, e, f], where=[SEARCH(d, Sarg[10]])]) > Mar 15 15:52:32 +- LegacyT...>{noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47202=logs=086353db-23b2-5446-2315-18e660618ef2=6cd785f3-2a2e-58a8-8e69-b4a03be28843 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] leonardBang merged pull request #22188: [FLINK-31477][table-planner] Fix NestedLoopJoinTest.testLeftOuterJoin failed with unexpected plan
leonardBang merged PR #22188: URL: https://github.com/apache/flink/pull/22188
[GitHub] [flink] liuyongvs commented on pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
liuyongvs commented on PR #22144: URL: https://github.com/apache/flink/pull/22144#issuecomment-1473012377 @snuyanzin rebased to fix the conflicts and addressed your review comments.
[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
liuyongvs commented on code in PR #22144: URL: https://github.com/apache/flink/pull/22144#discussion_r1139573794 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ## @@ -1359,6 +1360,16 @@ public OutType arrayDistinct() { return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, toExpr())); } +/** + * Remove all elements that equal to element from array. Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
liuyongvs commented on code in PR #22144: URL: https://github.com/apache/flink/pull/22144#discussion_r1139573434 ## docs/data/sql_functions.yml: ## @@ -617,6 +617,9 @@ collection: - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_REMOVE(haystack, needle) +table: haystack.arrayRemove(needle) +description: Remove all elements that equal to element from array. If the array itself is null, the function will return null. Keeps ordering of elements. Review Comment: have fixed ## flink-python/pyflink/table/expression.py: ## @@ -1487,6 +1487,13 @@ def array_distinct(self) -> 'Expression': """ return _binary_op("arrayDistinct")(self) +def array_remove(self, needle) -> 'Expression': +""" +Remove all elements that equal to element from array. Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] chucheng92 commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not qui
chucheng92 commented on code in PR #8: URL: https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139572887 ## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java: ## @@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable { } } +@Test +public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +KafkaSourceEnumerator enumerator = +createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) { + +// Start the enumerator and it should schedule a one time task to discover and assign +// partitions. +enumerator.start(); + +// Run the partition discover callable and check the partition assignment. +runOneTimePartitionDiscovery(context); + +// enumerator noMoreNewPartitionSplits first will be false, when execute +// handlePartitionSplitChanges will be set true +assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) +.isTrue(); +} +} + +@Test +public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +KafkaSourceEnumerator enumerator = +createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + +// Start the enumerator and it should schedule a one time task to discover and assign +// partitions. 
+enumerator.start(); +assertThat(context.getOneTimeCallables()).isEmpty(); +assertThat(context.getPeriodicCallables()) +.as("A periodic partition discovery callable should have been scheduled") +.hasSize(1); + +// enumerator noMoreNewPartitionSplits first will be false, even when execute +// handlePartitionSplitChanges it still be false +assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) +.isFalse(); +} +} + +@Test +public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +// set partitionDiscoveryIntervalMs = 0 +KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) { + +// Start the enumerator, and it should schedule a one time task to discover and assign +// partitions. +enumerator.start(); +runOneTimePartitionDiscovery(context); Review Comment: Yes. good advice. I have fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] chucheng92 commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
chucheng92 commented on code in PR #8: URL: https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139572140 ## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java: ## @@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable { } } +@Test +public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +KafkaSourceEnumerator enumerator = +createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) { + +// Start the enumerator and it should schedule a one time task to discover and assign +// partitions. +enumerator.start(); + +// Run the partition discover callable and check the partition assignment. +runOneTimePartitionDiscovery(context); + +// enumerator noMoreNewPartitionSplits first will be false, when execute +// handlePartitionSplitChanges will be set true +assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) +.isTrue(); +} +} + +@Test +public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable { +try (MockSplitEnumeratorContext context = +new MockSplitEnumeratorContext<>(NUM_SUBTASKS); +KafkaSourceEnumerator enumerator = +createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + +// Start the enumerator and it should schedule a one time task to discover and assign +// partitions. +enumerator.start(); +assertThat(context.getOneTimeCallables()).isEmpty(); +assertThat(context.getPeriodicCallables()) +.as("A periodic partition discovery callable should have been scheduled") +.hasSize(1); + +// enumerator noMoreNewPartitionSplits first will be false, even when execute Review Comment: @RamanVerma thanks. i added it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-31465) [Flink] Fix shortcode errors in docs
[ https://issues.apache.org/jira/browse/FLINK-31465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31465. Fix Version/s: table-store-0.4.0 Assignee: Ming Li Resolution: Fixed master: dd05a70d0b66fd3bbf1afe0cd1e8362405f024c7 > [Flink] Fix shortcode errors in docs > > > Key: FLINK-31465 > URL: https://issues.apache.org/jira/browse/FLINK-31465 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Ming Li >Assignee: Ming Li >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > When running docs with hugo, I get the following exception: > {code:java} > hugo v0.111.3+extended darwin/amd64 BuildDate=unknown > Error: Error building site: > "/xxx/flink-table-store/docs/content/docs/how-to/writing-tables.md:303:1": > failed to extract shortcode: shortcode "tabs" must be closed or > self-closed{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31321) Yarn-session mode, securityConfiguration supports dynamic configuration
[ https://issues.apache.org/jira/browse/FLINK-31321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-31321. Fix Version/s: 1.17.0 Resolution: Fixed - master (1.18): 6f9bca971f69525f9be7779121706bfb5756089d - release-1.17: 17ab9fda0bcd2687275ea8af2215b939d644cb07 > Yarn-session mode, securityConfiguration supports dynamic configuration > --- > > Key: FLINK-31321 > URL: https://issues.apache.org/jira/browse/FLINK-31321 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.17.0 >Reporter: felixzh >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > When different tenants submit jobs using the same {_}flink-conf.yaml{_}, the > same user is displayed on the Yarn page. > _SecurityConfiguration_ does not support dynamic configuration. Therefore, > the user displayed on the Yarn page is the > _security.kerberos.login.principal_ from the {_}flink-conf.yaml{_}. > FLINK-29435 only fixed the CliFrontend class (corresponding to the flink script). > The FlinkYarnSessionCli class (corresponding to the yarn-session.sh script) still > has this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31321) Yarn-session mode, securityConfiguration supports dynamic configuration
[ https://issues.apache.org/jira/browse/FLINK-31321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-31321: Assignee: felixzh > Yarn-session mode, securityConfiguration supports dynamic configuration > --- > > Key: FLINK-31321 > URL: https://issues.apache.org/jira/browse/FLINK-31321 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.17.0 >Reporter: felixzh >Assignee: felixzh >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > When different tenants submit jobs using the same {_}flink-conf.yaml{_}, the > same user is displayed on the Yarn page. > _SecurityConfiguration_ does not support dynamic configuration. Therefore, > the user displayed on the Yarn page is the > _security.kerberos.login.principal_ from the {_}flink-conf.yaml{_}. > FLINK-29435 only fixed the CliFrontend class (corresponding to the flink script). > The FlinkYarnSessionCli class (corresponding to the yarn-session.sh script) still > has this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31253) Port itcases to Flink 1.15 and 1.14
[ https://issues.apache.org/jira/browse/FLINK-31253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31253. Resolution: Fixed master: 7663615366a16d3481c67aa093a5f95973ae552e > Port itcases to Flink 1.15 and 1.14 > --- > > Key: FLINK-31253 > URL: https://issues.apache.org/jira/browse/FLINK-31253 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Chao Tian >Priority: Major > Labels: pull-request-available, starter > Fix For: table-store-0.4.0 > > > At present, only common has tests. We need to copy part of the itcases to 1.14 > and 1.15 to ensure they work normally. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] xintongsong closed pull request #22119: [FLINK-31321][Deployment/YARN] Yarn-session mode, securityConfiguration supports dynamic configuration
xintongsong closed pull request #22119: [FLINK-31321][Deployment/YARN] Yarn-session mode, securityConfiguration supports dynamic configuration URL: https://github.com/apache/flink/pull/22119 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context
flinkbot commented on PR #22197: URL: https://github.com/apache/flink/pull/22197#issuecomment-1472992082 ## CI report: * cf28dc6603d3041ec88c7e8f137324201916032f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31487) Add targetColumns to DynamicTableSink#Context
[ https://issues.apache.org/jira/browse/FLINK-31487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31487: --- Labels: pull-request-available (was: ) > Add targetColumns to DynamicTableSink#Context > - > > Key: FLINK-31487 > URL: https://issues.apache.org/jira/browse/FLINK-31487 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null > overwrite problem of partial-insert > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lincoln-lil closed pull request #22041: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context
lincoln-lil closed pull request #22041: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context URL: https://github.com/apache/flink/pull/22041 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lincoln-lil opened a new pull request, #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context
lincoln-lil opened a new pull request, #22197: URL: https://github.com/apache/flink/pull/22197 ## What is the purpose of the change The internal SinkRuntimeProviderContext will support a new constructor with a targetColumns param, which can be used by connectors to recognize the user-specified column list. Note: nested columns in the column list of an insert/update statement are currently unsupported (as described in [FLINK-31301](https://issues.apache.org/jira/browse/FLINK-31301) & [FLINK-31344](https://issues.apache.org/jira/browse/FLINK-31344)), so this pr supports simple columns first; nested columns can follow after those two issues have been fixed. ## Brief change log * add a new getTargetColumns method to DynamicTableSink#Context * add targetColumns info to related relnodes and execnodes * add related tests ## Verifying this change * added partial-insert related cases, including unit and integration tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with @Public(Evolving): (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (java-docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
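The null-overwrite problem this PR addresses can be sketched in plain Python. This is an illustrative model, not the actual DynamicTableSink API: when a statement like `INSERT INTO t (a, c) ...` omits column `b`, the planner pads `b` with NULL, and a sink that does not know the target column list would clobber the stored value; with the target columns available, the sink can update only the named columns.

```python
def apply_partial_insert(existing_row, new_row, target_columns):
    """Sketch of why a sink needs the statement's target column list:
    only the columns named in the INSERT are overwritten, while the
    NULL padding the planner generates for unnamed columns is ignored."""
    merged = dict(existing_row)
    for col in target_columns:
        merged[col] = new_row[col]
    return merged

existing = {"a": 1, "b": "keep-me", "c": 3.0}
incoming = {"a": 10, "b": None, "c": 30.0}  # b padded with NULL by the planner
print(apply_partial_insert(existing, incoming, ["a", "c"]))
# {'a': 10, 'b': 'keep-me', 'c': 30.0}
```

Without `["a", "c"]`, the sink would have no way to distinguish "user wrote NULL into b" from "user did not mention b", which is exactly the information `getTargetColumns` exposes.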
[jira] [Commented] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701452#comment-17701452 ] Zhipeng Zhang commented on FLINK-31486: --- Is this similar to this one [1]? [1] https://issues.apache.org/jira/browse/FLINK-31255 > Using KeySelector in IterationBody causes ClassNotFoundException > > > Key: FLINK-31486 > URL: https://issues.apache.org/jira/browse/FLINK-31486 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Reporter: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > When we use CoGroup along with KeySelector in an IterationBody, the following > exception occurs. > {code:java} > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Could not instantiate state partitioner. at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662) > at > org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146) > at > org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at > java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.ClassCastException: cannot assign instance of > java.lang.invoke.SerializedLambda to field > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 > of type org.apache.flink.api.java.functions.KeySelector in instance of > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302) > at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at > 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659) > ... 17 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-22484) Built-in functions for collections
[ https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701423#comment-17701423 ] Sergey Nuyanzin commented on FLINK-22484: - {{MAP_KEYS}}, {{MAP_VALUES}}, {{MAP_FROM_ARRAYS}} merged at [e42d7089b24f4e93e980ce724faf00341660bcd6|https://github.com/apache/flink/commit/e42d7089b24f4e93e980ce724faf00341660bcd6] > Built-in functions for collections > -- > > Key: FLINK-22484 > URL: https://issues.apache.org/jira/browse/FLINK-22484 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > There is a number of built-in functions to work with collections are > supported by other vendors. After looking at Postgresql, BigQuery, Spark > there was selected a list of more or less generic functions for collections > (for more details see [1]). > Feedback for the doc is welcome > [1] > [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing] > MAP_KEYS > MAP_VALUES > MAP_FROM_ARRAYS -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-22484) Built-in functions for collections
[ https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-22484. - Fix Version/s: 1.18.0 Resolution: Fixed > Built-in functions for collections > -- > > Key: FLINK-22484 > URL: https://issues.apache.org/jira/browse/FLINK-22484 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > There is a number of built-in functions to work with collections are > supported by other vendors. After looking at Postgresql, BigQuery, Spark > there was selected a list of more or less generic functions for collections > (for more details see [1]). > Feedback for the doc is welcome > [1] > [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing] > MAP_KEYS > MAP_VALUES > MAP_FROM_ARRAYS -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-22484) Built-in functions for collections
[ https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-22484: --- Assignee: Sergey Nuyanzin > Built-in functions for collections > -- > > Key: FLINK-22484 > URL: https://issues.apache.org/jira/browse/FLINK-22484 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > There is a number of built-in functions to work with collections are > supported by other vendors. After looking at Postgresql, BigQuery, Spark > there was selected a list of more or less generic functions for collections > (for more details see [1]). > Feedback for the doc is welcome > [1] > [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing] > MAP_KEYS > MAP_VALUES > MAP_FROM_ARRAYS -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30896) Reduce usage of CatalogViewImpl in planner
[ https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701422#comment-17701422 ] Sergey Nuyanzin commented on FLINK-30896: - Merged as [81f77a9d0e7f49a1786eb21e0adb1290306b89c5|https://github.com/apache/flink/commit/81f77a9d0e7f49a1786eb21e0adb1290306b89c5] > Reduce usage of CatalogViewImpl in planner > -- > > Key: FLINK-30896 > URL: https://issues.apache.org/jira/browse/FLINK-30896 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Most of the work was done under > https://issues.apache.org/jira/browse/FLINK-21801 > > However there are still some usages of {{CatalogViewImpl}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30896) Reduce usage of CatalogViewImpl in planner
[ https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-30896. - Resolution: Fixed > Reduce usage of CatalogViewImpl in planner > -- > > Key: FLINK-30896 > URL: https://issues.apache.org/jira/browse/FLINK-30896 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Most of the work was done under > https://issues.apache.org/jira/browse/FLINK-21801 > > However there are still some usages of {{CatalogViewImpl}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30896) Reduce usage of CatalogViewImpl in planner
[ https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-30896: Fix Version/s: 1.18.0 > Reduce usage of CatalogViewImpl in planner > -- > > Key: FLINK-30896 > URL: https://issues.apache.org/jira/browse/FLINK-30896 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Most of the work was done under > https://issues.apache.org/jira/browse/FLINK-21801 > > However there are still some usages of {{CatalogViewImpl}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-30896) Reduce usage of CatalogViewImpl in planner
[ https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-30896: --- Assignee: Sergey Nuyanzin > Reduce usage of CatalogViewImpl in planner > -- > > Key: FLINK-30896 > URL: https://issues.apache.org/jira/browse/FLINK-30896 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Most of the work was done under > https://issues.apache.org/jira/browse/FLINK-21801 > > However there are still some usages of {{CatalogViewImpl}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] snuyanzin merged pull request #21847: [FLINK-30896][table] Reduce usage of CatalogViewImpl in table-planner
snuyanzin merged PR #21847: URL: https://github.com/apache/flink/pull/21847 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin merged pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…
snuyanzin merged PR #15797: URL: https://github.com/apache/flink/pull/15797 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
snuyanzin commented on code in PR #22144: URL: https://github.com/apache/flink/pull/22144#discussion_r1139402152 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ## @@ -1359,6 +1360,16 @@ public OutType arrayDistinct() { return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, toExpr())); } +/** + * Remove all elements that equal to element from array. Review Comment: ```suggestion * Removes all elements that equal to element from array. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
snuyanzin commented on code in PR #22144: URL: https://github.com/apache/flink/pull/22144#discussion_r1139401943 ## flink-python/pyflink/table/expression.py: ## @@ -1487,6 +1487,13 @@ def array_distinct(self) -> 'Expression': """ return _binary_op("arrayDistinct")(self) +def array_remove(self, needle) -> 'Expression': +""" +Remove all elements that equal to element from array. Review Comment: ```suggestion Removes all elements that equal to element from array. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
snuyanzin commented on code in PR #22144: URL: https://github.com/apache/flink/pull/22144#discussion_r1139401795 ## docs/data/sql_functions.yml: ## @@ -617,6 +617,9 @@ collection: - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_REMOVE(haystack, needle) +table: haystack.arrayRemove(needle) +description: Remove all elements that equal to element from array. If the array itself is null, the function will return null. Keeps ordering of elements. Review Comment: Sorry, looks like missed it ```suggestion description: Removes all elements that equal to element from array. If the array itself is null, the function will return null. Keeps ordering of elements. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-31493) helm upgrade does not work, because repo path does not follow helm standards
Emmanuel Leroy created FLINK-31493: -- Summary: helm upgrade does not work, because repo path does not follow helm standards Key: FLINK-31493 URL: https://issues.apache.org/jira/browse/FLINK-31493 Project: Flink Issue Type: Bug Components: Kubernetes Operator Reporter: Emmanuel Leroy The helm repo for the flink-operator is a folder whose path includes the version, which does not follow helm chart repo standards. In a standard helm repo, the repo URL is the name of the product (without a version), and the folder's index lists the different versions of the chart. This is an issue because the repo itself needs to be re-added every time the version is upgraded, instead of adding the repo once and then upgrading the chart version. When attempting to add the latest repo, helm complains that the repo already exists; it is necessary to first remove the repo and then add the updated one. Upgrading the chart then fails, because helm expects the chart of the previous version to be in the same repo, but it cannot be found in the newly added repo. So the chart needs to be uninstalled, then the new one installed. The solution is to use a common path for all versions of the chart and maintain a single manifest listing the various versions (instead of different folders with different manifests). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
[ https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-31363: --- Assignee: Tzu-Li (Gordon) Tai > KafkaSink failed to commit transactions under EXACTLY_ONCE semantics > > > Key: FLINK-31363 > URL: https://issues.apache.org/jira/browse/FLINK-31363 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0, 1.16.1, 1.18.0 >Reporter: lightzhao >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: pull-request-available > Attachments: image-2023-03-08-10-54-51-410.png > > > When KafkaSink starts Exactly once and no data is written to the topic during > a checkpoint, the transaction commit exception is triggered, with the > following exception. > [Transiting to fatal error state due to > org.apache.kafka.common.errors.InvalidTxnStateException: The producer > attempted a transactional operation in an invalid state.] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
[ https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701390#comment-17701390 ] Tzu-Li (Gordon) Tai commented on FLINK-31363: - [~lightzhao] I'm preparing a fix for this. I think the correct fix is to *not* add a txn's metadata into the checkpoint if there were no data written to the transaction. i.e. the checkpoints will only reflect metadata of txns that actually have data that need to be committed. This way, creation of recovery producers can always safely set the {{transactionStarted}} flag to {{true}} > KafkaSink failed to commit transactions under EXACTLY_ONCE semantics > > > Key: FLINK-31363 > URL: https://issues.apache.org/jira/browse/FLINK-31363 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0, 1.16.1, 1.18.0 >Reporter: lightzhao >Priority: Major > Labels: pull-request-available > Attachments: image-2023-03-08-10-54-51-410.png > > > When KafkaSink starts Exactly once and no data is written to the topic during > a checkpoint, the transaction commit exception is triggered, with the > following exception. > [Transiting to fatal error state due to > org.apache.kafka.common.errors.InvalidTxnStateException: The producer > attempted a transactional operation in an invalid state.] -- This message was sent by Atlassian Jira (v8.20.10#820010)
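The fix proposed in the comment above can be sketched in plain Python. This is an illustrative model of the decision logic, not the KafkaSink code; the dict fields are hypothetical stand-ins for the writer's transaction state.

```python
def snapshot_transactions(open_transactions):
    """Sketch of the proposed fix: include a transaction's metadata in
    the checkpoint only if records were actually written to it. Recovery
    producers built from such checkpoints can then always safely assume
    the transaction was started, avoiding the InvalidTxnStateException
    raised when committing a transaction that never saw any data."""
    return [txn for txn in open_transactions if txn["records_written"] > 0]

txns = [
    {"id": "txn-1", "records_written": 42},
    {"id": "txn-2", "records_written": 0},  # empty txn: committing it after
                                            # recovery is the failure mode above
]
print(snapshot_transactions(txns))  # only txn-1 is checkpointed
```

The key property is that the checkpoint never references a transaction with nothing to commit, so the "no data during a checkpoint" scenario from the issue description produces an empty commit list rather than an invalid commit attempt.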
[jira] [Updated] (FLINK-31451) Flink Table Store Ecosystem: Introduce Presto Reader for table store
[ https://issues.apache.org/jira/browse/FLINK-31451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31451: --- Labels: pull-request-available (was: ) > Flink Table Store Ecosystem: Introduce Presto Reader for table store > > > Key: FLINK-31451 > URL: https://issues.apache.org/jira/browse/FLINK-31451 > Project: Flink > Issue Type: New Feature > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Zaihu Shi >Assignee: Zaihu Shi >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Introduce Presto Reader for table store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31319) Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
[ https://issues.apache.org/jira/browse/FLINK-31319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701344#comment-17701344 ] Raman Verma commented on FLINK-31319: - [~lemonjing] Can you please link all the backport PRs (1.16 and 1.17) here as well for easier tracking? > Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not > quit > - > > Key: FLINK-31319 > URL: https://issues.apache.org/jira/browse/FLINK-31319 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0, 1.16.1 >Reporter: Ran Tao >Assignee: Ran Tao >Priority: Critical > Labels: pull-request-available > Attachments: image-2023-03-04-01-37-29-360.png, > image-2023-03-04-01-39-20-352.png, image-2023-03-04-01-40-44-124.png, > image-2023-03-04-01-41-55-664.png > > > As the Kafka option description states, partitionDiscoveryIntervalMs <= 0 means > discovery is disabled. > !image-2023-03-04-01-37-29-360.png|width=781,height=147! > as seen when starting the Kafka enumerator: > !image-2023-03-04-01-39-20-352.png|width=465,height=311! > but the inner > handlePartitionSplitChanges uses the wrong if condition (< 0): > !image-2023-03-04-01-40-44-124.png|width=576,height=237! > > Because of this, noMoreNewPartitionSplits can never be set to true. > !image-2023-03-04-01-41-55-664.png|width=522,height=610! > As a result the bounded source never calls signalNoMoreSplits, so it does not quit. > Besides, both branches of the if condition should be mutually exclusive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
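The boundary-condition bug described in the issue above can be reduced to a few lines. This is a hypothetical reduction for illustration, not the actual KafkaSourceEnumerator code, and the helper names are made up:

```java
// Hypothetical reduction of FLINK-31319 (not the actual KafkaSourceEnumerator code).
// Startup treats partitionDiscoveryIntervalMs <= 0 as "periodic discovery disabled",
// but handlePartitionSplitChanges checked < 0, so an interval of exactly 0 never set
// noMoreNewPartitionSplits and a bounded source never received signalNoMoreSplits.
class DiscoveryConditionSketch {
    // Buggy predicate: misses the intervalMs == 0 case.
    static boolean noMoreSplitsBuggy(long partitionDiscoveryIntervalMs) {
        return partitionDiscoveryIntervalMs < 0;
    }

    // Fixed predicate: consistent with the startup check. <= 0 means discovery is
    // disabled, so the one-time discovery is final and no further splits can arrive.
    static boolean noMoreSplitsFixed(long partitionDiscoveryIntervalMs) {
        return partitionDiscoveryIntervalMs <= 0;
    }
}
```

The two predicates differ only at exactly 0, which is the case that left the bounded source hanging.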
[GitHub] [flink] RamanVerma commented on pull request #22193: [BP-1.17][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
RamanVerma commented on PR #22193: URL: https://github.com/apache/flink/pull/22193#issuecomment-1472611409 @chucheng92 please see my comments in the original PR https://github.com/apache/flink-connector-kafka/pull/8 before merging this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RamanVerma commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit
RamanVerma commented on PR #22192: URL: https://github.com/apache/flink/pull/22192#issuecomment-1472610502 @chucheng92 please see my comments in the original PR https://github.com/apache/flink-connector-kafka/pull/8 before merging this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not qui
RamanVerma commented on code in PR #8: URL: https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139238012

## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
## @@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable {
 }
 }
+@Test
+public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable {
+    try (MockSplitEnumeratorContext context =
+                    new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+            KafkaSourceEnumerator enumerator =
+                    createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+        // Start the enumerator and it should schedule a one time task to discover and assign
+        // partitions.
+        enumerator.start();
+
+        // Run the partition discover callable and check the partition assignment.
+        runOneTimePartitionDiscovery(context);
+
+        // enumerator noMoreNewPartitionSplits first will be false, when execute
+        // handlePartitionSplitChanges will be set true
+        assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                .isTrue();
+    }
+}
+
+@Test
+public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable {
+    try (MockSplitEnumeratorContext context =
+                    new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+            KafkaSourceEnumerator enumerator =
+                    createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+        // Start the enumerator and it should schedule a one time task to discover and assign
+        // partitions.
+        enumerator.start();
+        assertThat(context.getOneTimeCallables()).isEmpty();
+        assertThat(context.getPeriodicCallables())
+                .as("A periodic partition discovery callable should have been scheduled")
+                .hasSize(1);
+
+        // enumerator noMoreNewPartitionSplits first will be false, even when execute

Review Comment:
   I think you haven't done partition discovery in this test. You need to call `runPeriodicPartitionDiscovery` at least once before the check on line 215.

## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
## @@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable {
 }
 }
+@Test
+public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable {
+    try (MockSplitEnumeratorContext context =
+                    new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+            KafkaSourceEnumerator enumerator =
+                    createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+        // Start the enumerator and it should schedule a one time task to discover and assign
+        // partitions.
+        enumerator.start();
+
+        // Run the partition discover callable and check the partition assignment.
+        runOneTimePartitionDiscovery(context);
+
+        // enumerator noMoreNewPartitionSplits first will be false, when execute
+        // handlePartitionSplitChanges will be set true
+        assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                .isTrue();
+    }
+}
+
+@Test
+public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable {
+    try (MockSplitEnumeratorContext context =
+                    new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+            KafkaSourceEnumerator enumerator =
+                    createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+        // Start the enumerator and it should schedule a one time task to discover and assign
+        // partitions.
+        enumerator.start();
+        assertThat(context.getOneTimeCallables()).isEmpty();
+        assertThat(context.getPeriodicCallables())
+                .as("A periodic partition discovery callable should have been scheduled")
+                .hasSize(1);
+
+        // enumerator noMoreNewPartitionSplits first will be false, even when execute
+        // handlePartitionSplitChanges it still be false
+        assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                .isFalse();
+    }
+}
+
+@Test
+public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable {
+    try (MockSplitEnumeratorContext context =
+                    new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+
[jira] [Updated] (FLINK-31491) Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in branch release1.16
[ https://issues.apache.org/jira/browse/FLINK-31491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-31491: --- Affects Version/s: 1.16.1 > Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in > branch release1.16 > --- > > Key: FLINK-31491 > URL: https://issues.apache.org/jira/browse/FLINK-31491 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: Ran Tao >Priority: Major > > branch release1.16 NestedLoopJoinTest.testLeftOuterJoinWithFilter failed. > Mar 16 12:30:00 [ERROR] Failures: > Mar 16 12:30:00 [ERROR] NestedLoopJoinTest.testLeftOuterJoinWithFilter1:37 > optimized exec plan expected:<...[InnerJoin], where=[[true], select=[a, e, > f], build=[left]) > Mar 16 12:30:00 :- Exchange(distribution=[broadcast]) > Mar 16 12:30:00 : +- Calc(select=[a], where=[(a = 10)]) > Mar 16 12:30:00 : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > Mar 16 12:30:00 +- Calc(select=[e, f], where=[(d = 10])]) > Mar 16 12:30:00 +- LegacyT...> but was:<...[InnerJoin], where=[[(a = > d)], select=[a, d, e, f], build=[left]) > Mar 16 12:30:00 :- Exchange(distribution=[broadcast]) > Mar 16 12:30:00 : +- Calc(select=[a], where=[SEARCH(a, Sarg[10])]) > Mar 16 12:30:00 : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > Mar 16 12:30:00 +- Calc(select=[d, e, f], where=[SEARCH(d, Sarg[10]])]) > Mar 16 12:30:00 +- LegacyT...> > at > org.apache.flink.table.planner.utils.DiffRepository.assertEquals(DiffRepository.java:438) > at > org.apache.flink.table.planner.utils.TableTestUtilBase.assertEqualsOrExpand(TableTestBase.scala:1075) > at > org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:1008) > at > 
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:849) > at > org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:636) > at > org.apache.flink.table.planner.plan.batch.sql.join.NestedLoopJoinTest.testLeftOuterJoinWithFilter1(NestedLoopJoinTest.scala:37) > > azure ci: > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47244=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4] > It can also be reproduced locally. I think the commit that caused this may be: > f0361c720cb18c4ae7dc669c6a5da5b09bc8f563 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31491) Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in branch release1.16
[ https://issues.apache.org/jira/browse/FLINK-31491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-31491. -- Resolution: Duplicate > Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in > branch release1.16 > --- > > Key: FLINK-31491 > URL: https://issues.apache.org/jira/browse/FLINK-31491 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Ran Tao >Priority: Major > > branch release1.16 NestedLoopJoinTest.testLeftOuterJoinWithFilter failed. > Mar 16 12:30:00 [ERROR] Failures: > Mar 16 12:30:00 [ERROR] NestedLoopJoinTest.testLeftOuterJoinWithFilter1:37 > optimized exec plan expected:<...[InnerJoin], where=[[true], select=[a, e, > f], build=[left]) > Mar 16 12:30:00 :- Exchange(distribution=[broadcast]) > Mar 16 12:30:00 : +- Calc(select=[a], where=[(a = 10)]) > Mar 16 12:30:00 : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > Mar 16 12:30:00 +- Calc(select=[e, f], where=[(d = 10])]) > Mar 16 12:30:00 +- LegacyT...> but was:<...[InnerJoin], where=[[(a = > d)], select=[a, d, e, f], build=[left]) > Mar 16 12:30:00 :- Exchange(distribution=[broadcast]) > Mar 16 12:30:00 : +- Calc(select=[a], where=[SEARCH(a, Sarg[10])]) > Mar 16 12:30:00 : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > Mar 16 12:30:00 +- Calc(select=[d, e, f], where=[SEARCH(d, Sarg[10]])]) > Mar 16 12:30:00 +- LegacyT...> > at > org.apache.flink.table.planner.utils.DiffRepository.assertEquals(DiffRepository.java:438) > at > org.apache.flink.table.planner.utils.TableTestUtilBase.assertEqualsOrExpand(TableTestBase.scala:1075) > at > org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:1008) > at > org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:849) 
> at > org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:636) > at > org.apache.flink.table.planner.plan.batch.sql.join.NestedLoopJoinTest.testLeftOuterJoinWithFilter1(NestedLoopJoinTest.scala:37) > > azure ci: > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47244=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4] > It can also be reproduced locally. I think the commit that caused this may be: > f0361c720cb18c4ae7dc669c6a5da5b09bc8f563 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31492) AWS Firehose Connector misclassifies IAM permission exceptions as retryable
[ https://issues.apache.org/jira/browse/FLINK-31492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701297#comment-17701297 ] Danny Cranmer commented on FLINK-31492: --- Thanks for reporting this [~samuelsiebenmann] . I have assigned the Jira to you. > AWS Firehose Connector misclassifies IAM permission exceptions as retryable > --- > > Key: FLINK-31492 > URL: https://issues.apache.org/jira/browse/FLINK-31492 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS, Connectors / Firehose >Affects Versions: aws-connector-4.1.0 >Reporter: Samuel Siebenmann >Priority: Major > > The AWS Firehose connector uses an exception classification mechanism to > decide if errors writing requests to AWS Firehose are fatal (i.e. > non-retryable) or not (i.e. retryable). > {code:java} > private boolean isRetryable(Throwable err) { > if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, > getFatalExceptionCons())) { > return false; > } > if (failOnError) { > getFatalExceptionCons() > .accept(new > KinesisFirehoseException.KinesisFirehoseFailFastException(err)); > return false; > } > return true; > } {code} > ([github|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L252]) > This exception classification mechanism compares an exception's actual type > with known, fatal exception types (by using Flink's > [FatalExceptionClassifier.withExceptionClassifier|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java#L60]). > An exception is considered fatal if it is assignable to a given known fatal > exception > ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L479]). 
> The AWS Firehose SDK throws fatal IAM permission exceptions as > [FirehoseException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/FirehoseException.html]s, > e.g. > {code:java} > software.amazon.awssdk.services.firehose.model.FirehoseException: User: > arn:aws:sts:::assumed-role/example-role/kiam-kiam is not > authorized to perform: firehose:PutRecordBatch on resource: > arn:aws:firehose:us-east-1::deliverystream/example-stream because > no identity-based policy allows the firehose:PutRecordBatch action{code} > At the same time, certain subtypes of FirehoseException are retryable and > non-fatal > (e.g.[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/LimitExceededException.html]). > The AWS Firehose connector currently wrongly classifies the fatal IAM > permission exception as non-fatal. However, the current exception > classification mechanism does not easily handle a case where a super-type > should be considered fatal, but its child type shouldn't. > To address this issue, AWS services and the AWS SDK use error codes (see e.g. > [Firehose's error > codes|https://docs.aws.amazon.com/firehose/latest/APIReference/CommonErrors.html] > or [S3's error > codes|https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList], > see API docs > [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsErrorDetails.html#errorCode()] > and > [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsServiceException.html#awsErrorDetails()]) > to uniquely identify error conditions and to be used to handle errors by > type. 
> The AWS Firehose connector (and other AWS connectors) currently log to debug > when retrying fully failed records > ([code|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L213]). > This makes it difficult for users to root cause the above issue without > enabling debug logs. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
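The error-code approach described in the issue above could look roughly like the sketch below. It is self-contained for illustration: the error code is modeled as a plain String rather than read from the SDK's `AwsServiceException#awsErrorDetails().errorCode()`, and the set of fatal codes shown is an assumption, not a verified list of Firehose error codes:

```java
import java.util.Set;

// Hedged sketch: classify AWS errors by documented error code instead of by
// exception type. The fatal set below is illustrative only; "AccessDeniedException"
// is the common-errors code AWS documents for IAM permission failures.
class ErrorCodeClassifierSketch {
    private static final Set<String> FATAL_ERROR_CODES =
            Set.of("AccessDeniedException", "ResourceNotFoundException");

    // Fatal codes are never retried; everything else, including throttling-style
    // errors such as "LimitExceededException", stays retryable. Subtype relations
    // between exception classes no longer matter.
    static boolean isRetryable(String awsErrorCode) {
        return !FATAL_ERROR_CODES.contains(awsErrorCode);
    }
}
```

Because the decision keys on the error code string, a fatal FirehoseException and a retryable FirehoseException subtype are distinguished without any class hierarchy checks.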
[jira] [Assigned] (FLINK-31492) AWS Firehose Connector misclassifies IAM permission exceptions as retryable
[ https://issues.apache.org/jira/browse/FLINK-31492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-31492: - Assignee: Samuel Siebenmann > AWS Firehose Connector misclassifies IAM permission exceptions as retryable > --- > > Key: FLINK-31492 > URL: https://issues.apache.org/jira/browse/FLINK-31492 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS, Connectors / Firehose >Affects Versions: aws-connector-4.1.0 >Reporter: Samuel Siebenmann >Assignee: Samuel Siebenmann >Priority: Major > > The AWS Firehose connector uses an exception classification mechanism to > decide if errors writing requests to AWS Firehose are fatal (i.e. > non-retryable) or not (i.e. retryable). > {code:java} > private boolean isRetryable(Throwable err) { > if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, > getFatalExceptionCons())) { > return false; > } > if (failOnError) { > getFatalExceptionCons() > .accept(new > KinesisFirehoseException.KinesisFirehoseFailFastException(err)); > return false; > } > return true; > } {code} > ([github|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L252]) > This exception classification mechanism compares an exception's actual type > with known, fatal exception types (by using Flink's > [FatalExceptionClassifier.withExceptionClassifier|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java#L60]). > An exception is considered fatal if it is assignable to a given known fatal > exception > ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L479]). 
> The AWS Firehose SDK throws fatal IAM permission exceptions as > [FirehoseException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/FirehoseException.html]s, > e.g. > {code:java} > software.amazon.awssdk.services.firehose.model.FirehoseException: User: > arn:aws:sts:::assumed-role/example-role/kiam-kiam is not > authorized to perform: firehose:PutRecordBatch on resource: > arn:aws:firehose:us-east-1::deliverystream/example-stream because > no identity-based policy allows the firehose:PutRecordBatch action{code} > At the same time, certain subtypes of FirehoseException are retryable and > non-fatal > (e.g.[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/LimitExceededException.html]). > The AWS Firehose connector currently wrongly classifies the fatal IAM > permission exception as non-fatal. However, the current exception > classification mechanism does not easily handle a case where a super-type > should be considered fatal, but its child type shouldn't. > To address this issue, AWS services and the AWS SDK use error codes (see e.g. > [Firehose's error > codes|https://docs.aws.amazon.com/firehose/latest/APIReference/CommonErrors.html] > or [S3's error > codes|https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList], > see API docs > [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsErrorDetails.html#errorCode()] > and > [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsServiceException.html#awsErrorDetails()]) > to uniquely identify error conditions and to be used to handle errors by > type. 
> The AWS Firehose connector (and other AWS connectors) currently log to debug > when retrying fully failed records > ([code|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L213]). > This makes it difficult for users to root cause the above issue without > enabling debug logs. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31490) ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times out
[ https://issues.apache.org/jira/browse/FLINK-31490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701290#comment-17701290 ] Matthias Pohl commented on FLINK-31490: --- I extracted the test-related logs and attached it to the issue for further investigation. > ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times > out > > > Key: FLINK-31490 > URL: https://issues.apache.org/jira/browse/FLINK-31490 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > Attachments: FLINK-31490.mvn.log, FLINK-31490.zookeeper-client.log, > FLINK-31490.zookeeper-server.log > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47221=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9448 > {code} > Mar 16 02:00:54 "main" #1 prio=5 os_prio=0 tid=0x7f488800b800 nid=0x5a15 > waiting on condition [0x7f488fe14000] > Mar 16 02:00:54java.lang.Thread.State: WAITING (parking) > Mar 16 02:00:54 at sun.misc.Unsafe.park(Native Method) > Mar 16 02:00:54 - parking to wait for <0xe4065228> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > Mar 16 02:00:54 at > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > Mar 16 02:00:54 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > Mar 16 02:00:54 at > java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.TestingRetrievalBase.lambda$waitForNewLeader$0(TestingRetrievalBase.java:50) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.TestingRetrievalBase$$Lambda$1377/797057570.get(Unknown > Source) > Mar 16 02:00:54 at > org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:150) > Mar 16 02:00:54 at > 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:144) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.TestingRetrievalBase.waitForNewLeader(TestingRetrievalBase.java:48) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten(ZooKeeperLeaderElectionTest.java:479) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31490) ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times out
[ https://issues.apache.org/jira/browse/FLINK-31490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-31490: -- Attachment: FLINK-31490.mvn.log FLINK-31490.zookeeper-client.log FLINK-31490.zookeeper-server.log > ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times > out > > > Key: FLINK-31490 > URL: https://issues.apache.org/jira/browse/FLINK-31490 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > Attachments: FLINK-31490.mvn.log, FLINK-31490.zookeeper-client.log, > FLINK-31490.zookeeper-server.log > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47221=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9448 > {code} > Mar 16 02:00:54 "main" #1 prio=5 os_prio=0 tid=0x7f488800b800 nid=0x5a15 > waiting on condition [0x7f488fe14000] > Mar 16 02:00:54java.lang.Thread.State: WAITING (parking) > Mar 16 02:00:54 at sun.misc.Unsafe.park(Native Method) > Mar 16 02:00:54 - parking to wait for <0xe4065228> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > Mar 16 02:00:54 at > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > Mar 16 02:00:54 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > Mar 16 02:00:54 at > java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.TestingRetrievalBase.lambda$waitForNewLeader$0(TestingRetrievalBase.java:50) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.TestingRetrievalBase$$Lambda$1377/797057570.get(Unknown > Source) > Mar 16 02:00:54 at > org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:150) > Mar 16 02:00:54 at > 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:144) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.TestingRetrievalBase.waitForNewLeader(TestingRetrievalBase.java:48) > Mar 16 02:00:54 at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten(ZooKeeperLeaderElectionTest.java:479) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31492) AWS Firehose Connector misclassifies IAM permission exceptions as retryable
Samuel Siebenmann created FLINK-31492:
-----------------------------------------

Summary: AWS Firehose Connector misclassifies IAM permission exceptions as retryable
Key: FLINK-31492
URL: https://issues.apache.org/jira/browse/FLINK-31492
Project: Flink
Issue Type: Bug
Components: Connectors / AWS, Connectors / Firehose
Affects Versions: aws-connector-4.1.0
Reporter: Samuel Siebenmann

The AWS Firehose connector uses an exception classification mechanism to decide if errors writing requests to AWS Firehose are fatal (i.e. non-retryable) or not (i.e. retryable).

{code:java}
private boolean isRetryable(Throwable err) {
    if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
        return false;
    }
    if (failOnError) {
        getFatalExceptionCons()
                .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
        return false;
    }
    return true;
}
{code}

([github|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L252])

This exception classification mechanism compares an exception's actual type with known, fatal exception types (by using Flink's [FatalExceptionClassifier.withExceptionClassifier|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java#L60]). An exception is considered fatal if it is assignable to a given known fatal exception ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L479]).

The AWS Firehose SDK throws fatal IAM permission exceptions as [FirehoseException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/FirehoseException.html]s, e.g.
{code:java} software.amazon.awssdk.services.firehose.model.FirehoseException: User: arn:aws:sts:::assumed-role/example-role/kiam-kiam is not authorized to perform: firehose:PutRecordBatch on resource: arn:aws:firehose:us-east-1::deliverystream/example-stream because no identity-based policy allows the firehose:PutRecordBatch action{code} At the same time, certain subtypes of FirehoseException are retryable and non-fatal (e.g.[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/LimitExceededException.html]). The AWS Firehose connector currently wrongly classifies the fatal IAM permission exception as non-fatal. However, the current exception classification mechanism does not easily handle a case where a super-type should be considered fatal, but its child type shouldn't. To address this issue, AWS services and the AWS SDK use error codes (see e.g. [Firehose's error codes|https://docs.aws.amazon.com/firehose/latest/APIReference/CommonErrors.html] or [S3's error codes|https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList], see API docs [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsErrorDetails.html#errorCode()] and [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsServiceException.html#awsErrorDetails()]) to uniquely identify error conditions and to be used to handle errors by type. The AWS Firehose connector (and other AWS connectors) currently log to debug when retrying fully failed records ([code|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L213]). This makes it difficult for users to root cause the above issue without enabling debug logs. -- This message was sent by Atlassian Jira (v8.20.10#820010)