[jira] [Closed] (FLINK-30223) Refactor Lock to provide Lock.Factory
[ https://issues.apache.org/jira/browse/FLINK-30223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Caizhi Weng closed FLINK-30223.
-------------------------------
    Resolution: Fixed

master: 6886303b2f482f2f31c7b98221691a650c1e67d3

> Refactor Lock to provide Lock.Factory
> -------------------------------------
>
>                 Key: FLINK-30223
>                 URL: https://issues.apache.org/jira/browse/FLINK-30223
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table Store
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: table-store-0.3.0
>
> For the core, it should not see too many Flink Table concepts, such as
> database and tableName. It only needs to create a Lock.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [flink-table-store] tsreaper merged pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory
tsreaper merged PR #405:
URL: https://github.com/apache/flink-table-store/pull/405

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex
Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033205960

##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########

```
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {

     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }
```

Review Comment:
> I'm afraid that we cannot avoid the check on accessing state as users might provide a non-deterministic hashCode for the current key.

Hi @Myasuka, I checked the code in `StateTable`, and it seems that in most state-accessing cases (except queryable state) we check the key group from `keyContext.getCurrentKeyGroupIndex()` instead of calculating it from the `hashCode` of the partitioned key. So we are actually checking the same value, whether at `setCurrentKeyGroupIndex` or on state access, regardless of whether the hashCode implementation is deterministic.
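The check under discussion can be condensed into a small, self-contained sketch. The class and method names below are simplified stand-ins for Flink's `KeyGroupRange` / `InternalKeyContextImpl`, not the real API: the point is that a key-group index set on the key context must fall inside the range owned by this backend, and failing fast is preferable to silently writing state into a key group no snapshot will cover.

```java
// Simplified stand-in (not Flink's actual classes) for the sanity check
// discussed above: reject key-group indices outside the owned range.
class KeyGroupRangeSketch {
    final int start; // inclusive
    final int end;   // inclusive

    KeyGroupRangeSketch(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean contains(int keyGroup) {
        return keyGroup >= start && keyGroup <= end;
    }

    // Mirrors the intent of setCurrentKeyGroupIndex in the diff:
    // fail fast instead of corrupting state for an out-of-range key group.
    void setCurrentKeyGroupIndex(int keyGroupIndex) {
        if (!contains(keyGroupIndex)) {
            throw new IllegalArgumentException(
                    "Key group " + keyGroupIndex + " is not in [" + start + ", " + end + "]");
        }
    }
}
```

With this shape, the later state-access path can trust the cached index, which is the symmetry Zakelly points out: both sites validate the same value.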
[jira] [Assigned] (FLINK-30185) Provide the flame graph to the subtask level
[ https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xintong Song reassigned FLINK-30185:
------------------------------------
    Assignee: Rui Fan

> Provide the flame graph to the subtask level
> --------------------------------------------
>
>                 Key: FLINK-30185
>                 URL: https://issues.apache.org/jira/browse/FLINK-30185
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST, Runtime / Web Frontend
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>             Fix For: 1.17.0
>
>         Attachments: image-2022-11-24-14-49-42-845.png,
> image-2022-11-28-14-38-47-145.png, image-2022-11-28-14-48-20-462.png
>
> FLINK-13550 added support for CPU FlameGraphs in the web UI.
> As the Flink docs mention:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process
> {code:java}
> Note: Stack trace samples from all threads of an operator are combined
> together. If a method call consumes 100% of the resources in one of the
> parallel tasks but none in the others, the bottleneck might be obscured by
> being averaged out.
> There are plans to address this limitation in the future by providing “drill
> down” visualizations to the task level. {code}
>
> The flame graph at the subtask level is very useful when a small number of
> subtasks are bottlenecked, so we should provide the flame graph at the
> subtask level.
>
> !image-2022-11-24-14-49-42-845.png!
[jira] [Commented] (FLINK-30185) Provide the flame graph to the subtask level
[ https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639837#comment-17639837 ]

Xintong Song commented on FLINK-30185:
--------------------------------------
[~fanrui], Sounds good to me. You are assigned. Please move ahead.
[jira] [Updated] (FLINK-30206) Allow FileStoreScan to read incremental changes from OVERWRITE snapshot in Table Store
[ https://issues.apache.org/jira/browse/FLINK-30206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-30206:
-----------------------------------
    Labels: pull-request-available  (was: )

> Allow FileStoreScan to read incremental changes from OVERWRITE snapshot in
> Table Store
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30206
>                 URL: https://issues.apache.org/jira/browse/FLINK-30206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table Store
>    Affects Versions: table-store-0.3.0
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: table-store-0.3.0
>
> Currently {{AbstractFileStoreScan}} can only read incremental changes from
> APPEND snapshots. However in OVERWRITE snapshots, users will also append new
> records to the table. These changes must be discovered by the compact job
> source so that the overwritten partition can be compacted.
[GitHub] [flink-table-store] tsreaper opened a new pull request, #406: [FLINK-30206] Allow FileStoreScan to read incremental changes from OVERWRITE snapshot in Table Store
tsreaper opened a new pull request, #406:
URL: https://github.com/apache/flink-table-store/pull/406

   Currently `AbstractFileStoreScan` can only read incremental changes from APPEND snapshots. However in OVERWRITE snapshots, users will also append new records to the table. These changes must be discovered by the compact job source so that the overwritten partition can be compacted.
[GitHub] [flink] shuiqiangchen commented on pull request #20745: [FLINK-28988] Don't push filters down into the right table for temporal join
shuiqiangchen commented on PR #20745:
URL: https://github.com/apache/flink/pull/20745#issuecomment-1328639959

   @lincoln-lil Thanks for your detailed explanation in the online and offline discussions. I have simplified the implementation based on the consensus we reached.
[jira] [Commented] (FLINK-30221) Fix the bug of sum(try_cast(string as bigint)) return null when partial elements can't convert to bigint
[ https://issues.apache.org/jira/browse/FLINK-30221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639823#comment-17639823 ]

Tony Zhu commented on FLINK-30221:
----------------------------------
Could you assign this to me? I'd like to try the fix.

> Fix the bug of sum(try_cast(string as bigint)) return null when partial
> elements can't convert to bigint
> -----------------------------------------------------------------------
>
>                 Key: FLINK-30221
>                 URL: https://issues.apache.org/jira/browse/FLINK-30221
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API, Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: dalongliu
>            Priority: Major
[jira] [Commented] (FLINK-30158) [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes
[ https://issues.apache.org/jira/browse/FLINK-30158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639822#comment-17639822 ]

Tony Zhu commented on FLINK-30158:
----------------------------------
[~jamesmcguirepro] Could you provide more info? I'd like to take a look at the details.

> [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using
> repeated or map attributes
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30158
>                 URL: https://issues.apache.org/jira/browse/FLINK-30158
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table
> SQL / Ecosystem
>    Affects Versions: 1.16.0
>            Reporter: James Mcguire
>            Priority: Major
>
> I am encountering a {{java.lang.NullPointerException}} exception when trying
> to use Flink SQL to query a Kafka topic that uses either {{repeated}} and/or
> {{map}} attributes.
>
> *Replication steps*
> # Use a protobuf definition that uses repeated and/or map. This
> protobuf schema should cover a few of the problematic scenarios I ran into:
>
> {code:java}
> syntax = "proto3";
> package example.message;
> option java_package = "com.example.message";
> option java_multiple_files = true;
> message NestedType {
>   int64 nested_first = 1;
>   oneof nested_second {
>     int64 one_of_first = 2;
>     string one_of_second = 3;
>   }
> }
> message Test {
>   repeated int64 first = 1;
>   map second = 2;
> } {code}
> # Attempt a query on the topic, even excluding the problematic columns:
>
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.formats.protobuf.PbCodegenException:
> java.lang.NullPointerException{code}
>
> log file:
>
> {code:java}
> 2022-11-22 15:33:59,510 WARN  org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.
>     at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]
> Caused by: java.lang.RuntimeException: Failed to fetch next result
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
>     at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
> Caused by: java.io.IOException: Failed to fetch job execution result
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]
>     at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bc869097009a92d0601add881a6b920c)
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) ~[?:?]
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
>     at {code}
[jira] [Closed] (FLINK-29987) PartialUpdateITCase.testForeignKeyJoin is unstable
[ https://issues.apache.org/jira/browse/FLINK-29987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed FLINK-29987.
--------------------------------
    Resolution: Fixed

master: cd0870bab446ad8e91dab3ddd3b3b6e7ef71612f

> PartialUpdateITCase.testForeignKeyJoin is unstable
> --------------------------------------------------
>
>                 Key: FLINK-29987
>                 URL: https://issues.apache.org/jira/browse/FLINK-29987
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table Store
>            Reporter: Jingsong Lee
>            Assignee: Alex Sorokoumov
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: table-store-0.3.0
[GitHub] [flink-table-store] JingsongLi merged pull request #404: [FLINK-29987] Use Awaitility in PartialUpdateITCase#testForeignKeyJoin
JingsongLi merged PR #404:
URL: https://github.com/apache/flink-table-store/pull/404
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory
JingsongLi commented on code in PR #405:
URL: https://github.com/apache/flink-table-store/pull/405#discussion_r1033178800

##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java:
##########

```
@@ -31,13 +32,57 @@ public interface Lock extends AutoCloseable {

     /** Run with lock. */
     <T> T runWithLock(Callable<T> callable) throws Exception;

-    @Nullable
-    static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
-        if (lockFactory == null) {
-            return null;
+    /** A factory to create {@link Lock}. */
+    interface Factory extends Serializable {
+        Lock create();
+    }
+
+    static Factory factory(@Nullable CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+        return lockFactory == null
+                ? new EmptyFactory()
+                : new CatalogLockFactory(lockFactory, tablePath);
+    }
+
+    static Factory emptyFactory() {
+        return new EmptyFactory();
+    }
+
+    /** A {@link Factory} creating lock from catalog. */
+    class CatalogLockFactory implements Factory {
+
+        private static final long serialVersionUID = 1L;
+
+        private final CatalogLock.Factory lockFactory;
+        private final ObjectPath tablePath;
+
+        public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+            this.lockFactory = lockFactory;
+            this.tablePath = tablePath;
+        }
+
+        @Override
+        public Lock create() {
+            return fromCatalog(lockFactory.create(), tablePath);
         }
+    }

-        return fromCatalog(lockFactory.create(), tablePath);
+    /** A {@link Factory} creating empty lock. */
+    class EmptyFactory implements Factory {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Lock create() {
+            return new Lock() {
+                @Override
+                public <T> T runWithLock(Callable<T> callable) throws Exception {
+                    return callable.call();
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+    }
```

Review Comment:
I think we can return an object to avoid null checking. I will modify the others to remove null checking.
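The factory-plus-no-op-lock idea from the diff above can be condensed into a short standalone sketch. The names below follow the diff but this is a simplified stand-in, not the Table Store API: the core only needs a serializable way to create a `Lock`, and callers receive a no-op lock object instead of `null`, which is exactly the null-check removal discussed in the review comment.

```java
import java.io.Serializable;
import java.util.concurrent.Callable;

// Simplified sketch of the Lock.Factory pattern (not the real Table Store API).
interface SketchLock extends AutoCloseable {
    <T> T runWithLock(Callable<T> callable) throws Exception;

    /** Serializable so the lock can be created on the cluster side. */
    interface Factory extends Serializable {
        SketchLock create();
    }

    /** Factory for a lock that simply runs the action; used when no catalog lock exists. */
    static Factory emptyFactory() {
        return () -> new SketchLock() {
            @Override
            public <T> T runWithLock(Callable<T> callable) throws Exception {
                return callable.call(); // no real locking, just run the action
            }

            @Override
            public void close() {}
        };
    }
}
```

A caller can then unconditionally write `factory.create().runWithLock(action)` with no `null` branch, which is the design benefit the refactor is after.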
[jira] [Closed] (FLINK-30205) Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
[ https://issues.apache.org/jira/browse/FLINK-30205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed FLINK-30205.
--------------------------------
    Resolution: Fixed

master: e6e62699ef42a90d9eb2ca73b10460eb2764a586

> Modify compact interface for TableWrite and FileStoreWrite to support normal
> compaction in Table Store
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30205
>                 URL: https://issues.apache.org/jira/browse/FLINK-30205
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table Store
>    Affects Versions: table-store-0.3.0
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: table-store-0.3.0
>
> Currently the {{compact}} interface in {{TableWrite}} and {{FileStoreWrite}}
> can only trigger full compaction. However a separated compact job should not
> only perform full compaction, but also perform normal compaction once in a
> while, just like what the current Table Store sinks do.
> We need to modify the compact interface for TableWrite and FileStoreWrite to
> support normal compaction.
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
JingsongLi commented on code in PR #403:
URL: https://github.com/apache/flink-table-store/pull/403#discussion_r1033177161

##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java:
##########

```
@@ -34,10 +34,12 @@
     void write(T record) throws Exception;

     /**
-     * Compact all files related to the writer. Note that compaction process is only submitted and
-     * may not be completed when the method returns.
+     * Compact files related to the writer. Note that compaction process is only submitted and may
+     * not be completed when the method returns.
+     *
+     * @param fullCompaction whether to trigger full compaction or just normal compaction
```

Review Comment:
You are right
[GitHub] [flink-table-store] JingsongLi merged pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
JingsongLi merged PR #403:
URL: https://github.com/apache/flink-table-store/pull/403
[jira] [Commented] (FLINK-30185) Provide the flame graph to the subtask level
[ https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639816#comment-17639816 ]

Rui Fan commented on FLINK-30185:
---------------------------------
Hi [~xtsong], thanks for your reply. The improvement mainly includes 2 parts:
# How should the web frontend show the flame graph for a single subtask?
# How should the backend save or fetch the thread-info sample for a single subtask?

h2. Web Frontend

It's similar to Metrics: we need to add a select box that selects either all subtasks or one subtaskIndex, and pass the subtaskIndex to the backend.

!image-2022-11-28-14-48-20-462.png!

!image-2022-11-28-14-38-47-145.png|width=783,height=286!

h2. Backend

h3. 1. Refactor the cache logic

Currently, the cache key of ThreadInfo is jobId + jobVertexId. The cache key should be changed to jobId + jobVertexId + subtaskIndex.

h3. 2. Add the subtaskIndex

Allow requesting threadInfo from a single subtask.

If anything is wrong or missing, please let me know, thanks!
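The cache-key refactor proposed above can be sketched as a small value class. All names here are hypothetical illustrations, not Flink's actual REST-handler types: the essential point is that `equals`/`hashCode` must include the subtask index so that samples for different subtasks of the same vertex occupy distinct cache entries.

```java
import java.util.Objects;

// Hypothetical sketch of extending the thread-info cache key from
// (jobId, jobVertexId) to (jobId, jobVertexId, subtaskIndex).
final class ThreadInfoCacheKey {
    private final String jobId;
    private final String jobVertexId;
    private final int subtaskIndex; // a sentinel like -1 could mean "all subtasks"

    ThreadInfoCacheKey(String jobId, String jobVertexId, int subtaskIndex) {
        this.jobId = jobId;
        this.jobVertexId = jobVertexId;
        this.subtaskIndex = subtaskIndex;
    }

    // equals/hashCode include subtaskIndex; otherwise two subtasks of the
    // same vertex would collide in the cache.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ThreadInfoCacheKey)) {
            return false;
        }
        ThreadInfoCacheKey that = (ThreadInfoCacheKey) o;
        return subtaskIndex == that.subtaskIndex
                && jobId.equals(that.jobId)
                && jobVertexId.equals(that.jobVertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, jobVertexId, subtaskIndex);
    }
}
```

With such a key, the existing per-vertex cache data structure can stay unchanged; only the key construction sites need to thread through the subtask index.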
[jira] [Updated] (FLINK-30185) Provide the flame graph to the subtask level
[ https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Fan updated FLINK-30185:
----------------------------
    Attachment: image-2022-11-28-14-48-20-462.png
[jira] [Updated] (FLINK-30185) Provide the flame graph to the subtask level
[ https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Fan updated FLINK-30185:
----------------------------
    Attachment: image-2022-11-28-14-38-47-145.png
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex
rkhachatryan commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033165454

##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java:
##########

```
@@ -556,9 +559,27 @@ public void testScaleUp() throws Exception {
                         3, 2, operatorSubtaskState3)) {
-            testHarness1.processElement1(new StreamRecord<>("trigger"));
-            testHarness2.processElement1(new StreamRecord<>("trigger"));
-            testHarness3.processElement1(new StreamRecord<>("trigger"));
+
+            // Since there is a keyed operator, we should follow the key partition rules.
+            Map, KeyGroupRange>
+                    keyGroupPartition = new HashMap<>();
+            keyGroupPartition.put(testHarness1, KeyGroupRange.of(0, 3));
+            keyGroupPartition.put(testHarness2, KeyGroupRange.of(4, 6));
+            keyGroupPartition.put(testHarness3, KeyGroupRange.of(7, 9));
+            while (!keyGroupPartition.isEmpty()) {
+                String triggerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
+                for (Map.Entry<
+                                TwoInputStreamOperatorTestHarness,
+                                KeyGroupRange>
+                        entry : keyGroupPartition.entrySet()) {
+                    if (entry.getValue()
+                            .contains(KeyGroupRangeAssignment.assignToKeyGroup(triggerKey, 10))) {
+                        entry.getKey().processElement1(new StreamRecord<>(triggerKey));
+                        keyGroupPartition.remove(entry.getKey());
+                        break;
+                    }
+                }
+            }
```

Review Comment:
Thanks!
[jira] [Updated] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings
[ https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dalongliu updated FLINK-15635:
------------------------------
    Release Note:
TableEnvironment introduces a user class loader to have consistent class loading behavior in table programs, SQL Client and SQL Gateway. The user class loader manages all user jars, such as jars added by the `ADD JAR` or `CREATE FUNCTION .. USING JAR ..` statements. User-defined functions/connectors/catalogs should replace `Thread.currentThread().getContextClassLoader()` with the user class loader to load classes; otherwise, a ClassNotFoundException may be thrown. The user class loader can be accessed via `FunctionContext#getUserCodeClassLoader`, `DynamicTableFactory.Context#getClassLoader` and `CatalogFactory.Context#getClassLoader`. If you used the thread context class loader to load your user classes before 1.16, this is an incompatible behavior change after upgrading to 1.16 because of the table planner class loader: you should first use the `ADD JAR` syntax to add your custom jar to the planner class loader, and the framework will then load the class when needed, which simplifies class-loader-related work.

  was:
TableEnvironment introduces a user class loader to have a consistent class loading behavior in table programs, SQL Client and SQL Gateway. The user classloader manages all user jars such as jar added by `ADD JAR` or `CREATE FUNCTION .. USING JAR ..` statements. User-defined functions/connectors/catalogs should replace `Thread.currentThread().getContextClassLoader()` with the user class loader to load classes. Otherwise, ClassNotFoundException maybe thrown. The user class loader can be accessed via `FunctionContext#getUserCodeClassLoader`, `DynamicTableFactory.Context#getClassLoader` and `CatalogFactory.Context#getClassLoader`.

> Allow passing a ClassLoader to EnvironmentSettings
> --------------------------------------------------
>
>                 Key: FLINK-15635
>                 URL: https://issues.apache.org/jira/browse/FLINK-15635
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Timo Walther
>            Assignee: Francesco Guardiani
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0
>
> We had a couple of class loading issues in the past because people forgot to
> use the right classloader in {{flink-table}}. The SQL Client executor code
> hacks a classloader into the planner process by using {{wrapClassLoader}}
> that sets the thread's context classloader.
> Instead we should allow passing a class loader to environment settings. This
> class loader can be passed to the planner and can be stored in the table
> environment, table config, etc. to have consistent class loading behavior.
> Having this in place should replace the need for
> {{Thread.currentThread().getContextClassLoader()}} in the entire
> {{flink-table}} module.
[GitHub] [flink] Myasuka commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex
Myasuka commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033158877

##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########

```
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {

     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }
```

Review Comment:
I'm afraid that we cannot avoid the check on accessing state as users might provide a non-deterministic hashCode for the current key.
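The non-deterministic-hashCode concern can be made concrete with a short sketch. In Flink, the key group of a record is derived from the key's `hashCode()` (Flink additionally applies a murmur hash in `KeyGroupRangeAssignment`); the stand-in below uses the raw hash for simplicity. If `hashCode()` returns different values for the same logical key across calls, the same key can be routed to different key groups, which is exactly why the check cannot be dropped from the state-access path in general.

```java
// Simplified stand-in for key-group routing: Flink's real assignment is
// MathUtils.murmurHash(key.hashCode()) % maxParallelism; here we use the raw
// hashCode to illustrate that routing is a pure function of the key's hash.
class KeyGroupAssignmentSketch {
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }
}
```

A key whose `hashCode()` is deterministic always lands in the same key group, so validating the index once at `setCurrentKeyGroupIndex` and again on access checks the same value; a non-deterministic hash breaks that invariant.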
[jira] [Updated] (FLINK-14055) Add advanced function DDL syntax "USING JAR"
[ https://issues.apache.org/jira/browse/FLINK-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dalongliu updated FLINK-14055:
------------------------------
    Release Note:
In 1.16, we introduced the `CREATE FUNCTION ... USING JAR` syntax to support dynamic loading of a UDF jar per job, which makes it convenient for platform developers to achieve UDF management. In addition, we also ported the `ADD JAR` syntax from the SQL Client to the `TableEnvironment` side, which makes the syntax more general for Table API users. However, due to inconsistent class loaders in StreamExecutionEnvironment and TableEnvironment, the `ADD JAR` syntax is currently not available for Table API programs; this will be resolved by https://issues.apache.org/jira/browse/FLINK-29240. More information about this feature can be found in https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function.

  was:
In 1.16, we introduced the `CREATE FUNCTION ... USING JAR` syntax to support the dynamic loading of the UDF jar in per job, which is convenient for platform developers to easily achieve UDF management. In addition, we also port the `ADD JAR` syntax from SqlClient to `TableEnvironment` side, this allows the syntax is more general to Table API users. However, due to inconsistent classloader in StreamExecutionEnvironment and TableEnvironment, the `ADD JAR` syntax is not available for Table API program currently, it will be resolved by https://issues.apache.org/jira/browse/FLINK-29240. More information about this feature could be found in https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function.

> Add advanced function DDL syntax "USING JAR"
> --------------------------------------------
>
>                 Key: FLINK-14055
>                 URL: https://issues.apache.org/jira/browse/FLINK-14055
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>            Reporter: BL
>            Assignee: dalongliu
>            Priority: Major
>              Labels: auto-unassigned, sprint
>             Fix For: 1.16.0
>
> As
> [FLIP-214|https://cwiki.apache.org/confluence/display/FLINK/FLIP-214+Support+Advanced+Function+DDL]
> proposes, this ticket is to support dynamically loading functions from an
> external source in function DDL with advanced syntax like:
>
> {code:java}
> CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS]
> [catalog_name.db_name.]function_name AS class_name [LANGUAGE
> JAVA|SCALA|PYTHON] [USING JAR ‘resource_path’ [, JAR ‘resource_path’]*]; {code}
[jira] [Closed] (FLINK-30184) Save TM/JM thread stack periodically
[ https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan closed FLINK-30184. --- Resolution: Won't Do > Save TM/JM thread stack periodically > > > Key: FLINK-30184 > URL: https://issues.apache.org/jira/browse/FLINK-30184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Rui Fan >Priority: Major > Fix For: 1.17.0 > > > After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the > thread stack of TM/JM in Flink WebUI. > It can help flink users to find out why the Flink job is stuck, or why the > processing is slow. It is very useful for trouble shooting. > However, sometimes Flink tasks get stuck or process slowly, but when the user > troubleshoots the problem, the job has resumed. It is difficult to find out > what happened to the Flink job at the time and why is it slow? > > So, could we periodically save the thread stack of TM or JM in the TM log > directory? > Define some configurations: > cluster.thread-dump.interval=1min > cluster.thread-dump.cleanup-time=48 hours -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-30184) Save TM/JM thread stack periodically
[ https://issues.apache.org/jira/browse/FLINK-30184 ] Rui Fan deleted comment on FLINK-30184: - was (Author: fanrui): Hi [~xtsong] , please help take a look in your free time. And if it makes sense, please assign it to me, thanks~ > Save TM/JM thread stack periodically > > > Key: FLINK-30184 > URL: https://issues.apache.org/jira/browse/FLINK-30184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Rui Fan >Priority: Major > Fix For: 1.17.0 > > > After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the > thread stack of TM/JM in Flink WebUI. > It can help flink users to find out why the Flink job is stuck, or why the > processing is slow. It is very useful for trouble shooting. > However, sometimes Flink tasks get stuck or process slowly, but when the user > troubleshoots the problem, the job has resumed. It is difficult to find out > what happened to the Flink job at the time and why is it slow? > > So, could we periodically save the thread stack of TM or JM in the TM log > directory? > Define some configurations: > cluster.thread-dump.interval=1min > cluster.thread-dump.cleanup-time=48 hours -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30184) Save TM/JM thread stack periodically
[ https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639802#comment-17639802 ] Rui Fan commented on FLINK-30184: - Hi [~xtsong] , thanks for your explanation. It sounds reasonable, I will close this JIRA. > Save TM/JM thread stack periodically > > > Key: FLINK-30184 > URL: https://issues.apache.org/jira/browse/FLINK-30184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Rui Fan >Priority: Major > Fix For: 1.17.0 > > > After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the > thread stack of TM/JM in Flink WebUI. > It can help flink users to find out why the Flink job is stuck, or why the > processing is slow. It is very useful for trouble shooting. > However, sometimes Flink tasks get stuck or process slowly, but when the user > troubleshoots the problem, the job has resumed. It is difficult to find out > what happened to the Flink job at the time and why is it slow? > > So, could we periodically save the thread stack of TM or JM in the TM log > directory? > Define some configurations: > cluster.thread-dump.interval=1min > cluster.thread-dump.cleanup-time=48 hours -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] 1996fanrui commented on pull request #21398: [FLINK-30214][mini_cluster] MiniCluster adapt the jobvertex-parallelism-overrides
1996fanrui commented on PR #21398: URL: https://github.com/apache/flink/pull/21398#issuecomment-1328573385 Hi @gyfora @mxm As we know, when a Flink job runs in the IDE, it uses the MiniCluster, and `PerJobMiniClusterFactory#getMiniClusterConfig` generates `numSlotsPerTaskManager` from the JobGraph. However, that `numSlotsPerTaskManager` may be wrong after `jobvertex-parallelism-overrides` is applied, so it should be derived from both the JobGraph and `jobvertex-parallelism-overrides`. Please take a look at this PR in your free time, thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
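The idea behind the fix can be sketched as follows. This is a hypothetical illustration, not Flink's actual `PerJobMiniClusterFactory` code: the method and map names are invented, and the real implementation works on `JobVertex` objects rather than plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: when sizing the MiniCluster, apply the
// jobvertex-parallelism-overrides BEFORE taking the maximum vertex
// parallelism, otherwise the computed slots per TaskManager can be too small.
public class MiniClusterSlotsSketch {
    static int numSlotsPerTaskManager(
            Map<String, Integer> vertexParallelism, Map<String, Integer> overrides) {
        int max = 1;
        for (Map.Entry<String, Integer> e : vertexParallelism.entrySet()) {
            // An override, if present, replaces the parallelism from the JobGraph.
            int p = overrides.getOrDefault(e.getKey(), e.getValue());
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Integer> graph = new HashMap<>();
        graph.put("source", 2);
        graph.put("map", 4);
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("map", 8); // override raises parallelism beyond the JobGraph value
        // Without considering overrides the answer would be 4 -- too few slots.
        System.out.println(numSlotsPerTaskManager(graph, overrides)); // 8
    }
}
```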
[GitHub] [flink] SmirAlex commented on pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest
SmirAlex commented on PR #20919: URL: https://github.com/apache/flink/pull/20919#issuecomment-1328553285 Hi @XComp, thanks for the review! I replied to your comments and added a commit with the fix. Please have a look when you are available. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] SmirAlex commented on a diff in pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest
SmirAlex commented on code in PR #20919: URL: https://github.com/apache/flink/pull/20919#discussion_r1033131501 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java: ## @@ -107,7 +109,11 @@ protected void reloadCache() throws Exception { } catch (InterruptedException ignored) { // we use interrupt to close reload thread } finally { if (cacheLoadTaskService != null) { +// if main cache reload thread encountered an exception, +// it interrupts underlying InputSplitCacheLoadTasks threads cacheLoadTaskService.shutdownNow(); Review Comment: To be honest, during development I didn't think about relying on ForkJoinPool. As I understand it, it's most useful when there are many small tasks, which can themselves spawn other small subtasks. But in our case we have a fixed number ( = number of splits) of long-running tasks that won't create subtasks, so a simple fixed thread pool looked like a straightforward way to implement cache loading. Plus, if we used the common ForkJoinPool, our long-running tasks could occupy all of its threads and starve other tasks that rely on that pool, which is undesirable behavior, as I understand. Please correct me if I'm wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
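The design choice argued above (a dedicated fixed pool sized to the number of splits, instead of the common ForkJoinPool) can be sketched minimally. This is an assumption-laden illustration, not the actual `InputFormatCacheLoader` code; `loadAll` and the task shape are invented for the example.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: a fixed pool with one thread per input split runs the long-running
// load tasks in isolation, so they cannot starve unrelated work that shares
// the JVM-wide common ForkJoinPool.
public class SplitLoaderPoolSketch {
    public static int loadAll(List<Callable<Integer>> splitTasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(splitTasks.size());
        try {
            int total = 0;
            // invokeAll blocks until every split finishes (or fails).
            for (Future<Integer> f : pool.invokeAll(splitTasks)) {
                total += f.get(); // rethrows any task failure
            }
            return total;
        } finally {
            pool.shutdownNow(); // interrupts stragglers if a task failed
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = List.of(() -> 10, () -> 20, () -> 12);
        System.out.println(loadAll(tasks)); // 42
    }
}
```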
[GitHub] [flink] flinkbot commented on pull request #21406: [FLINK-30088][runtime] Fix excessive state updates for TtlMapState and TtlListState
flinkbot commented on PR #21406: URL: https://github.com/apache/flink/pull/21406#issuecomment-1328531809 ## CI report: * cf8b7c1de5e3dfbe83106a4de89423e17e36e50e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState
[ https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30088: --- Labels: pull-request-available (was: ) > Excessive state updates for TtlMapState and TtlListState > > > Key: FLINK-30088 > URL: https://issues.apache.org/jira/browse/FLINK-30088 > Project: Flink > Issue Type: Bug >Reporter: Roman Boyko >Assignee: Roman Boyko >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-11-18-20-25-14-466.png, > image-2022-11-18-20-27-24-054.png > > > After merging the FLINK-21413 every ttl check for cleanup for TtlMapState and > TtlListState (even without expired elements) leads to whole state update. > This is because: > - comparison by link inside `TtlIncrementalCleanup`: > !image-2022-11-18-20-25-14-466.png|width=450,height=288! > - and creating new map or list inside TtlMapState or TtlListState: > !image-2022-11-18-20-27-24-054.png|width=477,height=365! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] rovboyko opened a new pull request, #21406: [FLINK-30088][runtime] Fix excessive state updates for TtlMapState and TtlListState
rovboyko opened a new pull request, #21406: URL: https://github.com/apache/flink/pull/21406 …d TtlListState ## What is the purpose of the change Avoid unnecessary state updates for TtlMapState and TtlListState in case they are not changed. This affects only HashMapStateBackend, MemoryStateBackend and FsStateBackend. ## Brief change log - add if condition to return original List if no records were expired for TtlListState - add if condition to return original Map if no records were expired for TtlMapState ## Verifying this change This change added tests and can be verified as follows: - add testStateNotChangedWithoutCleanup to TtlStateTestBase ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
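The core idea of the fix described above can be sketched as follows. This is an illustrative stand-in, not Flink's actual `TtlMapState` code: the method name, the use of a plain timestamp value, and the expiry rule are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the fix: only build a new map when at least one
// entry actually expired; otherwise return the ORIGINAL instance, so that an
// upstream reference comparison (as in TtlIncrementalCleanup) sees no change
// and skips the state update.
public class TtlCleanupSketch {
    static Map<String, Long> dropExpired(Map<String, Long> state, long nowTs, long ttl) {
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> e : state.entrySet()) {
            if (nowTs - e.getValue() < ttl) { // value holds the last-access timestamp
                filtered.put(e.getKey(), e.getValue());
            }
        }
        // Key point: same size means nothing expired, so hand back the same
        // object reference instead of an equal-but-distinct copy.
        return filtered.size() == state.size() ? state : filtered;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("a", 100L);
        state.put("b", 150L);
        System.out.println(dropExpired(state, 200L, 1000L) == state);  // true: nothing expired
        System.out.println(dropExpired(state, 2000L, 1000L) == state); // false: entries dropped
    }
}
```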
[GitHub] [flink] SmirAlex commented on a diff in pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest
SmirAlex commented on code in PR #20919: URL: https://github.com/apache/flink/pull/20919#discussion_r1033120828 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java: ## @@ -39,12 +39,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** {@link CacheLoader} that used {@link InputFormat} for loading data into the cache. */ public class InputFormatCacheLoader extends CacheLoader { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(InputFormatCacheLoader.class); +private static final long TIMEOUT_AFTER_INTERRUPT = 10; // 10 sec Review Comment: Agree, fixed ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java: ## @@ -107,7 +109,11 @@ protected void reloadCache() throws Exception { } catch (InterruptedException ignored) { // we use interrupt to close reload thread } finally { if (cacheLoadTaskService != null) { +// if main cache reload thread encountered an exception, +// it interrupts underlying InputSplitCacheLoadTasks threads cacheLoadTaskService.shutdownNow(); +// timeout 10 sec should definitely be enough to wait for finish after interrupt Review Comment: Agree, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
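The cleanup pattern the review comments converge on can be sketched in isolation: interrupt the workers with `shutdownNow()`, then wait a bounded time for them to actually terminate (the diff above names a 10-second `TIMEOUT_AFTER_INTERRUPT` constant for this). The sketch below uses only standard `java.util.concurrent` calls and is not the actual `InputFormatCacheLoader` code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: bounded shutdown of a worker pool after an interrupt.
public class BoundedShutdownSketch {
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutSec)
            throws InterruptedException {
        pool.shutdownNow(); // interrupts currently running tasks
        // Returns false if workers are still running when the timeout elapses,
        // which the caller can log or escalate instead of blocking forever.
        return pool.awaitTermination(timeoutSec, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> { /* short-lived task */ });
        System.out.println(shutdownAndAwait(pool, 10));
    }
}
```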
[jira] [Commented] (FLINK-30185) Provide the flame graph to the subtask level
[ https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639778#comment-17639778 ] Xintong Song commented on FLINK-30185: -- I think this makes a nice improvement. Could you explain in a bit more detail how you plan to do this? > Provide the flame graph to the subtask level > > > Key: FLINK-30185 > URL: https://issues.apache.org/jira/browse/FLINK-30185 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST, Runtime / Web Frontend >Reporter: Rui Fan >Priority: Major > Fix For: 1.17.0 > > Attachments: image-2022-11-24-14-49-42-845.png > > > FLINK-13550 supported for CPU FlameGraphs in web UI. > As Flink doc mentioned: > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process > {code:java} > Note: Stack trace samples from all threads of an operator are combined > together. If a method call consumes 100% of the resources in one of the > parallel tasks but none in the others, the bottleneck might be obscured by > being averaged out. > There are plans to address this limitation in the future by providing “drill > down” visualizations to the task level. {code} > > The flame graph at the subtask level is very useful when a small number of > subtasks are bottlenecked. So we should provide the flame graph to the > subtask level > > !image-2022-11-24-14-49-42-845.png! > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-23035) Add explicit method to StateChangelogWriter to write metadata
[ https://issues.apache.org/jira/browse/FLINK-23035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639777#comment-17639777 ] Yanfei Lei commented on FLINK-23035: [~binh] of course, welcome to help review. > Add explicit method to StateChangelogWriter to write metadata > - > > Key: FLINK-23035 > URL: https://issues.apache.org/jira/browse/FLINK-23035 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / State Backends >Reporter: Roman Khachatryan >Assignee: Xinbin Huang >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0 > > > Currently, metadata is written to the state changelog using the same > StateChangelogWriter.append() method as data. > However, it doesn't belong to a specific group, and should be read first on > recovery. Because of that, -1 is used. > An explicit append() without keygroup would be less fragile (probably still > using -1 under the hood). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30184) Save TM/JM thread stack periodically
[ https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639775#comment-17639775 ] Xintong Song commented on FLINK-30184: -- [~fanrui], sorry for the late response. I agree with [~wangyang0918] that this is probably more suitable for an external service that manages / monitors Flink. Thread dumps are for debugging and should not be activated constantly given the performance impact. Flink already offers rest api for capturing thread stacks of [jobmanager|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-thread-dump] and [taskmanager|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#taskmanagers-taskmanagerid-thread-dump]. It should be easy for an external monitoring system to capture the dumps when the job is detected to be slow. > Save TM/JM thread stack periodically > > > Key: FLINK-30184 > URL: https://issues.apache.org/jira/browse/FLINK-30184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Rui Fan >Priority: Major > Fix For: 1.17.0 > > > After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the > thread stack of TM/JM in Flink WebUI. > It can help flink users to find out why the Flink job is stuck, or why the > processing is slow. It is very useful for trouble shooting. > However, sometimes Flink tasks get stuck or process slowly, but when the user > troubleshoots the problem, the job has resumed. It is difficult to find out > what happened to the Flink job at the time and why is it slow? > > So, could we periodically save the thread stack of TM or JM in the TM log > directory? > Define some configurations: > cluster.thread-dump.interval=1min > cluster.thread-dump.cleanup-time=48 hours -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-23035) Add explicit method to StateChangelogWriter to write metadata
[ https://issues.apache.org/jira/browse/FLINK-23035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639773#comment-17639773 ] Xinbin Huang commented on FLINK-23035: -- hey [~Yanfei Lei] , sorry I missed the message from roman, and I can pick it up in the next week. But I just saw you've opened a PR for this, so I think we can follow up on your PR? > Add explicit method to StateChangelogWriter to write metadata > - > > Key: FLINK-23035 > URL: https://issues.apache.org/jira/browse/FLINK-23035 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / State Backends >Reporter: Roman Khachatryan >Assignee: Xinbin Huang >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0 > > > Currently, metadata is written to the state changelog using the same > StateChangelogWriter.append() method as data. > However, it doesn't belong to a specific group, and should be read first on > recovery. Because of that, -1 is used. > An explicit append() without keygroup would be less fragile (probably still > using -1 under the hood). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-24870) Cannot cast "java.util.Date" to "java.time.Instant"
[ https://issues.apache.org/jira/browse/FLINK-24870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639769#comment-17639769 ] dalongliu commented on FLINK-24870: --- [~wangbaohua] I pulled your code from Gitee, but the test can't be run because your project has some extra jar dependencies on `com.asap.rule`. Can you also share that code with me so I can reproduce the problem? > Cannot cast "java.util.Date" to "java.time.Instant" > --- > > Key: FLINK-24870 > URL: https://issues.apache.org/jira/browse/FLINK-24870 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.13.1 >Reporter: wangbaohua >Priority: Major > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80) > ... 11 more > Caused by: > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74) > ... 12 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) > ... 
15 more > Caused by: org.codehaus.commons.compiler.CompileException: Line 120, Column > 101: Cannot cast "java.util.Date" to "java.time.Instant" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) > at > org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418) > at > org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396) > at org.codehaus.janino.Java$Cast.accept(Java.java:4898) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at > org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057) > at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400) > at > org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924) > at >
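At the Java level, the compile error in the stack trace above is straightforward to demonstrate: `java.util.Date` is not a subtype of `java.time.Instant`, so a plain cast can never succeed, and the supported conversion is `Date#toInstant()` (with `Date.from(Instant)` for the reverse direction). This small demo illustrates only the Java-level cause; where the cast originates inside Flink's generated converter code is a separate question for the issue.

```java
import java.time.Instant;
import java.util.Date;

// Demonstrates why "Cannot cast java.util.Date to java.time.Instant" occurs
// and what the correct conversion looks like.
public class DateInstantDemo {
    public static void main(String[] args) {
        Date date = new Date(0L); // the epoch
        // Instant bad = (Instant) (Object) date; // would throw ClassCastException
        Instant instant = date.toInstant();       // correct: Date -> Instant
        Date roundTrip = Date.from(instant);      // correct: Instant -> Date
        System.out.println(instant.equals(Instant.EPOCH)); // true
        System.out.println(roundTrip.equals(date));        // true
    }
}
```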
[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex
Zakelly commented on code in PR #21362: URL: https://github.com/apache/flink/pull/21362#discussion_r1033104544 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java: ## @@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) { @Override public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) { +if (!keyGroupRange.contains(currentKeyGroupIndex)) { +throw KeyGroupRangeOffsets.newIllegalKeyGroupException( +currentKeyGroupIndex, keyGroupRange); +} Review Comment: Actually I don't like the idea of checking whether the current key group is valid when accessing state, since the problem comes from setting the key group, not from accessing the state. Besides, a user may set the key group once and then access state several times, so for performance reasons I'd rather remove the check from each state access and only keep the check in this PR. WDYT? @Myasuka -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21405: [FLINK-23035][state/changelog] Add explicit append() to StateChangelogWriter to write metadata
flinkbot commented on PR #21405: URL: https://github.com/apache/flink/pull/21405#issuecomment-1328493977 ## CI report: * fdc39492b315dbecc5b4cff8f222caa53eebee87 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-23035) Add explicit method to StateChangelogWriter to write metadata
[ https://issues.apache.org/jira/browse/FLINK-23035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-23035: --- Labels: pull-request-available (was: ) > Add explicit method to StateChangelogWriter to write metadata > - > > Key: FLINK-23035 > URL: https://issues.apache.org/jira/browse/FLINK-23035 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / State Backends >Reporter: Roman Khachatryan >Assignee: Xinbin Huang >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0 > > > Currently, metadata is written to the state changelog using the same > StateChangelogWriter.append() method as data. > However, it doesn't belong to a specific group, and should be read first on > recovery. Because of that, -1 is used. > An explicit append() without keygroup would be less fragile (probably still > using -1 under the hood). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] fredia opened a new pull request, #21405: [FLINK-23035][state/changelog] Add explicit append() to StateChangelogWriter to write metadata
fredia opened a new pull request, #21405: URL: https://github.com/apache/flink/pull/21405 ## What is the purpose of the change Add explicit append() to StateChangelogWriter to write metadata. ## Brief change log - Add explicit `StateChange(byte[] value)` - Add explicit `append(byte[] value)` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
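The API shape this PR describes can be sketched abstractly. The names below are illustrative (FLINK-23035 only says metadata is currently appended with key group -1, and that an explicit method would be less fragile while "probably still using -1 under the hood"); the real `StateChangelogWriter` interface differs.

```java
// Hypothetical sketch of an explicit metadata append: callers no longer pass
// the magic "-1" key group themselves; a default method hides it.
public class ChangelogWriterSketch {
    static final int METADATA_KEY_GROUP = -1; // read first on recovery

    interface StateChangelogWriter {
        // Data changes belong to a concrete key group.
        void append(int keyGroup, byte[] value);

        // Metadata has no key group; implementations may still store it
        // under -1, but that detail stays out of the call sites.
        default void appendMeta(byte[] value) {
            append(METADATA_KEY_GROUP, value);
        }
    }

    // Helper used by main/tests: records which key group appendMeta used.
    static int recordMetaKeyGroup() {
        int[] last = new int[1];
        StateChangelogWriter writer = (kg, v) -> last[0] = kg;
        writer.appendMeta(new byte[] {1});
        return last[0];
    }

    public static void main(String[] args) {
        System.out.println(recordMetaKeyGroup()); // -1
    }
}
```

The benefit is the one the ticket names: call sites can no longer pass a wrong key group for metadata, while the on-wire format stays unchanged.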
[GitHub] [flink] fsk119 commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.
fsk119 commented on code in PR #21133: URL: https://github.com/apache/flink/pull/21133#discussion_r1033065242 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java: ## @@ -79,6 +79,23 @@ public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException } } +@Override +public void configureSession( +SessionHandle sessionHandle, String statement, long executionTimeoutMs) +throws SqlGatewayException { +try { +if (executionTimeoutMs > 0) { +// TODO: support the feature in FLINK-27838 +throw new UnsupportedOperationException( +"SqlGatewayService doesn't support timeout mechanism now."); +} + getSession(sessionHandle).createExecutor().configureSession(statement); Review Comment: It's not thread-safe here. ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java: ## @@ -84,6 +91,39 @@ public OperationExecutor(SessionContext context, Configuration executionConfig) this.executionConfig = executionConfig; } +public void configureSession(String statement) throws ExecutionException, InterruptedException { +TableEnvironmentInternal tableEnv = getTableEnvironment(); +List parsedOperations = tableEnv.getParser().parse(statement); +if (parsedOperations.size() > 1) { +throw new UnsupportedOperationException( +"Unsupported SQL statement! Execute statement only accepts a single SQL statement or " ++ "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block."); +} +Operation op = parsedOperations.get(0); Review Comment: Can we reuse the code as we do in the CliClient? I think we can introduce an enum `ExecutionMode` and validate the statement kind in the `INITIALIZATION` mode. 
## flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java: ## @@ -66,6 +66,14 @@ public interface SqlGatewayService { */ void closeSession(SessionHandle sessionHandle) throws SqlGatewayException; +/** + * Using the statement to initialize the Session. It's only allowed to execute + * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR. The execution should be + * finished before returning because jobs submitted later may depend on it. + */ Review Comment: It's better if we can align with other java docs. ``` /** * Configure the basic settings for the session, including configuring the settings, preparing the catalog and so * on. * * @param sessionHandle handle to identify the Session needs to be closed. * */ ``` ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java: ## @@ -157,6 +168,83 @@ public void testOpenSessionWithEnvironment() throws Exception { assertThat(tableEnv.listModules()).contains(moduleName); } +@Test +public void testConfigureSessionWithLegalStatement(@TempDir java.nio.file.Path tmpDir) +throws Exception { +SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + +// SET & RESET +service.configureSession(sessionHandle, "SET 'key1' = 'value1';", 0); +Map config = new HashMap<>(); +config.put("key1", "value1"); + assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(config); + +service.configureSession(sessionHandle, "RESET 'key1';", 0); + assertThat(service.getSessionConfig(sessionHandle)).doesNotContainEntry("key1", "value1"); + +// CREATE & USE & ALTER & DROP +service.configureSession( +sessionHandle, +"CREATE CATALOG mycat with ('type' = 'generic_in_memory', 'default-database' = 'db');", +0); + +service.configureSession(sessionHandle, "USE CATALOG mycat;", 0); + assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("mycat"); + +service.configureSession( +sessionHandle, 
+"CREATE TABLE db.tbl (score INT) WITH ('connector' = 'datagen');", +0); + +Set tableKinds = new HashSet<>(); +tableKinds.add(TableKind.TABLE); +assertThat(service.listTables(sessionHandle, "mycat", "db", tableKinds)) +.contains( +new TableInfo(ObjectIdentifier.of("mycat", "db", "tbl"), TableKind.TABLE)); + +service.configureSession(sessionHandle, "ALTER TABLE db.tbl RENAME TO tbl1;", 0); +assertThat(service.listTables(sessionHandle, "mycat", "db", tableKinds)) +
[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
tsreaper commented on code in PR #403: URL: https://github.com/apache/flink-table-store/pull/403#discussion_r1033078213 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java: ## @@ -34,10 +34,12 @@ void write(T record) throws Exception; /** - * Compact all files related to the writer. Note that compaction process is only submitted and - * may not be completed when the method returns. + * Compact files related to the writer. Note that compaction process is only submitted and may + * not be completed when the method returns. + * + * @param fullCompaction whether to trigger full compaction or just normal compaction Review Comment: From the point of view of a separate compact job, we should always wait for the last compaction to complete. Otherwise new changes may never be compacted.
[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory
tsreaper commented on code in PR #405: URL: https://github.com/apache/flink-table-store/pull/405#discussion_r1033077832 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java: ## @@ -31,13 +32,57 @@ public interface Lock extends AutoCloseable { /** Run with lock. */ <T> T runWithLock(Callable<T> callable) throws Exception; -@Nullable -static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath tablePath) { -if (lockFactory == null) { -return null; +/** A factory to create {@link Lock}. */ +interface Factory extends Serializable { +Lock create(); +} + +static Factory factory(@Nullable CatalogLock.Factory lockFactory, ObjectPath tablePath) { +return lockFactory == null +? new EmptyFactory() +: new CatalogLockFactory(lockFactory, tablePath); +} + +static Factory emptyFactory() { +return new EmptyFactory(); +} + +/** A {@link Factory} creating lock from catalog. */ +class CatalogLockFactory implements Factory { + +private static final long serialVersionUID = 1L; + +private final CatalogLock.Factory lockFactory; +private final ObjectPath tablePath; + +public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath tablePath) { +this.lockFactory = lockFactory; +this.tablePath = tablePath; +} + +@Override +public Lock create() { +return fromCatalog(lockFactory.create(), tablePath); } +} -return fromCatalog(lockFactory.create(), tablePath); +/** A {@link Factory} creating empty lock. */ +class EmptyFactory implements Factory { + +private static final long serialVersionUID = 1L; + +@Override +public Lock create() { +return new Lock() { +@Override +public <T> T runWithLock(Callable<T> callable) throws Exception { +return callable.call(); +} + +@Override +public void close() {} +}; Review Comment: Just return `null`?
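The trade-off discussed in the review above (no-op lock vs. returning `null`) can be illustrated with a minimal, simplified sketch. This is not the Table Store `Lock` class itself; it only keeps the shape of the diff to show why a no-op `EmptyFactory`-style lock spares callers from null checks.

```java
import java.util.concurrent.Callable;

interface Lock extends AutoCloseable {

    <T> T runWithLock(Callable<T> callable) throws Exception;

    // Factory has a single abstract method, so the empty factory can be a lambda.
    interface Factory {
        Lock create();
    }

    // A no-op lock: callers can always write lock.runWithLock(...) without null checks.
    static Factory emptyFactory() {
        return () -> new Lock() {
            @Override
            public <T> T runWithLock(Callable<T> callable) throws Exception {
                return callable.call(); // no synchronization, just run the action
            }

            @Override
            public void close() {}
        };
    }
}

public class LockSketch {
    public static void main(String[] args) throws Exception {
        Lock lock = Lock.emptyFactory().create();
        int result = lock.runWithLock(() -> 41 + 1);
        System.out.println(result); // prints 42
    }
}
```

Returning `null` instead, as the reviewer suggests, would shrink the factory but push a `lock == null` branch into every call site.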
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
JingsongLi commented on code in PR #403: URL: https://github.com/apache/flink-table-store/pull/403#discussion_r1033075278 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java: ## @@ -34,10 +34,12 @@ void write(T record) throws Exception; /** - * Compact all files related to the writer. Note that compaction process is only submitted and - * may not be completed when the method returns. + * Compact files related to the writer. Note that compaction process is only submitted and may + * not be completed when the method returns. + * + * @param fullCompaction whether to trigger full compaction or just normal compaction Review Comment: I think there are two things: - Is this compaction full or normal? - Should we wait for the latest compaction? I think we can separate these two things.
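One way to separate the two concerns named in the review above, full-vs-normal compaction and waiting for the previous one, is to make them independent parameters. This is an illustrative sketch only, not the actual `RecordWriter` interface; the class name and the `waitForLatestCompaction` helper are assumptions.

```java
import java.util.concurrent.CompletableFuture;

public class CompactSketch {

    private CompletableFuture<Void> lastCompaction = CompletableFuture.completedFuture(null);

    // "full vs normal" and "wait for the previous compaction" vary independently.
    public void compact(boolean fullCompaction, boolean waitForPrevious) throws Exception {
        if (waitForPrevious) {
            lastCompaction.get(); // block until the previously submitted compaction finishes
        }
        // Submit asynchronously; a full compaction would rewrite all files,
        // a normal one only a picked subset.
        lastCompaction = CompletableFuture.runAsync(() -> {
            // ... pick and rewrite files according to fullCompaction ...
        });
    }

    // Wait for the latest submitted compaction to complete.
    public boolean waitForLatestCompaction() throws Exception {
        lastCompaction.get();
        return lastCompaction.isDone();
    }

    public static void main(String[] args) throws Exception {
        CompactSketch writer = new CompactSketch();
        writer.compact(true, false);  // full compaction, do not wait for the previous one
        writer.compact(false, true);  // normal compaction, wait for the previous one first
        System.out.println(writer.waitForLatestCompaction()); // prints true
    }
}
```

A dedicated compact job would then always pass `waitForPrevious = true`, matching the earlier comment that new changes may otherwise never be compacted.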
[GitHub] [flink-table-store] JingsongLi commented on pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory
JingsongLi commented on PR #405: URL: https://github.com/apache/flink-table-store/pull/405#issuecomment-1328450069 This is the second refactoring PR for https://github.com/apache/flink-table-store/pull/394
[jira] [Updated] (FLINK-30223) Refactor Lock to provide Lock.Factory
[ https://issues.apache.org/jira/browse/FLINK-30223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30223: --- Labels: pull-request-available (was: ) > Refactor Lock to provide Lock.Factory > - > > Key: FLINK-30223 > URL: https://issues.apache.org/jira/browse/FLINK-30223 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > For the core, it should not see too many Flink Table concepts, such as > database and tableName. It only needs to create a Lock. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] JingsongLi opened a new pull request, #405: [FLINK-30223] Refactor Lock to provide Lock.Factory
JingsongLi opened a new pull request, #405: URL: https://github.com/apache/flink-table-store/pull/405 For the core, it should not see too many Flink Table concepts, such as database and tableName. It only needs to create a Lock.
[jira] [Created] (FLINK-30223) Refactor Lock to provide Lock.Factory
Jingsong Lee created FLINK-30223: Summary: Refactor Lock to provide Lock.Factory Key: FLINK-30223 URL: https://issues.apache.org/jira/browse/FLINK-30223 Project: Flink Issue Type: Improvement Components: Table Store Reporter: Jingsong Lee Assignee: Jingsong Lee Fix For: table-store-0.3.0 For the core, it should not see too many Flink Table concepts, such as database and tableName. It only needs to create a Lock.
[GitHub] [flink-kubernetes-operator] yangjf2019 commented on pull request #451: [FLINK-30186] Bump Flink version to 1.15.3
yangjf2019 commented on PR #451: URL: https://github.com/apache/flink-kubernetes-operator/pull/451#issuecomment-1328445247 Hi @gyfora, the central repository now has the corresponding dependencies; please trigger the CI again, thank you!
[jira] [Commented] (FLINK-30164) Expose BucketComputer from SupportsWrite
[ https://issues.apache.org/jira/browse/FLINK-30164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639748#comment-17639748 ] Caizhi Weng commented on FLINK-30164: - master: d8eb796f035f35e1ac85ff3f657452dd2a41e644 > Expose BucketComputer from SupportsWrite > > > Key: FLINK-30164 > URL: https://issues.apache.org/jira/browse/FLINK-30164 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > When other engines dock with Sink, they need to know the corresponding bucket > rules before they can be correctly distributed to each bucket. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30164) Expose BucketComputer from SupportsWrite
[ https://issues.apache.org/jira/browse/FLINK-30164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng updated FLINK-30164: Release Note: (was: master: d8eb796f035f35e1ac85ff3f657452dd2a41e644) > Expose BucketComputer from SupportsWrite > > > Key: FLINK-30164 > URL: https://issues.apache.org/jira/browse/FLINK-30164 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > When other engines dock with Sink, they need to know the corresponding bucket > rules before they can be correctly distributed to each bucket. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-30164) Expose BucketComputer from SupportsWrite
[ https://issues.apache.org/jira/browse/FLINK-30164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng closed FLINK-30164. --- Release Note: master: d8eb796f035f35e1ac85ff3f657452dd2a41e644 Resolution: Fixed > Expose BucketComputer from SupportsWrite > > > Key: FLINK-30164 > URL: https://issues.apache.org/jira/browse/FLINK-30164 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > When other engines dock with Sink, they need to know the corresponding bucket > rules before they can be correctly distributed to each bucket. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] tsreaper merged pull request #400: [FLINK-30164] Expose BucketComputer from SupportsWrite
tsreaper merged PR #400: URL: https://github.com/apache/flink-table-store/pull/400
[jira] [Assigned] (FLINK-29987) PartialUpdateITCase.testForeignKeyJo is unstable
[ https://issues.apache.org/jira/browse/FLINK-29987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-29987: Assignee: Alex Sorokoumov > PartialUpdateITCase.testForeignKeyJo is unstable > > > Key: FLINK-29987 > URL: https://issues.apache.org/jira/browse/FLINK-29987 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Alex Sorokoumov >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] 1996fanrui commented on pull request #21368: [FLINK-30165][runtime][JUnit5 Migration] Migrate unaligned checkpoint related tests under flink-runtime module to junit5
1996fanrui commented on PR #21368: URL: https://github.com/apache/flink/pull/21368#issuecomment-1328427197 Hi @XComp @snuyanzin , I have addressed all comments, please help take a look, thanks~
[GitHub] [flink] sethsaperstein-lyft commented on a diff in pull request #20844: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask
sethsaperstein-lyft commented on code in PR #20844: URL: https://github.com/apache/flink/pull/20844#discussion_r1033036319 ## flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java: ## @@ -92,6 +104,7 @@ public long getUpdateTimeoutCount() { protected static class WatermarkUpdate implements Serializable { protected long watermark = Long.MIN_VALUE; protected String id; +protected boolean updateLocalWatermark = true; Review Comment: good suggestion. Thanks for taking a look. Updated.
[GitHub] [flink] flinkbot commented on pull request #21404: [hotfix][docs] Update Zookeeper version information
flinkbot commented on PR #21404: URL: https://github.com/apache/flink/pull/21404#issuecomment-1328366465 ## CI report: * 1f3ac78db1fb4ae3562196f4a9bd8d51b4c383a8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] t0560r91 opened a new pull request, #21404: [hotfix][docs] Update Zookeeper version information
t0560r91 opened a new pull request, #21404: URL: https://github.com/apache/flink/pull/21404 Updating the doc, as Flink 1.15.2 actually ships with Zookeeper 3.5.9 in the `lib` folder and 3.6.3 in the `opt` folder. Also removing a note saying Zookeeper 3.4 is not compatible with 3.5, because 3.4 is not included in Flink 1.15.2.
[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ] Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 10:02 PM: --- [~fsk119] here are the steps to reproduce with 1.16.0. note that jar import works fine for 1.15.2. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} * Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} * We have to put the jar inside Flink `lib/` dir for the jar to be loaded. Then the same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} was (Author: stevenz3wu): [~fsk119] here are the steps to reproduce with 1.16.0. note that jar import works fine for 1.15.2. 
* download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} * Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} * Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} > ./bin/sql-client.sh won't import external jar into the session > -- > > Key: FLINK-30035 > URL: https://issues.apache.org/jira/browse/FLINK-30035 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 >Reporter: Steven Zhen Wu >Priority: Major > > I used to be able to run the sql-client with iceberg-flink-runtime jar using > the `-j,--jar ` option (e.g. with 1.15.2). > {code} > ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar > {code} > With 1.16.0, this doesn't work anymore. As a result, I am seeing > ClassNotFoundException. > {code} > java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog > {code} > I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the > `flink/lib` directory to make the jar loaded. This seems like a regression of > 1.16.0. 
[GitHub] [flink] sethsaperstein-lyft commented on pull request #20844: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask
sethsaperstein-lyft commented on PR #20844: URL: https://github.com/apache/flink/pull/20844#issuecomment-1328347988 @flinkbot run azure
[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ] Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:56 PM: -- [~fsk119] here are the steps to reproduce with 1.16.0. note that jar import works fine for 1.15.2. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} * Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} * Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} was (Author: stevenz3wu): [~fsk119] here are the steps to reproduce with 1.16.0. note that jar import works fine for 1.15.2. 
* download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} * Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} * Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} > ./bin/sql-client.sh won't import external jar into the session > -- > > Key: FLINK-30035 > URL: https://issues.apache.org/jira/browse/FLINK-30035 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 >Reporter: Steven Zhen Wu >Priority: Major > > I used to be able to run the sql-client with iceberg-flink-runtime jar using > the `-j,--jar ` option (e.g. with 1.15.2). > {code} > ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar > {code} > With 1.16.0, this doesn't work anymore. As a result, I am seeing > ClassNotFoundException. > {code} > java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog > {code} > I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the > `flink/lib` directory to make the jar loaded. This seems like a regression of > 1.16.0. 
[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ] Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:55 PM: -- [~fsk119] here are the steps to reproduce with 1.16.0. note that jar import works fine for 1.15.2. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} was (Author: stevenz3wu): [~fsk119] here are the steps to reproduce. 
* download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} jar import works fine for 1.15.2. > ./bin/sql-client.sh won't import external jar into the session > -- > > Key: FLINK-30035 > URL: https://issues.apache.org/jira/browse/FLINK-30035 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 >Reporter: Steven Zhen Wu >Priority: Major > > I used to be able to run the sql-client with iceberg-flink-runtime jar using > the `-j,--jar ` option (e.g. with 1.15.2). > {code} > ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar > {code} > With 1.16.0, this doesn't work anymore. As a result, I am seeing > ClassNotFoundException. > {code} > java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog > {code} > I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the > `flink/lib` directory to make the jar loaded. This seems like a regression of > 1.16.0. 
[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ] Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:55 PM: -- [~fsk119] here are the steps to reproduce with 1.16.0. note that jar import works fine for 1.15.2. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} * Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} * Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} was (Author: stevenz3wu): [~fsk119] here are the steps to reproduce with 1.16.0. note that jar import works fine for 1.15.2. 
* download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} > ./bin/sql-client.sh won't import external jar into the session > -- > > Key: FLINK-30035 > URL: https://issues.apache.org/jira/browse/FLINK-30035 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 >Reporter: Steven Zhen Wu >Priority: Major > > I used to be able to run the sql-client with iceberg-flink-runtime jar using > the `-j,--jar ` option (e.g. with 1.15.2). > {code} > ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar > {code} > With 1.16.0, this doesn't work anymore. As a result, I am seeing > ClassNotFoundException. > {code} > java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog > {code} > I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the > `flink/lib` directory to make the jar loaded. This seems like a regression of > 1.16.0. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032994738 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java: ## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; + +import com.mongodb.MongoNamespace; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.Projections; +import com.mongodb.client.model.Sorts; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT; + +/** + * Sample Partitioner + * + * Samples the collection to generate partitions. + * + * Uses the average document size to split the collection into average sized chunks + * + * The partitioner samples the collection, projects and sorts by the partition fields. Then uses + * every {@code samplesPerPartition} as the value to use to calculate the partition boundaries. + * + * + * scan.partition.size: The average size (MB) for each partition. Note: Uses the average + * document size to determine the number of documents per partition so may not be even. + * Defaults to: 64mb. + * scan.partition.samples: The number of samples to take per partition. Defaults to: 10. The + * total number of samples taken is calculated as: {@code samples per partition * (count of + * documents / number of documents per partition)}. 
Review Comment: > If multiple samples are taken per partition then somewhere in here we'd have to merge samples to arrive at a single partition again, but afaict that doesn't happen.

We merge samples in the following code:
```java
List<MongoScanSourceSplit> sourceSplits = new ArrayList<>();
BsonDocument partitionStart = new BsonDocument(ID_FIELD, BSON_MIN_KEY);
int splitNum = 0;
for (int i = 0; i < samples.size(); i++) {
    if (i % samplesPerPartition == 0 || i == samples.size() - 1) {
        sourceSplits.add(
                createSplit(namespace, splitNum++, partitionStart, samples.get(i)));
        partitionStart = samples.get(i);
    }
}
```
> Instead we have some strange formula that determines the number of samples (read: partitions), and I have no idea how the resulting partitions could correlate with the desired partition size.
>
> Why isn't the number of samples (again: partitions) count / numDocumentsPerPartition?

1. numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes
2. samplingRate = samplesPerPartition / numDocumentsPerPartition
3. samplesCount = samplingRate * count
4. merge samples by samplesPerPartition

We calculate the sampling rate from the samples per partition and the partition size. The same thing could also be accomplished directly by setting a sampling rate. @zentol, which do you think is better: `scan.partition.samples` or `scan.partition.sampling-rate`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
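As a sketch only, the boundary-merging and sample-count arithmetic discussed in this thread can be modeled with plain Java. Nothing below is the connector's actual code: `mergeSamples` and `sampleCount` are hypothetical names, the samples are integer stand-ins for the sorted BSON sample documents, and the split boundaries are plain (start, end) pairs rather than `MongoScanSourceSplit` instances.

```java
import java.util.ArrayList;
import java.util.List;

public class SampleSplitterSketch {

    /** Step 1-3: samplesCount = count * samplesPerPartition / numDocumentsPerPartition. */
    static long sampleCount(long count, long partitionSizeBytes, long avgObjSizeBytes,
            int samplesPerPartition) {
        long numDocumentsPerPartition = partitionSizeBytes / avgObjSizeBytes;
        return count * samplesPerPartition / numDocumentsPerPartition;
    }

    /** Step 4: every samplesPerPartition-th sample (and the last one) closes a partition. */
    static List<int[]> mergeSamples(List<Integer> samples, int samplesPerPartition, int min) {
        List<int[]> splits = new ArrayList<>();
        int start = min; // analogous to BSON_MIN_KEY
        for (int i = 0; i < samples.size(); i++) {
            if (i % samplesPerPartition == 0 || i == samples.size() - 1) {
                splits.add(new int[] {start, samples.get(i)});
                start = samples.get(i);
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        // Five sorted samples, two samples per partition:
        // boundaries at samples 0, 2, and 4 yield partitions [0,10), [10,30), [30,50).
        for (int[] s : mergeSamples(List.of(10, 20, 30, 40, 50), 2, 0)) {
            System.out.println(s[0] + " -> " + s[1]);
        }
    }
}
```

With these stand-in numbers, a 64-byte partition of 8-byte documents holds 8 documents, so sampling 1000 documents at 10 samples per partition takes 1250 samples; the merge step then collapses every `samplesPerPartition`-th sample back into a single boundary.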
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032991900 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoRowDataLookupFunction.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.table.converter.BsonToRowDataConverters; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.LookupFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import com.mongodb.MongoException; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.bson.BsonDocument; +import org.bson.conversions.Bson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static com.mongodb.client.model.Filters.and; +import static com.mongodb.client.model.Filters.eq; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.project; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A lookup function for {@link MongoDynamicTableSource}. 
*/ +@Internal +public class MongoRowDataLookupFunction extends LookupFunction { + +private static final Logger LOG = LoggerFactory.getLogger(MongoRowDataLookupFunction.class); +private static final long serialVersionUID = 1L; + +private final MongoConnectionOptions connectionOptions; +private final int maxRetries; +private final long retryIntervalMs; + +private final List fieldNames; +private final List keyNames; + +private final BsonToRowDataConverters.BsonToRowDataConverter mongoRowConverter; +private final RowDataToBsonConverters.RowDataToBsonConverter lookupKeyRowConverter; + +private transient MongoClient mongoClient; + +public MongoRowDataLookupFunction( +MongoConnectionOptions connectionOptions, +int maxRetries, +long retryIntervalMs, +List fieldNames, +List fieldTypes, +List keyNames, +RowType rowType) { +checkNotNull(fieldNames, "No fieldNames supplied."); +checkNotNull(fieldTypes, "No fieldTypes supplied."); +checkNotNull(keyNames, "No keyNames supplied."); +this.connectionOptions = checkNotNull(connectionOptions); +this.maxRetries = maxRetries; +this.retryIntervalMs = retryIntervalMs; +this.fieldNames = fieldNames; +this.mongoRowConverter = BsonToRowDataConverters.createNullableConverter(rowType); + +this.keyNames = keyNames; +LogicalType[] keyTypes = +this.keyNames.stream() +.map( +s -> { +checkArgument( +fieldNames.contains(s), Review Comment: Thanks. Table API guarantees this to be the case. I'll remove that check. > org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 131 to line 1, column 133: Column 'f18' not found in table 'D' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
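The `maxRetries` and `retryIntervalMs` fields in the hunk above suggest a retry-on-failure loop around each lookup query. The connector's actual eval logic isn't shown in this hunk, so the following is only a generic sketch of that pattern under stated assumptions: `lookupWithRetry` and its `Callable` argument are hypothetical names, not the connector's API.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /** Retries a query up to maxRetries extra times, sleeping retryIntervalMs between attempts. */
    static <T> T lookupWithRetry(Callable<T> query, int maxRetries, long retryIntervalMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return query.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(retryIntervalMs);
                }
            }
        }
        throw last; // all attempts exhausted
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a simulated transient error, succeeds on the third attempt.
        String result = lookupWithRetry(
                () -> {
                    if (++calls[0] < 3) {
                        throw new RuntimeException("transient failure");
                    }
                    return "doc";
                },
                3, 1L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```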
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032644671 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.sink.MongoSink; +import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter; +import org.apache.flink.connector.mongodb.table.serialization.MongoRowDataSerializationSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.function.SerializableFunction; + +import org.bson.BsonValue; + +import javax.annotation.Nullable; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A {@link DynamicTableSink} for MongoDB. 
*/ +@Internal +public class MongoDynamicTableSink implements DynamicTableSink { + +private final MongoConnectionOptions connectionOptions; +private final MongoWriteOptions writeOptions; +@Nullable private final Integer parallelism; +private final DataType physicalRowDataType; +private final SerializableFunction keyExtractor; + +public MongoDynamicTableSink( +MongoConnectionOptions connectionOptions, +MongoWriteOptions writeOptions, +@Nullable Integer parallelism, +DataType physicalRowDataType, +SerializableFunction keyExtractor) { +this.connectionOptions = checkNotNull(connectionOptions); +this.writeOptions = checkNotNull(writeOptions); +this.parallelism = parallelism; +this.physicalRowDataType = checkNotNull(physicalRowDataType); +this.keyExtractor = checkNotNull(keyExtractor); +} + +@Override +public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { +// UPSERT mode +ChangelogMode.Builder builder = ChangelogMode.newBuilder(); +for (RowKind kind : requestedMode.getContainedKinds()) { +if (kind != RowKind.UPDATE_BEFORE) { Review Comment: ~~This connector can support writing in both append-only and upsert modes. I'm not sure if an explicitly upsert will force a primary key to be defined.~~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032989565 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.sink.MongoSink; +import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters; +import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter; +import org.apache.flink.connector.mongodb.table.serialization.MongoRowDataSerializationSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.function.SerializableFunction; + +import org.bson.BsonValue; + +import javax.annotation.Nullable; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A {@link DynamicTableSink} for MongoDB. 
*/ +@Internal +public class MongoDynamicTableSink implements DynamicTableSink { + +private final MongoConnectionOptions connectionOptions; +private final MongoWriteOptions writeOptions; +@Nullable private final Integer parallelism; +private final DataType physicalRowDataType; +private final SerializableFunction keyExtractor; + +public MongoDynamicTableSink( +MongoConnectionOptions connectionOptions, +MongoWriteOptions writeOptions, +@Nullable Integer parallelism, +DataType physicalRowDataType, +SerializableFunction keyExtractor) { +this.connectionOptions = checkNotNull(connectionOptions); +this.writeOptions = checkNotNull(writeOptions); +this.parallelism = parallelism; +this.physicalRowDataType = checkNotNull(physicalRowDataType); +this.keyExtractor = checkNotNull(keyExtractor); +} + +@Override +public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { +// UPSERT mode +ChangelogMode.Builder builder = ChangelogMode.newBuilder(); +for (RowKind kind : requestedMode.getContainedKinds()) { +if (kind != RowKind.UPDATE_BEFORE) { Review Comment: We have added tests for these two scenarios in the E2E tests. Explicitly setting an upsert `ChangelogMode` won't force a primary key to be defined, so the `ChangelogMode` is changed to upsert mode here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
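The effect of the `getChangelogMode` filter discussed above — an upsert sink consumes `INSERT`, `UPDATE_AFTER`, and `DELETE` rows but never needs `UPDATE_BEFORE` — can be modeled without Flink's classes. This is an illustrative sketch only: the `Kind` enum merely mimics Flink's `RowKind` constants, and `EnumSet` stands in for `ChangelogMode`.

```java
import java.util.EnumSet;

public class UpsertModeSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Mirrors the filter in getChangelogMode: keep every requested kind except UPDATE_BEFORE. */
    static EnumSet<Kind> upsertMode(EnumSet<Kind> requested) {
        EnumSet<Kind> result = EnumSet.noneOf(Kind.class);
        for (Kind kind : requested) {
            if (kind != Kind.UPDATE_BEFORE) {
                result.add(kind);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A full retract-style changelog collapses to the upsert set.
        System.out.println(upsertMode(EnumSet.allOf(Kind.class)));
        // prints [INSERT, UPDATE_AFTER, DELETE]
    }
}
```

Dropping `UPDATE_BEFORE` is what makes this an upsert sink: an update is expressed as a single `UPDATE_AFTER` row that overwrites the document with the same key, so the retraction row carries no extra information for MongoDB.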
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032983609 ## flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java: ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util.mongodb; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** End-to-end test for the MongoDB connectors. */ +@Testcontainers +class MongoE2ECase { + +private static final Logger LOG = LoggerFactory.getLogger(MongoE2ECase.class); + +private static final String MONGODB_HOSTNAME = "mongodb"; + +private static final String MONGO_4_0 = "mongo:4.0.10"; Review Comment: In the previous modification, we ignored the test-jar compilation of `flink-connector-mongodb`. In order to reuse `MongoTestUtil` in `e2e-tests` module, do we need to compile test-jar? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032983150 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.source.reader.split; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit; +import org.apache.flink.util.CollectionUtil; + +import com.mongodb.MongoException; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCursor; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.project; + +/** An split reader implements {@link SplitReader} for {@link MongoScanSourceSplit}. 
*/ +@Internal +public class MongoScanSourceSplitReader implements MongoSourceSplitReader { + +private static final Logger LOG = LoggerFactory.getLogger(MongoScanSourceSplitReader.class); + +private final MongoConnectionOptions connectionOptions; +private final MongoReadOptions readOptions; +private final SourceReaderContext readerContext; +@Nullable private final List projectedFields; +private final int limit; + +private boolean closed = false; +private boolean finished = false; +private MongoClient mongoClient; +private MongoCursor currentCursor; +private MongoScanSourceSplit currentSplit; + +public MongoScanSourceSplitReader( +MongoConnectionOptions connectionOptions, +MongoReadOptions readOptions, +@Nullable List projectedFields, +int limit, +SourceReaderContext context) { +this.connectionOptions = connectionOptions; +this.readOptions = readOptions; +this.projectedFields = projectedFields; +this.limit = limit; +this.readerContext = context; +} + +@Override +public RecordsWithSplitIds fetch() throws IOException { +if (closed) { +throw new IllegalStateException("Cannot fetch records from a closed split reader"); +} + +RecordsBySplits.Builder builder = new RecordsBySplits.Builder<>(); + +// Return when no split registered to this reader. +if (currentSplit == null) { +return builder.build(); +} + +currentCursor = getOrCreateCursor(); +int fetchSize = readOptions.getFetchSize(); + +try { +for (int recordNum = 0; recordNum < fetchSize; recordNum++) { +if (currentCursor.hasNext()) { +builder.add(currentSplit, currentCursor.next()); +} else { +builder.addFinishedSplit(currentSplit.splitId()); +finished = true; +break; +} +} +return builder.build(); +} catch (MongoException e) { +throw new IOException("Scan records form MongoDB failed", e); +} finally { +if (finished) { +currentSplit = null; +releaseCursor(); +} +} +} + +@Override +public void
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032977743 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoSplitAssigner.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.mongodb.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState; +import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Optional; + +/** The split assigner for {@link MongoSourceSplit}. */ +@Internal +public interface MongoSplitAssigner extends Serializable { + +/** + * Called to open the assigner to acquire any resources, like threads or network connections. + */ +void open(); + +/** + * Called to close the assigner, in case it holds on to any resources, like threads or network + * connections. + */ +void close() throws IOException; + +/** Gets the next split. 
*/ +Optional<MongoSourceSplit> getNext(); Review Comment: When all splits have been assigned but the enumerator has not yet signaled "no more splits", `getNext()` returns an empty `Optional`; the enumerator then checks whether it should notify the readers to close. See `MongoSourceEnumerator#assignSplits`:
```java
Optional<MongoSourceSplit> split = splitAssigner.getNext();
if (split.isPresent()) {
    final MongoSourceSplit mongoSplit = split.get();
    context.assignSplit(mongoSplit, nextAwaiting);
    awaitingReader.remove();
    LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
    break;
} else if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
    LOG.info("All splits have been assigned");
    context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
    break;
} else {
    // there is no available splits by now, skip assigning
    break;
}
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032971595 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.util.FlinkRuntimeException; + +import com.mongodb.MongoException; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoClient; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.KEY_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.MAX_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.MIN_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.isShardedCollectionDropped; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.readChunks; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.readCollectionMetadata; + +/** + * Sharded Partitioner + * + * Uses the chunks collection and partitions the collection based on the sharded collections + * chunk ranges. + * + * The following config collections' read privilege is required. 
+ * + * + * config.collections + * config.chunks + * + */ +@Internal +public class MongoShardedSplitter { + +private static final Logger LOG = LoggerFactory.getLogger(MongoShardedSplitter.class); + +public static final MongoShardedSplitter INSTANCE = new MongoShardedSplitter(); + +private MongoShardedSplitter() {} + +public Collection split(MongoSplitContext splitContext) { +MongoNamespace namespace = splitContext.getMongoNamespace(); +MongoClient mongoClient = splitContext.getMongoClient(); + +List chunks; +Optional collectionMetadata; +try { +collectionMetadata = readCollectionMetadata(mongoClient, namespace); +if (!collectionMetadata.isPresent()) { +LOG.error( +"Do sharded split failed, collection {} does not appear to be sharded.", +namespace); +throw new FlinkRuntimeException( +String.format( +"Do sharded split failed, %s is not a sharded collection.", +namespace)); +} + +if (isShardedCollectionDropped(collectionMetadata.get())) { +LOG.error("Do sharded split failed, collection {} was dropped.", namespace); +throw new FlinkRuntimeException( +String.format("Do sharded split failed, %s was dropped.", namespace)); +} + +chunks = readChunks(mongoClient, collectionMetadata.get()); +if (chunks.isEmpty()) { +LOG.error("Do sharded split failed, chunks of {} is empty.", namespace); +throw new FlinkRuntimeException( +String.format( +"Do sharded split failed, chunks of %s is empty.", namespace)); +} +} catch (MongoException e) { +LOG.error( +"Read chunks from {} failed with error message: {}", namespace, e.getMessage()); +throw new FlinkRuntimeException(e); +} + +List sourceSplits = new ArrayList<>(chunks.size()); +for (int i = 0; i < chunks.size(); i++) { +BsonDocument chunk = chunks.get(i); +sourceSplits.add( +new MongoScanSourceSplit( +String.format("%s_%d", namespace, i), Review Comment: How about we use the primary key (`_id` field mentioned above) of `config.chunks`?
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032963748 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.util.FlinkRuntimeException; + +import com.mongodb.MongoException; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoClient; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.KEY_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.MAX_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.MIN_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.isShardedCollectionDropped; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.readChunks; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.readCollectionMetadata; + +/** + * Sharded Partitioner + * + * Uses the chunks collection and partitions the collection based on the sharded collections + * chunk ranges. + * + * The following config collections' read privilege is required. 
+ * + * + * config.collections + * config.chunks + * + */ +@Internal +public class MongoShardedSplitter { + +private static final Logger LOG = LoggerFactory.getLogger(MongoShardedSplitter.class); + +public static final MongoShardedSplitter INSTANCE = new MongoShardedSplitter(); + +private MongoShardedSplitter() {} + +public Collection split(MongoSplitContext splitContext) { +MongoNamespace namespace = splitContext.getMongoNamespace(); +MongoClient mongoClient = splitContext.getMongoClient(); + +List chunks; +Optional collectionMetadata; +try { +collectionMetadata = readCollectionMetadata(mongoClient, namespace); +if (!collectionMetadata.isPresent()) { +LOG.error( +"Do sharded split failed, collection {} does not appear to be sharded.", +namespace); +throw new FlinkRuntimeException( +String.format( +"Do sharded split failed, %s is not a sharded collection.", +namespace)); +} + +if (isShardedCollectionDropped(collectionMetadata.get())) { +LOG.error("Do sharded split failed, collection {} was dropped.", namespace); +throw new FlinkRuntimeException( +String.format("Do sharded split failed, %s was dropped.", namespace)); +} + +chunks = readChunks(mongoClient, collectionMetadata.get()); +if (chunks.isEmpty()) { +LOG.error("Do sharded split failed, chunks of {} is empty.", namespace); +throw new FlinkRuntimeException( +String.format( +"Do sharded split failed, chunks of %s is empty.", namespace)); +} +} catch (MongoException e) { +LOG.error( +"Read chunks from {} failed with error message: {}", namespace, e.getMessage()); +throw new FlinkRuntimeException(e); +} + +List sourceSplits = new ArrayList<>(chunks.size()); +for (int i = 0; i < chunks.size(); i++) { +BsonDocument chunk = chunks.get(i); Review Comment: Yes, there will be some extra fields. Do we need to use them to encode the split name of `MongoScanSourceSplit`? - `_id` field represents the primary key recorded in
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032957642 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.source.enumerator.splitter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.util.FlinkRuntimeException; + +import com.mongodb.MongoException; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoClient; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.KEY_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.MAX_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoConstants.MIN_FIELD; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.isShardedCollectionDropped; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.readChunks; +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.readCollectionMetadata; + +/** + * Sharded Partitioner + * + * Uses the chunks collection and partitions the collection based on the sharded collections + * chunk ranges. + * + * The following config collections' read privilege is required. 
+ * + * + * config.collections + * config.chunks + * + */ +@Internal +public class MongoShardedSplitter { + +private static final Logger LOG = LoggerFactory.getLogger(MongoShardedSplitter.class); + +public static final MongoShardedSplitter INSTANCE = new MongoShardedSplitter(); + +private MongoShardedSplitter() {} + +public Collection split(MongoSplitContext splitContext) { +MongoNamespace namespace = splitContext.getMongoNamespace(); +MongoClient mongoClient = splitContext.getMongoClient(); + +List chunks; +Optional collectionMetadata; +try { +collectionMetadata = readCollectionMetadata(mongoClient, namespace); +if (!collectionMetadata.isPresent()) { +LOG.error( +"Do sharded split failed, collection {} does not appear to be sharded.", +namespace); +throw new FlinkRuntimeException( +String.format( +"Do sharded split failed, %s is not a sharded collection.", +namespace)); +} + +if (isShardedCollectionDropped(collectionMetadata.get())) { +LOG.error("Do sharded split failed, collection {} was dropped.", namespace); +throw new FlinkRuntimeException( +String.format("Do sharded split failed, %s was dropped.", namespace)); +} + +chunks = readChunks(mongoClient, collectionMetadata.get()); +if (chunks.isEmpty()) { +LOG.error("Do sharded split failed, chunks of {} is empty.", namespace); +throw new FlinkRuntimeException( +String.format( +"Do sharded split failed, chunks of %s is empty.", namespace)); +} Review Comment: As long as we shard the collection, even if it is empty, a record will be generated in `config.chunks`. ```javascript { "_id" : ObjectId("63838c89ae7bc37861d753a7"), "uuid" : UUID("cce0b7c9-4c67-4d01-ad1f-ddc13d91dc49"), "min" : { "user_id" : { "$minKey" : 1 }, "product_no" : { "$minKey" : 1 }, "product_kind" : { "$minKey" : 1 } }, "max" : { "user_id" : { "$maxKey" : 1 }, "product_no" : { "$maxKey" : 1 }, "product_kind" : { "$maxKey" : 1 } }, "shard" : "rs0-shard", "lastmod" :
[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector
Jiabao-Sun commented on code in PR #1: URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032955663 ## flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.mongodb.source.reader.split; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.source.config.MongoReadOptions; +import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit; +import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit; +import org.apache.flink.util.CollectionUtil; + +import com.mongodb.MongoException; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCursor; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +import static org.apache.flink.connector.mongodb.common.utils.MongoUtils.project; + +/** An split reader implements {@link SplitReader} for {@link MongoScanSourceSplit}. 
*/ +@Internal +public class MongoScanSourceSplitReader implements MongoSourceSplitReader { + +private static final Logger LOG = LoggerFactory.getLogger(MongoScanSourceSplitReader.class); + +private final MongoConnectionOptions connectionOptions; +private final MongoReadOptions readOptions; +private final SourceReaderContext readerContext; +@Nullable private final List projectedFields; +private final int limit; + +private boolean closed = false; +private boolean finished = false; +private MongoClient mongoClient; +private MongoCursor currentCursor; +private MongoScanSourceSplit currentSplit; + +public MongoScanSourceSplitReader( +MongoConnectionOptions connectionOptions, +MongoReadOptions readOptions, +@Nullable List projectedFields, +int limit, +SourceReaderContext context) { +this.connectionOptions = connectionOptions; +this.readOptions = readOptions; +this.projectedFields = projectedFields; +this.limit = limit; +this.readerContext = context; +} + +@Override +public RecordsWithSplitIds fetch() throws IOException { +if (closed) { +throw new IllegalStateException("Cannot fetch records from a closed split reader"); +} + +RecordsBySplits.Builder builder = new RecordsBySplits.Builder<>(); + +// Return when no split registered to this reader. +if (currentSplit == null) { +return builder.build(); +} + +currentCursor = getOrCreateCursor(); +int fetchSize = readOptions.getFetchSize(); + +try { +for (int recordNum = 0; recordNum < fetchSize; recordNum++) { +if (currentCursor.hasNext()) { +builder.add(currentSplit, currentCursor.next()); +} else { +builder.addFinishedSplit(currentSplit.splitId()); +finished = true; +break; +} +} Review Comment: We use a [cursor](https://www.mongodb.com/docs/manual/reference/method/cursor.batchSize/#mongodb-method-cursor.batchSize) to request a batch of data from mongodb, the size of the batch depends on the configuration of `scan.cursor.batch-size`. No request will be made to mongodb until a batch of data in the cursor has
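The batching interplay described in the comment above can be sketched as follows. This is a minimal stand-in, not the PR's code: a plain `Iterator` replaces the MongoDB cursor, and the names `FetchLoopSketch` and `fetchOnce` are illustrative. The driver-side batch size (`scan.cursor.batch-size`) only governs round trips to the server, while the fetch size caps how many records one `fetch()` call hands to Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch (not the PR code): mirrors the fetch loop in MongoScanSourceSplitReader.
public class FetchLoopSketch {

    /** Drains at most fetchSize records; returns fewer only when the cursor is exhausted. */
    public static List<String> fetchOnce(Iterator<String> cursor, int fetchSize) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < fetchSize && cursor.hasNext(); i++) {
            out.add(cursor.next());
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<String> cursor = Arrays.asList("a", "b", "c").iterator();
        System.out.println(fetchOnce(cursor, 2)); // [a, b]
        System.out.println(fetchOnce(cursor, 2)); // [c] -> split finished on next call
    }
}
```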
[GitHub] [flink] flinkbot commented on pull request #21403: [FLINK-29984][flink-metrics] Prometheus histogram minmax
flinkbot commented on PR #21403: URL: https://github.com/apache/flink/pull/21403#issuecomment-1328234175 ## CI report: * d916abd22d6bb510b39df1c51b04f74ad92c8d59 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] qingwei91 commented on a diff in pull request #21403: [FLINK-29984][flink-metrics] Prometheus histogram minmax
qingwei91 commented on code in PR #21403: URL: https://github.com/apache/flink/pull/21403#discussion_r1032920245 ## flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java: ## @@ -378,6 +390,22 @@ private void addSamples( addToList(labelValues, quantile.toString()), statistics.getQuantile(quantile))); } +if (this.histogramMaxEnabled) { +samples.add( +new MetricFamilySamples.Sample( +metricName, +labelNamesWithQuantile, +addToList(labelValues, "1.0"), +statistics.getMax())); +} +if (this.histogramMinEnabled) { +samples.add( +new MetricFamilySamples.Sample( +metricName, +labelNamesWithQuantile, +addToList(labelValues, "0.0"), +statistics.getMin())); +} Review Comment: Core of the change: emit the max as quantile 1.0 and the min as quantile 0.0 when reporting to Prometheus.
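The scheme in this diff can be illustrated with a small stand-alone sketch (not the reporter's actual code; `HistogramSampleSketch` and `toQuantileSamples` are invented for illustration): min and max ride along as the degenerate quantile labels `0.0` and `1.0` next to the real percentiles, so Prometheus sees them under the same `{quantile="..."}` label.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the reporting scheme from the diff above (not the reporter code):
// min is attached as quantile 0.0 and max as quantile 1.0.
public class HistogramSampleSketch {

    /** Maps quantile label -> sample value, in emission order. */
    public static Map<String, Double> toQuantileSamples(
            double min, double max, Map<Double, Double> quantiles) {
        Map<String, Double> samples = new LinkedHashMap<>();
        samples.put("0.0", min); // min modeled as p0
        for (Map.Entry<Double, Double> q : quantiles.entrySet()) {
            samples.put(q.getKey().toString(), q.getValue());
        }
        samples.put("1.0", max); // max modeled as p1.0
        return samples;
    }
}
```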
[jira] [Updated] (FLINK-29984) Flink Histogram not emitting min and max when using Prometheus Reporter
[ https://issues.apache.org/jira/browse/FLINK-29984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29984: --- Labels: pull-request-available (was: ) > Flink Histogram not emitting min and max when using Prometheus Reporter > --- > > Key: FLINK-29984 > URL: https://issues.apache.org/jira/browse/FLINK-29984 > Project: Flink > Issue Type: New Feature > Components: Runtime / Metrics >Affects Versions: 1.16.0, 1.15.3 >Reporter: Lim Qing Wei >Assignee: Lim Qing Wei >Priority: Major > Labels: pull-request-available > > Flink Histogram when using the Prometheus Metrics Reporter only produces > * quantiles of 0.5, 0,75, 0.95, 0.98, 0.99, 0.999 > * count > > I think it would be a good idea to also produce min and max, as they are > already available in the state, we can model it as p0 and p1.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] qingwei91 opened a new pull request, #21403: [FLINK-29984][flink-metrics] Prometheus histogram minmax
qingwei91 opened a new pull request, #21403: URL: https://github.com/apache/flink/pull/21403 ## What is the purpose of the change Expose Histogram min and max when exporting to Prometheus, the data is already available, we just need to export it. This is enabled in both Prometheus and Prometheus Gateway reporter. ## Verifying this change This change added tests and can be verified as follows: * Added test to make sure min and max are emitted in prometheus reporter. ## Does this pull request potentially affect one of the following parts - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (docs)
[jira] [Updated] (FLINK-30222) Suspended a job in last-state mode bug
[ https://issues.apache.org/jira/browse/FLINK-30222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-30222: --- Priority: Blocker (was: Major)

> Suspended a job in last-state mode bug
> --
>
> Key: FLINK-30222
> URL: https://issues.apache.org/jira/browse/FLINK-30222
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: 1.16.0, kubernetes-operator-1.2.0
> Reporter: tanjialiang
> Priority: Blocker
> Fix For: kubernetes-operator-1.3.0
>
> Attachments: image-2022-11-27-16-48-08-445.png
>
> Flink 1.16.0 supports enabling Kubernetes HA with the shorthand option 'kubernetes', i.e. 'high-availability: kubernetes'. But in Kubernetes operator 1.2.0, when I try to suspend a job in last-state mode, validation fails with 'Job could not be upgraded with last-state while Kubernetes HA disabled'.
>
> I tried to use kubectl patch to suspend the job with last-state:
> {code:sh}
> kubectl -nbigdata-flink patch flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'
> {code}
> It was rejected because the operator considered my Kubernetes HA disabled:
> {code:java}
> Error from server: admission webhook "flinkoperator.flink.apache.org" denied the request: Job could not be upgraded with last-state while Kubernetes HA disabled
> {code}
> But I had enabled Kubernetes HA with the following options:
> {code:yaml}
> kubernetes.cluster-id: 
> high-availability: kubernetes
> high-availability.storageDir: hdfs:///flink/recovery
> {code}
> It turns out Flink Kubernetes operator 1.2.0 only recognizes Kubernetes HA via the old option value:
> {code:yaml}
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> {code}
> The check is likely made in org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated.
>
> !image-2022-11-27-16-48-08-445.png!
[jira] [Updated] (FLINK-30222) Suspended a job in last-state mode bug
[ https://issues.apache.org/jira/browse/FLINK-30222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-30222: --- Fix Version/s: kubernetes-operator-1.3.0
[jira] [Commented] (FLINK-30222) Suspended a job in last-state mode bug
[ https://issues.apache.org/jira/browse/FLINK-30222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639640#comment-17639640 ] Gyula Fora commented on FLINK-30222:

Good catch! Would you like to work on this ticket? The current workaround is to set:
```
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
```
instead of simply `kubernetes`.
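The fix implied by the discussion — accepting the Flink 1.16 `kubernetes` shorthand in addition to the legacy factory class name when validating HA — could look roughly like this. This is a hypothetical sketch under the assumption that the check reduces to comparing the `high-availability` option value; the names are illustrative, not the operator's actual FlinkUtils code:

```java
import java.util.Map;

// Hypothetical sketch of an HA check that recognizes both the legacy
// factory-class value and the Flink 1.16 "kubernetes" shorthand for the
// high-availability option.
public class HaValidation {
    static final String FACTORY =
            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";

    static boolean isKubernetesHaActivated(Map<String, String> conf) {
        String ha = conf.get("high-availability");
        return "kubernetes".equalsIgnoreCase(ha) || FACTORY.equals(ha);
    }

    public static void main(String[] args) {
        System.out.println(isKubernetesHaActivated(Map.of("high-availability", "kubernetes"))); // true
        System.out.println(isKubernetesHaActivated(Map.of("high-availability", FACTORY)));      // true
        System.out.println(isKubernetesHaActivated(Map.of()));                                  // false
    }
}
```

With a check like this, a `last-state` upgrade would pass validation regardless of which of the two equivalent HA option values the deployment uses.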
[jira] [Created] (FLINK-30222) Suspended a job in last-state mode bug
tanjialiang created FLINK-30222: --- Summary: Suspended a job in last-state mode bug Key: FLINK-30222 URL: https://issues.apache.org/jira/browse/FLINK-30222 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.2.0, 1.16.0 Reporter: tanjialiang Attachments: image-2022-11-27-16-48-08-445.png

Flink 1.16.0 supports enabling Kubernetes HA with the shorthand option 'kubernetes', i.e. 'high-availability: kubernetes'. But in Kubernetes operator 1.2.0, when I try to suspend a job in last-state mode, validation fails with 'Job could not be upgraded with last-state while Kubernetes HA disabled'.

I tried to use kubectl patch to suspend the job with last-state:
{code:sh}
kubectl -nbigdata-flink patch flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'
{code}
It was rejected because the operator considered my Kubernetes HA disabled:
{code:java}
Error from server: admission webhook "flinkoperator.flink.apache.org" denied the request: Job could not be upgraded with last-state while Kubernetes HA disabled
{code}
But I had enabled Kubernetes HA with the following options:
{code:yaml}
kubernetes.cluster-id: 
high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
{code}
It turns out Flink Kubernetes operator 1.2.0 only recognizes Kubernetes HA via the old option value:
{code:yaml}
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
{code}
The check is likely made in org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated.

!image-2022-11-27-16-48-08-445.png!