[jira] [Closed] (FLINK-30223) Refactor Lock to provide Lock.Factory

2022-11-27 Thread Caizhi Weng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng closed FLINK-30223.
---
Resolution: Fixed

master: 6886303b2f482f2f31c7b98221691a650c1e67d3

> Refactor Lock to provide Lock.Factory
> -
>
> Key: FLINK-30223
> URL: https://issues.apache.org/jira/browse/FLINK-30223
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> The core should not see too many Flink Table concepts, such as database and
> tableName; it only needs to create a Lock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] tsreaper merged pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory

2022-11-27 Thread GitBox


tsreaper merged PR #405:
URL: https://github.com/apache/flink-table-store/pull/405


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

2022-11-27 Thread GitBox


Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033205960


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
 @Override
 public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+currentKeyGroupIndex, keyGroupRange);
+}

Review Comment:
   > I'm afraid that we cannot avoid the check on accessing state as users 
might provide a non-deterministic hashCode for the current key.
   
   Hi @Myasuka , I checked the code in ```StateTable```, and it seems that in 
most state-access cases (except queryable state) we check the key group from 
```keyContext.getCurrentKeyGroupIndex()``` instead of calculating it from the 
```hashCode``` of the partitioned key. So we are actually checking the same 
value in ```setCurrentKeyGroupIndex``` and on state access, regardless of 
whether the hashCode implementation is deterministic.
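
   For readers following along, a minimal standalone sketch of the argument 
above (not the actual `InternalKeyContextImpl`/`StateTable` code; the predicate 
stands in for `KeyGroupRange#contains`): the index validated in 
`setCurrentKeyGroupIndex` is the same cached value that later state accesses 
read back.

```java
import java.util.function.IntPredicate;

final class KeyContextSketch {
    private final IntPredicate keyGroupRangeContains; // stand-in for KeyGroupRange#contains
    private int currentKeyGroupIndex;

    KeyContextSketch(IntPredicate keyGroupRangeContains) {
        this.keyGroupRangeContains = keyGroupRangeContains;
    }

    void setCurrentKeyGroupIndex(int keyGroupIndex) {
        // fail fast, mirroring the check added in this PR
        if (!keyGroupRangeContains.test(keyGroupIndex)) {
            throw new IllegalArgumentException("Key group " + keyGroupIndex + " is out of range.");
        }
        this.currentKeyGroupIndex = keyGroupIndex;
    }

    int getCurrentKeyGroupIndex() {
        // state accesses read this cached value instead of recomputing it from hashCode,
        // so the value validated above is exactly the value checked later
        return currentKeyGroupIndex;
    }
}
```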



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-30185) Provide the flame graph to the subtask level

2022-11-27 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-30185:


Assignee: Rui Fan

> Provide the flame graph to the subtask level
> 
>
> Key: FLINK-30185
> URL: https://issues.apache.org/jira/browse/FLINK-30185
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
> Attachments: image-2022-11-24-14-49-42-845.png, 
> image-2022-11-28-14-38-47-145.png, image-2022-11-28-14-48-20-462.png
>
>
> FLINK-13550 supported for CPU FlameGraphs in web UI.
> As Flink doc mentioned:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process
> {code:java}
> Note: Stack trace samples from all threads of an operator are combined 
> together. If a method call consumes 100% of the resources in one of the 
> parallel tasks but none in the others, the bottleneck might be obscured by 
> being averaged out.
> There are plans to address this limitation in the future by providing “drill 
> down” visualizations to the task level. {code}
>  
> The flame graph at the subtask level is very useful when a small number of 
> subtasks are bottlenecked. So we should provide the flame graph to the 
> subtask level
>  
> !image-2022-11-24-14-49-42-845.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30185) Provide the flame graph to the subtask level

2022-11-27 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639837#comment-17639837
 ] 

Xintong Song commented on FLINK-30185:
--

[~fanrui],

Sounds good to me. You are assigned. Please move ahead.

> Provide the flame graph to the subtask level
> 
>
> Key: FLINK-30185
> URL: https://issues.apache.org/jira/browse/FLINK-30185
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
> Attachments: image-2022-11-24-14-49-42-845.png, 
> image-2022-11-28-14-38-47-145.png, image-2022-11-28-14-48-20-462.png
>
>
> FLINK-13550 supported for CPU FlameGraphs in web UI.
> As Flink doc mentioned:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process
> {code:java}
> Note: Stack trace samples from all threads of an operator are combined 
> together. If a method call consumes 100% of the resources in one of the 
> parallel tasks but none in the others, the bottleneck might be obscured by 
> being averaged out.
> There are plans to address this limitation in the future by providing “drill 
> down” visualizations to the task level. {code}
>  
> The flame graph at the subtask level is very useful when a small number of 
> subtasks are bottlenecked. So we should provide the flame graph to the 
> subtask level
>  
> !image-2022-11-24-14-49-42-845.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30206) Allow FileStoreScan to read incremental changes from OVERWRITE snapshot in Table Store

2022-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30206:
---
Labels: pull-request-available  (was: )

> Allow FileStoreScan to read incremental changes from OVERWRITE snapshot in 
> Table Store
> --
>
> Key: FLINK-30206
> URL: https://issues.apache.org/jira/browse/FLINK-30206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.3.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> Currently {{AbstractFileStoreScan}} can only read incremental changes from 
> APPEND snapshots. However, in OVERWRITE snapshots users also append new 
> records to the table. These changes must be discovered by the compact job 
> source so that the overwritten partition can be compacted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] tsreaper opened a new pull request, #406: [FLINK-30206] Allow FileStoreScan to read incremental changes from OVERWRITE snapshot in Table Store

2022-11-27 Thread GitBox


tsreaper opened a new pull request, #406:
URL: https://github.com/apache/flink-table-store/pull/406

   Currently `AbstractFileStoreScan` can only read incremental changes from 
APPEND snapshots. However, in OVERWRITE snapshots users also append new records 
to the table. These changes must be discovered by the compact job source so 
that the overwritten partition can be compacted.
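
   A purely illustrative sketch of the intended behavior change (the method and 
the commit-kind strings are hypothetical placeholders, not the actual 
flink-table-store API):

```java
// Hypothetical sketch: an incremental scan should treat OVERWRITE like APPEND,
// so a compact job source also sees records appended by an overwrite commit.
static boolean readableAsIncrementalChange(String commitKind) {
    return "APPEND".equals(commitKind) || "OVERWRITE".equals(commitKind);
}
```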


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] shuiqiangchen commented on pull request #20745: [FLINK-28988] Don't push filters down into the right table for temporal join

2022-11-27 Thread GitBox


shuiqiangchen commented on PR #20745:
URL: https://github.com/apache/flink/pull/20745#issuecomment-1328639959

   @lincoln-lil Thanks for your detailed explanations in the online and offline 
discussions. I have simplified the implementation based on the consensus we 
reached.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30221) Fix the bug of sum(try_cast(string as bigint)) return null when partial elements can't convert to bigint

2022-11-27 Thread Tony Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639823#comment-17639823
 ] 

Tony Zhu commented on FLINK-30221:
--

Could you assign this to me? I'd like to try the fix.

> Fix the bug of sum(try_cast(string as bigint)) return null when partial 
> elements can't convert to bigint
> 
>
> Key: FLINK-30221
> URL: https://issues.apache.org/jira/browse/FLINK-30221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: dalongliu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30158) [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes

2022-11-27 Thread Tony Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639822#comment-17639822
 ] 

Tony Zhu commented on FLINK-30158:
--

[~jamesmcguirepro] could you provide more info? I'd like to take a look at the 
details.

> [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using 
> repeated or map attributes
> -
>
> Key: FLINK-30158
> URL: https://issues.apache.org/jira/browse/FLINK-30158
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.16.0
>Reporter: James Mcguire
>Priority: Major
>
> I am encountering a {{java.lang.NullPointerException}} exception when trying 
> to use Flink SQL to query a kafka topic that uses either {{repeated}} and/or 
> {{map}} attributes.
>  
> *Replication steps*
>  # Use a protobuf definition that either uses repeated and/or map.  This 
> protobuf schema should cover a few of the problematic scenarios I ran into:
>  
> {code:java}
> syntax = "proto3";
> package example.message;
> option java_package = "com.example.message";
> option java_multiple_files = true;
> message NestedType {
>   int64 nested_first = 1;
>   oneof nested_second {
> int64 one_of_first = 2;
> string one_of_second = 3;
>   }
> }
> message Test {
>   repeated int64 first = 1;
>   map second = 2;
> } {code}
> 2. Attempt query on topic, even excluding problematic columns:
>  
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.formats.protobuf.PbCodegenException: 
> java.lang.NullPointerException{code}
>  
>  
> log file:
>  
> {code:java}
> 2022-11-22 15:33:59,510 WARN  org.apache.flink.table.client.cli.CliClient 
>  [] - Could not execute SQL 
> statement.org.apache.flink.table.client.gateway.SqlExecutionException: Error 
> while retrieving result.at 
> org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79)
>  ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.lang.RuntimeException: 
> Failed to fetch next resultat 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
>  ~[?:?]at 
> org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
>  ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.io.IOException: Failed 
> to fetch job execution resultat 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
>  ~[?:?]at 
> org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
>  ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: bc869097009a92d0601add881a6b920c)at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
> ~[?:?]at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
> ~[?:?]at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>  ~[flink-dist-1.16.0.jar:1.16.0]at 
> 

[jira] [Closed] (FLINK-29987) PartialUpdateITCase.testForeignKeyJo is unstable

2022-11-27 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-29987.

Resolution: Fixed

master: cd0870bab446ad8e91dab3ddd3b3b6e7ef71612f

> PartialUpdateITCase.testForeignKeyJo is unstable
> 
>
> Key: FLINK-29987
> URL: https://issues.apache.org/jira/browse/FLINK-29987
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Alex Sorokoumov
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #404: [FLINK-29987] Use Awaitility in PartialUpdateITCase#testForeignKeyJoin

2022-11-27 Thread GitBox


JingsongLi merged PR #404:
URL: https://github.com/apache/flink-table-store/pull/404


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory

2022-11-27 Thread GitBox


JingsongLi commented on code in PR #405:
URL: https://github.com/apache/flink-table-store/pull/405#discussion_r1033178800


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java:
##
@@ -31,13 +32,57 @@ public interface Lock extends AutoCloseable {
 /** Run with lock. */
 <T> T runWithLock(Callable<T> callable) throws Exception;
 
-@Nullable
-static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath 
tablePath) {
-if (lockFactory == null) {
-return null;
+/** A factory to create {@link Lock}. */
+interface Factory extends Serializable {
+Lock create();
+}
+
+static Factory factory(@Nullable CatalogLock.Factory lockFactory, 
ObjectPath tablePath) {
+return lockFactory == null
+? new EmptyFactory()
+: new CatalogLockFactory(lockFactory, tablePath);
+}
+
+static Factory emptyFactory() {
+return new EmptyFactory();
+}
+
+/** A {@link Factory} creating lock from catalog. */
+class CatalogLockFactory implements Factory {
+
+private static final long serialVersionUID = 1L;
+
+private final CatalogLock.Factory lockFactory;
+private final ObjectPath tablePath;
+
+public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath 
tablePath) {
+this.lockFactory = lockFactory;
+this.tablePath = tablePath;
+}
+
+@Override
+public Lock create() {
+return fromCatalog(lockFactory.create(), tablePath);
 }
+}
 
-return fromCatalog(lockFactory.create(), tablePath);
+/** A {@link Factory} creating empty lock. */
+class EmptyFactory implements Factory {
+
+private static final long serialVersionUID = 1L;
+
+@Override
+public Lock create() {
+return new Lock() {
+@Override
+public <T> T runWithLock(Callable<T> callable) throws Exception {
+return callable.call();
+}
+
+@Override
+public void close() {}
+};

Review Comment:
   I think we can return an object to avoid null checks.
   I will modify the other call sites to remove their null checks.
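
   A small usage sketch of the call-site pattern this enables (the helper 
method is hypothetical; only `Lock.Factory`, `Lock#runWithLock` and 
`Lock.emptyFactory()` come from this diff):

```java
// Hypothetical call site: no null check is needed because EmptyFactory
// returns a no-op Lock instead of null.
static void commitWithLock(Lock.Factory lockFactory) throws Exception {
    try (Lock lock = lockFactory.create()) {
        lock.runWithLock(() -> {
            // ... do the work that must run under the catalog lock
            return null;
        });
    }
}
```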



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30205) Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store

2022-11-27 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-30205.

Resolution: Fixed

master: e6e62699ef42a90d9eb2ca73b10460eb2764a586

> Modify compact interface for TableWrite and FileStoreWrite to support normal 
> compaction in Table Store
> --
>
> Key: FLINK-30205
> URL: https://issues.apache.org/jira/browse/FLINK-30205
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.3.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> Currently the {{compact}} interface in {{TableWrite}} and {{FileStoreWrite}} 
> can only trigger full compaction. However, a separate compact job should not 
> only perform full compaction, but also perform normal compaction once in a 
> while, just like the current Table Store sinks do.
> We need to modify compact interface for TableWrite and FileStoreWrite to 
> support normal compaction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store

2022-11-27 Thread GitBox


JingsongLi commented on code in PR #403:
URL: https://github.com/apache/flink-table-store/pull/403#discussion_r1033177161


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java:
##
@@ -34,10 +34,12 @@
 void write(T record) throws Exception;
 
 /**
- * Compact all files related to the writer. Note that compaction process 
is only submitted and
- * may not be completed when the method returns.
+ * Compact files related to the writer. Note that compaction process is 
only submitted and may
+ * not be completed when the method returns.
+ *
+ * @param fullCompaction whether to trigger full compaction or just normal 
compaction

Review Comment:
   You are right
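
   A hedged usage sketch of the new flag, assuming a `compact(boolean 
fullCompaction)` signature as suggested by the javadoc above; the surrounding 
scheduling policy and parameter names are illustrative only:

```java
// Illustrative: a dedicated compact job triggers normal compaction regularly
// and a full compaction only once in a while.
static void maybeCompact(
        RecordWriter<?> writer, long commitsSinceFullCompaction, long fullCompactionInterval)
        throws Exception {
    boolean fullCompaction = commitsSinceFullCompaction >= fullCompactionInterval;
    // note: compaction is only submitted here and may still be running when compact() returns
    writer.compact(fullCompaction);
}
```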



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi merged pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store

2022-11-27 Thread GitBox


JingsongLi merged PR #403:
URL: https://github.com/apache/flink-table-store/pull/403


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30185) Provide the flame graph to the subtask level

2022-11-27 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639816#comment-17639816
 ] 

Rui Fan commented on FLINK-30185:
-

Hi [~xtsong] , thanks for your reply.

The improvement mainly includes 2 parts: 
 # How does the web frontend show the flame graph for a single subtask?
 # How does the backend save or fetch the thread info sample for a single 
subtask?

h2. Web Frontend

Similar to Metrics, we need to add a select box that selects either all 
subtasks or a single subtaskIndex.

The selected subtaskIndex is then passed to the backend.

!image-2022-11-28-14-48-20-462.png!

!image-2022-11-28-14-38-47-145.png|width=783,height=286!

 
h2. Backend
h3. 1. Refactor the cache logic

Currently, the cache key of ThreadInfo is jobId + JobVertexId. The cache key 
should be changed to jobId + jobVertexId + subtaskIndex.
h3. 2. Add the subtaskIndex

Allow requesting threadInfo from a single subtask.
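
A hypothetical sketch of the extended cache key described above (class and 
field names are illustrative, not the actual Flink classes); subtaskIndex = -1 
could keep the existing vertex-level behavior:

{code:java}
final class ThreadInfoCacheKey {
    private final String jobId;
    private final String jobVertexId;
    private final int subtaskIndex; // -1 could mean "all subtasks of the vertex"

    ThreadInfoCacheKey(String jobId, String jobVertexId, int subtaskIndex) {
        this.jobId = jobId;
        this.jobVertexId = jobVertexId;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ThreadInfoCacheKey)) {
            return false;
        }
        ThreadInfoCacheKey that = (ThreadInfoCacheKey) o;
        return subtaskIndex == that.subtaskIndex
                && jobId.equals(that.jobId)
                && jobVertexId.equals(that.jobVertexId);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(jobId, jobVertexId, subtaskIndex);
    }
}
{code}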

 

If anything is wrong or missing, please let me know, thanks!

> Provide the flame graph to the subtask level
> 
>
> Key: FLINK-30185
> URL: https://issues.apache.org/jira/browse/FLINK-30185
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
> Attachments: image-2022-11-24-14-49-42-845.png, 
> image-2022-11-28-14-38-47-145.png, image-2022-11-28-14-48-20-462.png
>
>
> FLINK-13550 supported for CPU FlameGraphs in web UI.
> As Flink doc mentioned:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process
> {code:java}
> Note: Stack trace samples from all threads of an operator are combined 
> together. If a method call consumes 100% of the resources in one of the 
> parallel tasks but none in the others, the bottleneck might be obscured by 
> being averaged out.
> There are plans to address this limitation in the future by providing “drill 
> down” visualizations to the task level. {code}
>  
> The flame graph at the subtask level is very useful when a small number of 
> subtasks are bottlenecked. So we should provide the flame graph to the 
> subtask level
>  
> !image-2022-11-24-14-49-42-845.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30185) Provide the flame graph to the subtask level

2022-11-27 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-30185:

Attachment: image-2022-11-28-14-48-20-462.png

> Provide the flame graph to the subtask level
> 
>
> Key: FLINK-30185
> URL: https://issues.apache.org/jira/browse/FLINK-30185
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
> Attachments: image-2022-11-24-14-49-42-845.png, 
> image-2022-11-28-14-38-47-145.png, image-2022-11-28-14-48-20-462.png
>
>
> FLINK-13550 supported for CPU FlameGraphs in web UI.
> As Flink doc mentioned:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process
> {code:java}
> Note: Stack trace samples from all threads of an operator are combined 
> together. If a method call consumes 100% of the resources in one of the 
> parallel tasks but none in the others, the bottleneck might be obscured by 
> being averaged out.
> There are plans to address this limitation in the future by providing “drill 
> down” visualizations to the task level. {code}
>  
> The flame graph at the subtask level is very useful when a small number of 
> subtasks are bottlenecked. So we should provide the flame graph to the 
> subtask level
>  
> !image-2022-11-24-14-49-42-845.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30185) Provide the flame graph to the subtask level

2022-11-27 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-30185:

Attachment: image-2022-11-28-14-38-47-145.png

> Provide the flame graph to the subtask level
> 
>
> Key: FLINK-30185
> URL: https://issues.apache.org/jira/browse/FLINK-30185
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
> Attachments: image-2022-11-24-14-49-42-845.png, 
> image-2022-11-28-14-38-47-145.png
>
>
> FLINK-13550 supported for CPU FlameGraphs in web UI.
> As Flink doc mentioned:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process
> {code:java}
> Note: Stack trace samples from all threads of an operator are combined 
> together. If a method call consumes 100% of the resources in one of the 
> parallel tasks but none in the others, the bottleneck might be obscured by 
> being averaged out.
> There are plans to address this limitation in the future by providing “drill 
> down” visualizations to the task level. {code}
>  
> The flame graph at the subtask level is very useful when a small number of 
> subtasks are bottlenecked. So we should provide the flame graph to the 
> subtask level
>  
> !image-2022-11-24-14-49-42-845.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rkhachatryan commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

2022-11-27 Thread GitBox


rkhachatryan commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033165454


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java:
##
@@ -556,9 +559,27 @@ public void testScaleUp() throws Exception {
 3,
 2,
 operatorSubtaskState3)) {
-testHarness1.processElement1(new StreamRecord<>("trigger"));
-testHarness2.processElement1(new StreamRecord<>("trigger"));
-testHarness3.processElement1(new StreamRecord<>("trigger"));
+
+// Since there is a keyed operator, we should follow the key 
partition rules.
+Map, 
KeyGroupRange>
+keyGroupPartition = new HashMap<>();
+keyGroupPartition.put(testHarness1, KeyGroupRange.of(0, 3));
+keyGroupPartition.put(testHarness2, KeyGroupRange.of(4, 6));
+keyGroupPartition.put(testHarness3, KeyGroupRange.of(7, 9));
+while (!keyGroupPartition.isEmpty()) {
+String triggerKey = 
String.valueOf(ThreadLocalRandom.current().nextLong());
+for (Map.Entry<
+TwoInputStreamOperatorTestHarness,
+KeyGroupRange>
+entry : keyGroupPartition.entrySet()) {
+if (entry.getValue()
+
.contains(KeyGroupRangeAssignment.assignToKeyGroup(triggerKey, 10))) {
+entry.getKey().processElement1(new 
StreamRecord<>(triggerKey));
+keyGroupPartition.remove(entry.getKey());
+break;
+}
+}
+}

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings

2022-11-27 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu updated FLINK-15635:
--
Release Note: 
TableEnvironment introduces a user class loader to have consistent class 
loading behavior in table programs, SQL Client and SQL Gateway. The user 
classloader manages all user jars, such as jars added by `ADD JAR` or `CREATE 
FUNCTION .. USING JAR ..` statements. User-defined 
functions/connectors/catalogs should replace 
`Thread.currentThread().getContextClassLoader()` with the user class loader to 
load classes; otherwise, a ClassNotFoundException may be thrown. The user class 
loader can be accessed via `FunctionContext#getUserCodeClassLoader`, 
`DynamicTableFactory.Context#getClassLoader` and 
`CatalogFactory.Context#getClassLoader`.  
If you used the thread context classloader to load your user classes before 
1.15, this behavior is incompatible after upgrading to 1.16 because of the 
table planner classloader. You should change your code to use the `ADD JAR` 
syntax to add your custom jar to the planner classloader first; the framework 
then loads the class when needed, which simplifies your classloader-related 
work.

  was:TableEnvironment introduces a user class loader to have a consistent 
class loading behavior in table programs, SQL Client and SQL Gateway. The user 
classloader manages all user jars such as jar added by `ADD JAR` or `CREATE 
FUNCTION .. USING JAR ..` statements. User-defined 
functions/connectors/catalogs should replace 
`Thread.currentThread().getContextClassLoader()` with the user class loader to 
load classes. Otherwise, ClassNotFoundException maybe thrown. The user class 
loader can be accessed via `FunctionContext#getUserCodeClassLoader`, 
`DynamicTableFactory.Context#getClassLoader` and 
`CatalogFactory.Context#getClassLoader`.
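
A hedged sketch of the migration described in the release note, assuming 
`FunctionContext#getUserCodeClassLoader` returns a ClassLoader as stated there; 
the class name `com.example.Helper` is purely illustrative:

{code:java}
public class MyScalarFunction extends org.apache.flink.table.functions.ScalarFunction {

    private transient Class<?> helperClass;

    @Override
    public void open(org.apache.flink.table.functions.FunctionContext context) throws Exception {
        // before: Class.forName("com.example.Helper", true,
        //         Thread.currentThread().getContextClassLoader());
        ClassLoader userClassLoader = context.getUserCodeClassLoader();
        helperClass = Class.forName("com.example.Helper", true, userClassLoader);
    }

    public String eval(String input) {
        // ... use helperClass here; trivial body for the sketch
        return input;
    }
}
{code}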


> Allow passing a ClassLoader to EnvironmentSettings
> --
>
> Key: FLINK-15635
> URL: https://issues.apache.org/jira/browse/FLINK-15635
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> We had a couple of class loading issues in the past because people forgot to 
> use the right classloader in {{flink-table}}. The SQL Client executor code 
> hacks a classloader into the planner process by using {{wrapClassLoader}} 
> that sets the threads context classloader.
> Instead we should allow passing a class loader to environment settings. This 
> class loader can be passed to the planner and can be stored in table 
> environment, table config, etc. to have a consistent class loading behavior.
> Having this in place should replace the need for 
> {{Thread.currentThread().getContextClassLoader()}} in the entire 
> {{flink-table}} module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Myasuka commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

2022-11-27 Thread GitBox


Myasuka commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033158877


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
 @Override
 public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+currentKeyGroupIndex, keyGroupRange);
+}

Review Comment:
   I'm afraid that we cannot avoid the check on accessing state as users might 
provide a non-deterministic hashCode for the current key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-14055) Add advanced function DDL syntax "USING JAR"

2022-11-27 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu updated FLINK-14055:
--
Release Note: 
In 1.16, we introduced the `CREATE FUNCTION ... USING JAR` syntax to support 
dynamically loading a UDF jar per job, which makes it convenient for platform 
developers to manage UDFs. In addition, we also ported the `ADD JAR` syntax 
from the SQL Client to the `TableEnvironment` side, which makes the syntax 
generally available to Table API users. However, due to inconsistent 
classloaders in StreamExecutionEnvironment and TableEnvironment, the `ADD JAR` 
syntax is currently not available in Table API programs; this will be resolved 
by https://issues.apache.org/jira/browse/FLINK-29240.



More information about this feature could be found in 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function.

  was:
In 1.16, we introduced the `CREATE FUNCTION ... USING JAR` syntax to support 
the dynamic loading of the UDF jar in per job, which is convenient for platform 
developers to easily achieve UDF management. In addition, we also port the `ADD 
JAR` syntax from SqlClient to `TableEnvironment` side,  this allows the syntax 
is more general to Table API users. However, due to inconsistent classloader in 
StreamExecutionEnvironment and TableEnvironment, the `ADD JAR` syntax is not 
available for Table API program currently, it will be resolved by 
https://issues.apache.org/jira/browse/FLINK-29240

More information about this feature could be found in 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function.
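
A hedged end-to-end example of the syntax from a Table API program (jar path, 
class name and function name are illustrative; assumes the usual 
org.apache.flink.table.api imports):

{code:java}
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// register a UDF whose implementation lives in an external jar
tEnv.executeSql(
        "CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper' "
                + "LANGUAGE JAVA USING JAR 'file:///tmp/my-udfs.jar'");

tEnv.executeSql("SELECT my_upper(name) FROM (VALUES ('hello')) AS t(name)").print();
{code}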


> Add advanced function DDL syntax "USING JAR"
> 
>
> Key: FLINK-14055
> URL: https://issues.apache.org/jira/browse/FLINK-14055
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: BL
>Assignee: dalongliu
>Priority: Major
>  Labels: auto-unassigned, sprint
> Fix For: 1.16.0
>
>
> As 
> [FLIP-214|https://cwiki.apache.org/confluence/display/FLINK/FLIP-214+Support+Advanced+Function+DDL]
>   propose, this ticket is to support dynamically loading functions from 
> external source in function DDL with advanced syntax like:
>  
> {code:java}
> CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] 
> [catalog_name.db_name.]function_name AS class_name [LANGUAGE 
> JAVA|SCALA|PYTHON] [USING JAR‘resource_path’ [, JAR ‘resource_path’]*]; {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30184) Save TM/JM thread stack periodically

2022-11-27 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan closed FLINK-30184.
---
Resolution: Won't Do

> Save TM/JM thread stack periodically
> 
>
> Key: FLINK-30184
> URL: https://issues.apache.org/jira/browse/FLINK-30184
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
>
> After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the 
> thread stack of TM/JM in Flink WebUI. 
> It can help flink users to find out why the Flink job is stuck, or why the 
> processing is slow. It is very useful for trouble shooting.
> However, sometimes Flink tasks get stuck or process slowly, but when the user 
> troubleshoots the problem, the job has resumed. It is difficult to find out 
> what happened to the Flink job at the time and why is it slow?
>  
> So, could we periodically save the thread stack of TM or JM in the TM log 
> directory?
> Define some configurations:
> cluster.thread-dump.interval=1min
> cluster.thread-dump.cleanup-time=48 hours



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-30184) Save TM/JM thread stack periodically

2022-11-27 Thread Rui Fan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30184 ]


Rui Fan deleted comment on FLINK-30184:
-

was (Author: fanrui):
Hi [~xtsong] , please help take a look in your free time. And if it makes 
sense, please assign it to me, thanks~

> Save TM/JM thread stack periodically
> 
>
> Key: FLINK-30184
> URL: https://issues.apache.org/jira/browse/FLINK-30184
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
>
> After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the 
> thread stack of TM/JM in Flink WebUI. 
> It can help flink users to find out why the Flink job is stuck, or why the 
> processing is slow. It is very useful for trouble shooting.
> However, sometimes Flink tasks get stuck or process slowly, but when the user 
> troubleshoots the problem, the job has resumed. It is difficult to find out 
> what happened to the Flink job at the time and why is it slow?
>  
> So, could we periodically save the thread stack of TM or JM in the TM log 
> directory?
> Define some configurations:
> cluster.thread-dump.interval=1min
> cluster.thread-dump.cleanup-time=48 hours



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30184) Save TM/JM thread stack periodically

2022-11-27 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639802#comment-17639802
 ] 

Rui Fan commented on FLINK-30184:
-

Hi [~xtsong] , thanks for your explanation.

It sounds reasonable, I will close this JIRA.

> Save TM/JM thread stack periodically
> 
>
> Key: FLINK-30184
> URL: https://issues.apache.org/jira/browse/FLINK-30184
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
>
> After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the 
> thread stack of TM/JM in Flink WebUI. 
> It can help flink users to find out why the Flink job is stuck, or why the 
> processing is slow. It is very useful for trouble shooting.
> However, sometimes Flink tasks get stuck or process slowly, but when the user 
> troubleshoots the problem, the job has resumed. It is difficult to find out 
> what happened to the Flink job at the time and why is it slow?
>  
> So, could we periodically save the thread stack of TM or JM in the TM log 
> directory?
> Define some configurations:
> cluster.thread-dump.interval=1min
> cluster.thread-dump.cleanup-time=48 hours



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on pull request #21398: [FLINK-30214][mini_cluster] MiniCluster adapt the jobvertex-parallelism-overrides

2022-11-27 Thread GitBox


1996fanrui commented on PR #21398:
URL: https://github.com/apache/flink/pull/21398#issuecomment-1328573385

   Hi @gyfora @mxm 
   
   As we know, when a Flink job runs in the IDE it uses the MiniCluster, and 
`PerJobMiniClusterFactory#getMiniClusterConfig` derives `numSlotsPerTaskManager` 
from the JobGraph. However, `numSlotsPerTaskManager` may be wrong when 
`jobvertex-parallelism-overrides` is used, so it should be derived from both 
the JobGraph and `jobvertex-parallelism-overrides`.
   
   Please take a look at this PR in your free time, thanks~
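
   A rough sketch of the idea, not the actual `PerJobMiniClusterFactory` code 
(the map-based signature is illustrative only):

```java
// Take jobvertex-parallelism-overrides into account before sizing the MiniCluster.
static int slotsPerTaskManager(
        java.util.Map<String, Integer> vertexParallelism,      // vertexId -> parallelism from the JobGraph
        java.util.Map<String, Integer> parallelismOverrides) { // jobvertex-parallelism-overrides
    int maxParallelism = 1;
    for (java.util.Map.Entry<String, Integer> e : vertexParallelism.entrySet()) {
        int parallelism = parallelismOverrides.getOrDefault(e.getKey(), e.getValue());
        maxParallelism = Math.max(maxParallelism, parallelism);
    }
    return maxParallelism;
}
```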


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] SmirAlex commented on pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest

2022-11-27 Thread GitBox


SmirAlex commented on PR #20919:
URL: https://github.com/apache/flink/pull/20919#issuecomment-1328553285

   Hi @XComp, thanks for the review! I answered your comments and added a 
commit with the fix. Please have a look when you are available.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] SmirAlex commented on a diff in pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest

2022-11-27 Thread GitBox


SmirAlex commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1033131501


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
 } catch (InterruptedException ignored) { // we use interrupt to close 
reload thread
 } finally {
 if (cacheLoadTaskService != null) {
+// if main cache reload thread encountered an exception,
+// it interrupts underlying InputSplitCacheLoadTasks threads
 cacheLoadTaskService.shutdownNow();

Review Comment:
   To be honest, during development I didn't think about relying on 
ForkJoinPool. As I understand it, it's very useful when we have many small 
tasks, which can create other small subtasks. But in our case we have a fixed 
number ( = number of splits) of long-running tasks that won't create other 
subtasks, so a simple fixed thread pool looked like a pretty straightforward 
way to implement cache loading. Plus, if we used the common ForkJoinPool, we 
could end up utilizing all of its threads (our tasks are long-running) and 
other tasks relying on the common pool would starve, which is undesirable 
behavior, as I understand. Please correct me if I'm wrong.
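
   For context, the generic shutdown pattern being discussed, using only 
standard `ExecutorService` API (the surrounding method and the 10 second 
timeout are illustrative, not the exact Flink code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReloadSketch {
    static void loadAllSplits(int numSplits) throws InterruptedException {
        ExecutorService cacheLoadTaskService = Executors.newFixedThreadPool(numSplits);
        try {
            // submit one long-running split-loading task per input split here ...
        } finally {
            cacheLoadTaskService.shutdownNow(); // interrupts the split-loading threads
            // bounded wait so a stuck task cannot block the reload thread forever
            cacheLoadTaskService.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```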



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] SmirAlex commented on a diff in pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest

2022-11-27 Thread GitBox


SmirAlex commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1033131501


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
 } catch (InterruptedException ignored) { // we use interrupt to close 
reload thread
 } finally {
 if (cacheLoadTaskService != null) {
+// if main cache reload thread encountered an exception,
+// it interrupts underlying InputSplitCacheLoadTasks threads
 cacheLoadTaskService.shutdownNow();

Review Comment:
   To be honest, during development I didn't think about relying on 
ForkJoinPool. As I understand it, it's very useful when we have many small 
tasks, which can also be created from already running tasks. But in our case we 
have a fixed number ( = number of splits) of long-running tasks that won't 
create other subtasks, so a simple fixed thread pool looked like a pretty 
straightforward way to implement cache loading. Plus, if we used the common 
ForkJoinPool, we could end up utilizing all of its threads (our tasks are 
long-running) and other tasks relying on the common pool would starve, which is 
undesirable behavior, as I understand. Please correct me if I'm wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21406: [FLINK-30088][runtime] Fix excessive state updates for TtlMapState and TtlListState

2022-11-27 Thread GitBox


flinkbot commented on PR #21406:
URL: https://github.com/apache/flink/pull/21406#issuecomment-1328531809

   
   ## CI report:
   
   * cf8b7c1de5e3dfbe83106a4de89423e17e36e50e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30088) Excessive state updates for TtlMapState and TtlListState

2022-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30088:
---
Labels: pull-request-available  (was: )

> Excessive state updates for TtlMapState and TtlListState
> 
>
> Key: FLINK-30088
> URL: https://issues.apache.org/jira/browse/FLINK-30088
> Project: Flink
>  Issue Type: Bug
>Reporter: Roman Boyko
>Assignee: Roman Boyko
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2022-11-18-20-25-14-466.png, 
> image-2022-11-18-20-27-24-054.png
>
>
> After merging the FLINK-21413 every ttl check for cleanup for TtlMapState and 
> TtlListState (even without expired elements) leads to whole state update.
> This is because:
> - comparison by link inside `TtlIncrementalCleanup`:
> !image-2022-11-18-20-25-14-466.png|width=450,height=288!
> - and creating new map or list inside TtlMapState or TtlListState:
> !image-2022-11-18-20-27-24-054.png|width=477,height=365!
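
A simplified sketch of the proposed fix (illustrative, not the real 
TtlListState code): only materialize a new collection when at least one element 
actually expired, so the reference comparison in TtlIncrementalCleanup sees an 
unchanged state and no update is written.

{code:java}
static <V> java.util.List<V> dropExpired(
        java.util.List<V> original, java.util.function.Predicate<V> isExpired) {
    if (original.stream().noneMatch(isExpired)) {
        // unchanged reference -> the cleanup does not trigger a state update
        return original;
    }
    java.util.List<V> unexpired = new java.util.ArrayList<>();
    for (V value : original) {
        if (!isExpired.test(value)) {
            unexpired.add(value);
        }
    }
    return unexpired;
}
{code}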



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rovboyko opened a new pull request, #21406: [FLINK-30088][runtime] Fix excessive state updates for TtlMapState and TtlListState

2022-11-27 Thread GitBox


rovboyko opened a new pull request, #21406:
URL: https://github.com/apache/flink/pull/21406

   …d TtlListState
   
   
   
   ## What is the purpose of the change
   Avoid unnecessary state updates for TtlMapState and TtlListState in case 
they are not changed. This affects only HashMapStateBackend, MemoryStateBackend 
and FsStateBackend.
   
   
   
   ## Brief change log
   
   - add if condition to return original List if no records were expired for 
TtlListState
   - add if condition to return original Map if no records were expired for 
TtlMapState
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - add testStateNotChangedWithoutCleanup to TtlStateTestBase
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] SmirAlex commented on a diff in pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest

2022-11-27 Thread GitBox


SmirAlex commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1033120828


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##
@@ -39,12 +39,14 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** {@link CacheLoader} that used {@link InputFormat} for loading data into 
the cache. */
 public class InputFormatCacheLoader extends CacheLoader {
 private static final long serialVersionUID = 1L;
 private static final Logger LOG = 
LoggerFactory.getLogger(InputFormatCacheLoader.class);
+private static final long TIMEOUT_AFTER_INTERRUPT = 10; // 10 sec

Review Comment:
   Agree, fixed



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
 } catch (InterruptedException ignored) { // we use interrupt to close 
reload thread
 } finally {
 if (cacheLoadTaskService != null) {
+// if main cache reload thread encountered an exception,
+// it interrupts underlying InputSplitCacheLoadTasks threads
 cacheLoadTaskService.shutdownNow();
+// timeout 10 sec should definitely be enough to wait for 
finish after interrupt

Review Comment:
   Agree, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30185) Provide the flame graph to the subtask level

2022-11-27 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639778#comment-17639778
 ] 

Xintong Song commented on FLINK-30185:
--

I think this makes a nice improvement. Could you explain a bit more in detail 
how do you plan to do this?

> Provide the flame graph to the subtask level
> 
>
> Key: FLINK-30185
> URL: https://issues.apache.org/jira/browse/FLINK-30185
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
> Attachments: image-2022-11-24-14-49-42-845.png
>
>
> FLINK-13550 supported for CPU FlameGraphs in web UI.
> As Flink doc mentioned:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/#sampling-process
> {code:java}
> Note: Stack trace samples from all threads of an operator are combined 
> together. If a method call consumes 100% of the resources in one of the 
> parallel tasks but none in the others, the bottleneck might be obscured by 
> being averaged out.
> There are plans to address this limitation in the future by providing “drill 
> down” visualizations to the task level. {code}
>  
> The flame graph at the subtask level is very useful when a small number of 
> subtasks are bottlenecked. So we should provide the flame graph to the 
> subtask level
>  
> !image-2022-11-24-14-49-42-845.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23035) Add explicit method to StateChangelogWriter to write metadata

2022-11-27 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639777#comment-17639777
 ] 

Yanfei Lei commented on FLINK-23035:


[~binh] Of course, you are welcome to help review.

> Add explicit method to StateChangelogWriter to write metadata
> -
>
> Key: FLINK-23035
> URL: https://issues.apache.org/jira/browse/FLINK-23035
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / State Backends
>Reporter: Roman Khachatryan
>Assignee: Xinbin Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, metadata is written to the state changelog using the same 
> StateChangelogWriter.append() method as data.
> However, it doesn't belong to a specific group, and should be read first on 
> recovery. Because of that, -1 is used.
> An explicit append() without keygroup would be less fragile (probably still 
> using -1 under the hood).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30184) Save TM/JM thread stack periodically

2022-11-27 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639775#comment-17639775
 ] 

Xintong Song commented on FLINK-30184:
--

[~fanrui], sorry for the late response.

I agree with [~wangyang0918] that this is probably more suitable for an 
external service that manages / monitors Flink.

Thread dumps are for debugging and should not be activated constantly given the 
performance impact. Flink already offers rest api for capturing thread stacks 
of 
[jobmanager|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-thread-dump]
 and 
[taskmanager|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#taskmanagers-taskmanagerid-thread-dump].
 It should be easy for an external monitoring system to capture the dumps when 
the job is detected to be slow.
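
As a hedged illustration of that approach (the endpoint path is taken from the 
docs linked above; host, port and error handling are omitted for brevity), an 
external monitor could do something like:

{code:java}
static String dumpJobManagerThreads(String restAddress) throws Exception {
    java.net.http.HttpClient client = java.net.http.HttpClient.newHttpClient();
    java.net.http.HttpRequest request =
            java.net.http.HttpRequest.newBuilder(
                            java.net.URI.create(restAddress + "/jobmanager/thread-dump"))
                    .GET()
                    .build();
    // persist the returned JSON with a timestamp whenever the job is detected to be slow
    return client.send(request, java.net.http.HttpResponse.BodyHandlers.ofString()).body();
}
{code}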

> Save TM/JM thread stack periodically
> 
>
> Key: FLINK-30184
> URL: https://issues.apache.org/jira/browse/FLINK-30184
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
>
> After FLINK-14816 FLINK-25398 and FLINK-25372 , flink user can view the 
> thread stack of TM/JM in Flink WebUI. 
> It can help flink users to find out why the Flink job is stuck, or why the 
> processing is slow. It is very useful for trouble shooting.
> However, sometimes Flink tasks get stuck or process slowly, but when the user 
> troubleshoots the problem, the job has resumed. It is difficult to find out 
> what happened to the Flink job at the time and why is it slow?
>  
> So, could we periodically save the thread stack of TM or JM in the TM log 
> directory?
> Define some configurations:
> cluster.thread-dump.interval=1min
> cluster.thread-dump.cleanup-time=48 hours



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23035) Add explicit method to StateChangelogWriter to write metadata

2022-11-27 Thread Xinbin Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639773#comment-17639773
 ] 

Xinbin Huang commented on FLINK-23035:
--

Hey [~Yanfei Lei], sorry I missed the message from Roman; I can pick it up next 
week. But I just saw you've opened a PR for this, so I think we can follow up 
on your PR?

 

> Add explicit method to StateChangelogWriter to write metadata
> -
>
> Key: FLINK-23035
> URL: https://issues.apache.org/jira/browse/FLINK-23035
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / State Backends
>Reporter: Roman Khachatryan
>Assignee: Xinbin Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, metadata is written to the state changelog using the same 
> StateChangelogWriter.append() method as data.
> However, it doesn't belong to a specific group, and should be read first on 
> recovery. Because of that, -1 is used.
> An explicit append() without keygroup would be less fragile (probably still 
> using -1 under the hood).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24870) Cannot cast "java.util.Date" to "java.time.Instant"

2022-11-27 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639769#comment-17639769
 ] 

dalongliu commented on FLINK-24870:
---

[~wangbaohua] I pulled your code from gitee, but the test can't be run because your 
project has some extra jar dependencies on `com.asap.rule`. Can you also share 
that code with me, so I can reproduce the problem?

> Cannot cast "java.util.Date" to "java.time.Instant"
> ---
>
> Key: FLINK-24870
> URL: https://issues.apache.org/jira/browse/FLINK-24870
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.13.1
>Reporter: wangbaohua
>Priority: Major
>
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
>         at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
>         ... 11 more
> Caused by: 
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
>         ... 12 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>         ... 15 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 120, Column 
> 101: Cannot cast "java.util.Date" to "java.time.Instant"
>         at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>         at 
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
>         at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
>         at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
>         at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
>         at 
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
>         at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
>         at 
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
>         at 
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
>         at 
> org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
>         at 
> 

[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

2022-11-27 Thread GitBox


Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033104544


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
 @Override
 public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+currentKeyGroupIndex, keyGroupRange);
+}

Review Comment:
   Actually I don't like the idea of checking whether the current key group is valid 
when accessing the state, since the problem comes from setting the key group 
rather than from accessing the state. Besides, a user may set the key group once and 
access the state several times, so for performance reasons I'd rather remove the 
check from each state access and only keep the check added in this PR. WDYT? @Myasuka 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21405: [FLINK-23035][state/changelog] Add explicit append() to StateChangelogWriter to write metadata

2022-11-27 Thread GitBox


flinkbot commented on PR #21405:
URL: https://github.com/apache/flink/pull/21405#issuecomment-1328493977

   
   ## CI report:
   
   * fdc39492b315dbecc5b4cff8f222caa53eebee87 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-23035) Add explicit method to StateChangelogWriter to write metadata

2022-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-23035:
---
Labels: pull-request-available  (was: )

> Add explicit method to StateChangelogWriter to write metadata
> -
>
> Key: FLINK-23035
> URL: https://issues.apache.org/jira/browse/FLINK-23035
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / State Backends
>Reporter: Roman Khachatryan
>Assignee: Xinbin Huang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, metadata is written to the state changelog using the same 
> StateChangelogWriter.append() method as data.
> However, it doesn't belong to a specific group, and should be read first on 
> recovery. Because of that, -1 is used.
> An explicit append() without keygroup would be less fragile (probably still 
> using -1 under the hood).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fredia opened a new pull request, #21405: [FLINK-23035][state/changelog] Add explicit append() to StateChangelogWriter to write metadata

2022-11-27 Thread GitBox


fredia opened a new pull request, #21405:
URL: https://github.com/apache/flink/pull/21405

   
   
   ## What is the purpose of the change
   
   Add explicit append() to StateChangelogWriter to write metadata.
   
   
   ## Brief change log
   
 - Add explicit `StateChange(byte[] value)`
 - Add explicit `append(byte[] value)`
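
   As a rough illustration of the resulting shape, a hypothetical sketch follows; the 
actual Flink interface has more methods and different generics, so the names below 
are assumptions rather than the real API:

   ```java
   import java.io.IOException;

   /** Hypothetical writer showing a per-key-group data path and an explicit metadata path. */
   interface MetadataAwareChangelogWriter {

       /** Data path: the change belongs to a specific key group. */
       void append(int keyGroup, byte[] value) throws IOException;

       /** Metadata path: no key group (may still use -1 under the hood). */
       void append(byte[] value) throws IOException;
   }
   ```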

   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-27 Thread GitBox


fsk119 commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1033065242


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -79,6 +79,23 @@ public void closeSession(SessionHandle sessionHandle) throws 
SqlGatewayException
 }
 }
 
+@Override
+public void configureSession(
+SessionHandle sessionHandle, String statement, long 
executionTimeoutMs)
+throws SqlGatewayException {
+try {
+if (executionTimeoutMs > 0) {
+// TODO: support the feature in FLINK-27838
+throw new UnsupportedOperationException(
+"SqlGatewayService doesn't support timeout mechanism 
now.");
+}
+
getSession(sessionHandle).createExecutor().configureSession(statement);

Review Comment:
   It's not thread-safe here.



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##
@@ -84,6 +91,39 @@ public OperationExecutor(SessionContext context, 
Configuration executionConfig)
 this.executionConfig = executionConfig;
 }
 
+public void configureSession(String statement) throws ExecutionException, 
InterruptedException {
+TableEnvironmentInternal tableEnv = getTableEnvironment();
+List parsedOperations = 
tableEnv.getParser().parse(statement);
+if (parsedOperations.size() > 1) {
+throw new UnsupportedOperationException(
+"Unsupported SQL statement! Execute statement only accepts 
a single SQL statement or "
++ "multiple 'INSERT INTO' statements wrapped in a 
'STATEMENT SET' block.");
+}
+Operation op = parsedOperations.get(0);

Review Comment:
   Can we reuse the code as we do in the CliClient? I think we can introduce an 
enum `ExecutionMode` and validate the statement kind in the `INITIALIZATION` 
mode.
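
   A rough, purely illustrative sketch of that idea (the enum and helper below are 
assumptions, not existing Flink classes):

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   enum ExecutionMode { INITIALIZATION, EXECUTION }

   final class StatementKindValidator {
       private static final Set<String> ALLOWED_IN_INITIALIZATION =
               new HashSet<>(Arrays.asList(
                       "SET", "RESET", "CREATE", "DROP", "USE", "ALTER",
                       "LOAD MODULE", "UNLOAD MODULE", "ADD JAR"));

       /** Reject statement kinds that are not allowed while configuring a session. */
       static void validate(ExecutionMode mode, String statementKind) {
           if (mode == ExecutionMode.INITIALIZATION
                   && !ALLOWED_IN_INITIALIZATION.contains(statementKind)) {
               throw new UnsupportedOperationException(
                       "Statement kind '"
                               + statementKind
                               + "' is not allowed when configuring a session.");
           }
       }
   }
   ```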



##
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java:
##
@@ -66,6 +66,14 @@ public interface SqlGatewayService {
  */
 void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
 
+/**
+ * Using the statement to initialize the Session. It's only allowed to 
execute
+ * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR. The 
execution should be
+ * finished before returning because jobs submitted later may depend on it.
+ */

Review Comment:
   It's better if we can align with the other Javadocs.
   
   ```
/**
  * Configure the basic settings for the session, such as setting configuration
  * options and preparing the catalog.
  *
  * @param sessionHandle handle to identify the session to be configured.
  *
  */
   ```



##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##
@@ -157,6 +168,83 @@ public void testOpenSessionWithEnvironment() throws 
Exception {
 assertThat(tableEnv.listModules()).contains(moduleName);
 }
 
+@Test
+public void testConfigureSessionWithLegalStatement(@TempDir 
java.nio.file.Path tmpDir)
+throws Exception {
+SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+// SET & RESET
+service.configureSession(sessionHandle, "SET 'key1' = 'value1';", 0);
+Map config = new HashMap<>();
+config.put("key1", "value1");
+
assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(config);
+
+service.configureSession(sessionHandle, "RESET 'key1';", 0);
+
assertThat(service.getSessionConfig(sessionHandle)).doesNotContainEntry("key1", 
"value1");
+
+// CREATE & USE & ALTER & DROP
+service.configureSession(
+sessionHandle,
+"CREATE CATALOG mycat with ('type' = 'generic_in_memory', 
'default-database' = 'db');",
+0);
+
+service.configureSession(sessionHandle, "USE CATALOG mycat;", 0);
+
assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("mycat");
+
+service.configureSession(
+sessionHandle,
+"CREATE TABLE db.tbl (score INT) WITH ('connector' = 
'datagen');",
+0);
+
+Set tableKinds = new HashSet<>();
+tableKinds.add(TableKind.TABLE);
+assertThat(service.listTables(sessionHandle, "mycat", "db", 
tableKinds))
+.contains(
+new TableInfo(ObjectIdentifier.of("mycat", "db", 
"tbl"), TableKind.TABLE));
+
+service.configureSession(sessionHandle, "ALTER TABLE db.tbl RENAME TO 
tbl1;", 0);
+assertThat(service.listTables(sessionHandle, "mycat", "db", 
tableKinds))
+

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store

2022-11-27 Thread GitBox


tsreaper commented on code in PR #403:
URL: https://github.com/apache/flink-table-store/pull/403#discussion_r1033078213


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java:
##
@@ -34,10 +34,12 @@
 void write(T record) throws Exception;
 
 /**
- * Compact all files related to the writer. Note that compaction process 
is only submitted and
- * may not be completed when the method returns.
+ * Compact files related to the writer. Note that compaction process is 
only submitted and may
+ * not be completed when the method returns.
+ *
+ * @param fullCompaction whether to trigger full compaction or just normal 
compaction

Review Comment:
   From the point of view of a separate compact job, we should always wait for the last 
compaction to complete. Otherwise new changes may never be compacted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory

2022-11-27 Thread GitBox


tsreaper commented on code in PR #405:
URL: https://github.com/apache/flink-table-store/pull/405#discussion_r1033077832


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java:
##
@@ -31,13 +32,57 @@ public interface Lock extends AutoCloseable {
 /** Run with lock. */
  T runWithLock(Callable callable) throws Exception;
 
-@Nullable
-static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath 
tablePath) {
-if (lockFactory == null) {
-return null;
+/** A factory to create {@link Lock}. */
+interface Factory extends Serializable {
+Lock create();
+}
+
+static Factory factory(@Nullable CatalogLock.Factory lockFactory, 
ObjectPath tablePath) {
+return lockFactory == null
+? new EmptyFactory()
+: new CatalogLockFactory(lockFactory, tablePath);
+}
+
+static Factory emptyFactory() {
+return new EmptyFactory();
+}
+
+/** A {@link Factory} creating lock from catalog. */
+class CatalogLockFactory implements Factory {
+
+private static final long serialVersionUID = 1L;
+
+private final CatalogLock.Factory lockFactory;
+private final ObjectPath tablePath;
+
+public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath 
tablePath) {
+this.lockFactory = lockFactory;
+this.tablePath = tablePath;
+}
+
+@Override
+public Lock create() {
+return fromCatalog(lockFactory.create(), tablePath);
 }
+}
 
-return fromCatalog(lockFactory.create(), tablePath);
+/** A {@link Factory} creating empty lock. */
+class EmptyFactory implements Factory {
+
+private static final long serialVersionUID = 1L;
+
+@Override
+public Lock create() {
+return new Lock() {
+@Override
+public  T runWithLock(Callable callable) throws 
Exception {
+return callable.call();
+}
+
+@Override
+public void close() {}
+};

Review Comment:
   Just return `null`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #403: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store

2022-11-27 Thread GitBox


JingsongLi commented on code in PR #403:
URL: https://github.com/apache/flink-table-store/pull/403#discussion_r1033075278


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java:
##
@@ -34,10 +34,12 @@
 void write(T record) throws Exception;
 
 /**
- * Compact all files related to the writer. Note that compaction process 
is only submitted and
- * may not be completed when the method returns.
+ * Compact files related to the writer. Note that compaction process is 
only submitted and may
+ * not be completed when the method returns.
+ *
+ * @param fullCompaction whether to trigger full compaction or just normal 
compaction

Review Comment:
   I think there are two separate concerns:
   - Whether this compaction is full or normal.
   - Whether we should wait for the latest compaction to finish.
   
   I think we can separate these two things.
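
   A minimal illustrative sketch of such a separation (the names are assumptions, not 
the actual Table Store API):

   ```java
   /** Hypothetical writer interface keeping the two concerns apart. */
   interface CompactingWriter {

       /** Submit a compaction; fullCompaction selects full vs. normal compaction. */
       void compact(boolean fullCompaction) throws Exception;

       /** Optionally block until the previously submitted compaction has finished. */
       void waitForLatestCompaction() throws Exception;
   }
   ```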



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on pull request #405: [FLINK-30223] Refactor Lock to provide Lock.Factory

2022-11-27 Thread GitBox


JingsongLi commented on PR #405:
URL: 
https://github.com/apache/flink-table-store/pull/405#issuecomment-1328450069

   This is the second refactoring PR for 
https://github.com/apache/flink-table-store/pull/394


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30223) Refactor Lock to provide Lock.Factory

2022-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30223:
---
Labels: pull-request-available  (was: )

> Refactor Lock to provide Lock.Factory
> -
>
> Key: FLINK-30223
> URL: https://issues.apache.org/jira/browse/FLINK-30223
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> For the core, it should not see too many Flink Table concepts, such as 
> database and tableName. It only needs to create a Lock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi opened a new pull request, #405: [FLINK-30223] Refactor Lock to provide Lock.Factory

2022-11-27 Thread GitBox


JingsongLi opened a new pull request, #405:
URL: https://github.com/apache/flink-table-store/pull/405

   For the core, it should not see too many Flink Table concepts, such as 
database and tableName. It only needs to create a Lock.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30223) Refactor Lock to provide Lock.Factory

2022-11-27 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30223:


 Summary: Refactor Lock to provide Lock.Factory
 Key: FLINK-30223
 URL: https://issues.apache.org/jira/browse/FLINK-30223
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.3.0


For the core, it should not see too many Flink Table concepts, such as database 
and tableName. It only needs to create a Lock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] yangjf2019 commented on pull request #451: [FLINK-30186] Bump Flink version to 1.15.3

2022-11-27 Thread GitBox


yangjf2019 commented on PR #451:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/451#issuecomment-1328445247

   Hi @gyfora, the corresponding dependencies are now available in the central 
repository. Please trigger the CI again, thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30164) Expose BucketComputer from SupportsWrite

2022-11-27 Thread Caizhi Weng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639748#comment-17639748
 ] 

Caizhi Weng commented on FLINK-30164:
-

master: d8eb796f035f35e1ac85ff3f657452dd2a41e644

> Expose BucketComputer from SupportsWrite
> 
>
> Key: FLINK-30164
> URL: https://issues.apache.org/jira/browse/FLINK-30164
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> When other engines integrate with the Sink, they need to know the corresponding bucket 
> rules so that records can be correctly distributed to each bucket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30164) Expose BucketComputer from SupportsWrite

2022-11-27 Thread Caizhi Weng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-30164:

Release Note:   (was: master: d8eb796f035f35e1ac85ff3f657452dd2a41e644)

> Expose BucketComputer from SupportsWrite
> 
>
> Key: FLINK-30164
> URL: https://issues.apache.org/jira/browse/FLINK-30164
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> When other engines integrate with the Sink, they need to know the corresponding bucket 
> rules so that records can be correctly distributed to each bucket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30164) Expose BucketComputer from SupportsWrite

2022-11-27 Thread Caizhi Weng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng closed FLINK-30164.
---
Release Note: master: d8eb796f035f35e1ac85ff3f657452dd2a41e644
  Resolution: Fixed

> Expose BucketComputer from SupportsWrite
> 
>
> Key: FLINK-30164
> URL: https://issues.apache.org/jira/browse/FLINK-30164
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> When other engines integrate with the Sink, they need to know the corresponding bucket 
> rules so that records can be correctly distributed to each bucket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] tsreaper merged pull request #400: [FLINK-30164] Expose BucketComputer from SupportsWrite

2022-11-27 Thread GitBox


tsreaper merged PR #400:
URL: https://github.com/apache/flink-table-store/pull/400


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-29987) PartialUpdateITCase.testForeignKeyJo is unstable

2022-11-27 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-29987:


Assignee: Alex Sorokoumov

> PartialUpdateITCase.testForeignKeyJo is unstable
> 
>
> Key: FLINK-29987
> URL: https://issues.apache.org/jira/browse/FLINK-29987
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Alex Sorokoumov
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on pull request #21368: [FLINK-30165][runtime][JUnit5 Migration] Migrate unaligned checkpoint related tests under flink-runtime module to junit5

2022-11-27 Thread GitBox


1996fanrui commented on PR #21368:
URL: https://github.com/apache/flink/pull/21368#issuecomment-1328427197

   Hi @XComp @snuyanzin, I have addressed all the comments, please take a 
look, thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] sethsaperstein-lyft commented on a diff in pull request #20844: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask

2022-11-27 Thread GitBox


sethsaperstein-lyft commented on code in PR #20844:
URL: https://github.com/apache/flink/pull/20844#discussion_r1033036319


##
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java:
##
@@ -92,6 +104,7 @@ public long getUpdateTimeoutCount() {
 protected static class WatermarkUpdate implements Serializable {
 protected long watermark = Long.MIN_VALUE;
 protected String id;
+protected boolean updateLocalWatermark = true;

Review Comment:
   good suggestion. Thanks for taking a look. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21404: [hotfix][docs] Update Zookeeper version information

2022-11-27 Thread GitBox


flinkbot commented on PR #21404:
URL: https://github.com/apache/flink/pull/21404#issuecomment-1328366465

   
   ## CI report:
   
   * 1f3ac78db1fb4ae3562196f4a9bd8d51b4c383a8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] t0560r91 opened a new pull request, #21404: [hotfix][docs] Update Zookeeper version information

2022-11-27 Thread GitBox


t0560r91 opened a new pull request, #21404:
URL: https://github.com/apache/flink/pull/21404

   Updating the doc, since Flink 1.15.2 actually ships with Zookeeper 3.5.9 in the 
`lib` folder and 3.6.3 in the `opt` folder.
   Also removing the sentence noting that Zookeeper 3.4 is not compatible with 3.5, 
because 3.4 is not included in Flink 1.15.2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718
 ] 

Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 10:02 PM:
---

[~fsk119] here are the steps to reproduce with 1.16.0. Note that the jar import 
works fine with 1.15.2.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into the Flink `lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

* Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

* We have to put the jar inside Flink `lib/` dir for the jar to be loaded. Then 
the same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}





was (Author: stevenz3wu):
[~fsk119] here are the steps to reproduce with 1.16.0. note that jar import 
works fine for 1.15.2.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

* Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

* Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}




> ./bin/sql-client.sh won't import external jar into the session
> --
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I used to be able to run the sql-client with iceberg-flink-runtime jar using 
> the `-j,--jar ` option (e.g. with 1.15.2). 
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing 
> ClassNotFoundException.
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the 
> `flink/lib` directory to make the jar loaded. This seems like a regression of 
> 1.16.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] sethsaperstein-lyft commented on pull request #20844: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask

2022-11-27 Thread GitBox


sethsaperstein-lyft commented on PR #20844:
URL: https://github.com/apache/flink/pull/20844#issuecomment-1328347988

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718
 ] 

Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:56 PM:
--

[~fsk119] here are the steps to reproduce with 1.16.0. note that jar import 
works fine for 1.15.2.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

* Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

* Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}





was (Author: stevenz3wu):
[~fsk119] here are the steps to reproduce with 1.16.0. note that jar import 
works fine for 1.15.2.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

* Then we shall see exception

{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

* Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}




> ./bin/sql-client.sh won't import external jar into the session
> --
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I used to be able to run the sql-client with iceberg-flink-runtime jar using 
> the `-j,--jar ` option (e.g. with 1.15.2). 
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing 
> ClassNotFoundException.
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the 
> `flink/lib` directory to make the jar loaded. This seems like a regression of 
> 1.16.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718
 ] 

Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:55 PM:
--

[~fsk119] here are the steps to reproduce with 1.16.0. note that jar import 
works fine for 1.15.2.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}





was (Author: stevenz3wu):
[~fsk119] here are the steps to reproduce.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}

jar import works fine for 1.15.2.


> ./bin/sql-client.sh won't import external jar into the session
> --
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I used to be able to run the sql-client with iceberg-flink-runtime jar using 
> the `-j,--jar ` option (e.g. with 1.15.2). 
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing 
> ClassNotFoundException.
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the 
> `flink/lib` directory to make the jar loaded. This seems like a regression of 
> 1.16.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718
 ] 

Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:55 PM:
--

[~fsk119] here are the steps to reproduce with 1.16.0. note that jar import 
works fine for 1.15.2.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

* Then we shall see exception

{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

* Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}





was (Author: stevenz3wu):
[~fsk119] here are the steps to reproduce with 1.16.0. note that jar import 
works fine for 1.15.2.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}




> ./bin/sql-client.sh won't import external jar into the session
> --
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I used to be able to run the sql-client with iceberg-flink-runtime jar using 
> the `-j,--jar ` option (e.g. with 1.15.2). 
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing 
> ClassNotFoundException.
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the 
> `flink/lib` directory to make the jar loaded. This seems like a regression of 
> 1.16.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718
 ] 

Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:54 PM:
--

[~fsk119] here are the steps to reproduce.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client with importing external jar
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}

jar import works fine for 1.15.2.



was (Author: stevenz3wu):
[~fsk119] here are the steps to reproduce.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client: ` ./bin/sql-client.sh embedded --jar 
/path/to/iceberg-flink-runtime-1.16-1.1.0.jar`
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}


> ./bin/sql-client.sh won't import external jar into the session
> --
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I used to be able to run the sql-client with iceberg-flink-runtime jar using 
> the `-j,--jar ` option (e.g. with 1.15.2). 
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing 
> ClassNotFoundException.
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the 
> `flink/lib` directory to make the jar loaded. This seems like a regression of 
> 1.16.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718
 ] 

Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:53 PM:
--

[~fsk119] here are the steps to reproduce.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client: ` ./bin/sql-client.sh embedded --jar 
/path/to/iceberg-flink-runtime-1.16-1.1.0.jar`
* run SQL cmd
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}

Then we shall see exception
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}



was (Author: stevenz3wu):
[~fsk119] here are the steps to reproduce.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into  Flink`lib` dir
* start sql-client: ` ./bin/sql-client.sh embedded --jar 
/path/to/iceberg-flink-runtime-1.16-1.1.0.jar`
* run SQL cmd
```
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
```

Then we shall see exception
```
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
```

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
```
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
```


> ./bin/sql-client.sh won't import external jar into the session
> --
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I used to be able to run the sql-client with iceberg-flink-runtime jar using 
> the `-j,--jar ` option (e.g. with 1.15.2). 
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing 
> ClassNotFoundException.
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the 
> `flink/lib` directory to make the jar loaded. This seems like a regression of 
> 1.16.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718
 ] 

Steven Zhen Wu commented on FLINK-30035:


[~fsk119] here are the steps to reproduce.

* download the [iceberg-flink-runtime 
jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar]
 without putting it into the Flink `lib` dir
* start sql-client: ` ./bin/sql-client.sh embedded --jar 
/path/to/iceberg-flink-runtime-1.16-1.1.0.jar`
* run SQL cmd
```
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
```

Then we shall see exception
```
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface 
org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog 
[java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
```

Now if we put the jar inside Flink `lib/` dir. the external jar was loaded 
fine. The same SQL cmd will execute fine.
```
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
```


> ./bin/sql-client.sh won't import external jar into the session
> --
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I used to be able to run the sql-client with iceberg-flink-runtime jar using 
> the `-j,--jar ` option (e.g. with 1.15.2). 
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing 
> ClassNotFoundException.
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the 
> `flink/lib` directory to make the jar loaded. This seems like a regression of 
> 1.16.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032994738


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java:
##
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.Projections;
+import com.mongodb.client.model.Sorts;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MAX_KEY;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.BSON_MIN_KEY;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.ID_HINT;
+
+/**
+ * Sample Partitioner
+ *
+ * Samples the collection to generate partitions.
+ *
+ * Uses the average document size to split the collection into average 
sized chunks
+ *
+ * The partitioner samples the collection, projects and sorts by the 
partition fields. Then uses
+ * every {@code samplesPerPartition} as the value to use to calculate the 
partition boundaries.
+ *
+ * 
+ *   scan.partition.size: The average size (MB) for each partition. Note: 
Uses the average
+ *   document size to determine the number of documents per partition so 
may not be even.
+ *   Defaults to: 64mb.
+ *   scan.partition.samples: The number of samples to take per partition. 
Defaults to: 10. The
+ *   total number of samples taken is calculated as: {@code samples per 
partition * (count of
+ *   documents / number of documents per partition)}.

Review Comment:
   > If multiple samples are taken per partition then somewhere in here we'd 
have to merge sample to arrive at a single partition again, but afaict that 
doesn't happen.
   
   We merge samples in the following code.
   ```java
   List<MongoScanSourceSplit> sourceSplits = new ArrayList<>();
   BsonDocument partitionStart = new BsonDocument(ID_FIELD, BSON_MIN_KEY);
   int splitNum = 0;
   for (int i = 0; i < samples.size(); i++) {
       if (i % samplesPerPartition == 0 || i == samples.size() - 1) {
           sourceSplits.add(
                   createSplit(namespace, splitNum++, partitionStart, samples.get(i)));
           partitionStart = samples.get(i);
       }
   }
   ```
   
   
   
   > Instead we have some strange formula that determines the number of samples 
(read: partitions), and I have no idea how the resulting partitions could 
correlate with the desired partition size.
   >
   > Why isnt the number of sample (again: partitions) not count / 
numDocumentsPerPartition?
   
   1. numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes
   2. samplingRate = samplesPerPartition / numDocumentsPerPartition
   3. samplesCount = samplingRate * count
   4. merge samples by samplesPerPartition
   
   We calculate the sampling rate from the samples per partition and the partition 
size. This could also be accomplished directly by exposing a sampling rate option.
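
   For illustration, a small self-contained sketch of the arithmetic above (the 
variable names and example numbers are mine, not the connector's actual code):
   ```java
   public class SampleCountSketch {
       public static void main(String[] args) {
           long count = 1_000_000L;                        // documents in the collection
           long avgObjSizeInBytes = 1_024L;                // average document size
           long partitionSizeInBytes = 64L * 1024 * 1024;  // scan.partition.size = 64mb
           int samplesPerPartition = 10;                   // scan.partition.samples

           long numDocumentsPerPartition = partitionSizeInBytes / avgObjSizeInBytes; // 65536
           double samplingRate = (double) samplesPerPartition / numDocumentsPerPartition;
           long samplesCount = (long) Math.ceil(samplingRate * count); // ~153 samples

           // Merging every samplesPerPartition-th sample (plus the last one) then yields
           // roughly count / numDocumentsPerPartition splits, i.e. about 16 splits here.
           System.out.println(numDocumentsPerPartition + " docs/partition, "
                   + samplesCount + " samples");
       }
   }
   ```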
   
   @zentol Which do you think is better: `scan.partition.samples` or 
`scan.partition.sampling-rate`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032991900


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoRowDataLookupFunction.java:
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import 
org.apache.flink.connector.mongodb.table.converter.BsonToRowDataConverters;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.mongodb.client.model.Filters.and;
+import static com.mongodb.client.model.Filters.eq;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.project;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A lookup function for {@link MongoDynamicTableSource}. */
+@Internal
+public class MongoRowDataLookupFunction extends LookupFunction {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoRowDataLookupFunction.class);
+private static final long serialVersionUID = 1L;
+
+private final MongoConnectionOptions connectionOptions;
+private final int maxRetries;
+private final long retryIntervalMs;
+
+private final List fieldNames;
+private final List keyNames;
+
+private final BsonToRowDataConverters.BsonToRowDataConverter 
mongoRowConverter;
+private final RowDataToBsonConverters.RowDataToBsonConverter 
lookupKeyRowConverter;
+
+private transient MongoClient mongoClient;
+
+public MongoRowDataLookupFunction(
+MongoConnectionOptions connectionOptions,
+int maxRetries,
+long retryIntervalMs,
+List fieldNames,
+List fieldTypes,
+List keyNames,
+RowType rowType) {
+checkNotNull(fieldNames, "No fieldNames supplied.");
+checkNotNull(fieldTypes, "No fieldTypes supplied.");
+checkNotNull(keyNames, "No keyNames supplied.");
+this.connectionOptions = checkNotNull(connectionOptions);
+this.maxRetries = maxRetries;
+this.retryIntervalMs = retryIntervalMs;
+this.fieldNames = fieldNames;
+this.mongoRowConverter = 
BsonToRowDataConverters.createNullableConverter(rowType);
+
+this.keyNames = keyNames;
+LogicalType[] keyTypes =
+this.keyNames.stream()
+.map(
+s -> {
+checkArgument(
+fieldNames.contains(s),

Review Comment:
   Thanks. Table API guarantees this to be the case. I'll remove that check.
   > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From line 1, column 131 to line 1, column 133: Column 'f18' not found in table 
'D'
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032644671


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.MongoSink;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter;
+import 
org.apache.flink.connector.mongodb.table.serialization.MongoRowDataSerializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.bson.BsonValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link DynamicTableSink} for MongoDB. */
+@Internal
+public class MongoDynamicTableSink implements DynamicTableSink {
+
+private final MongoConnectionOptions connectionOptions;
+private final MongoWriteOptions writeOptions;
+@Nullable private final Integer parallelism;
+private final DataType physicalRowDataType;
+private final SerializableFunction keyExtractor;
+
+public MongoDynamicTableSink(
+MongoConnectionOptions connectionOptions,
+MongoWriteOptions writeOptions,
+@Nullable Integer parallelism,
+DataType physicalRowDataType,
+SerializableFunction keyExtractor) {
+this.connectionOptions = checkNotNull(connectionOptions);
+this.writeOptions = checkNotNull(writeOptions);
+this.parallelism = parallelism;
+this.physicalRowDataType = checkNotNull(physicalRowDataType);
+this.keyExtractor = checkNotNull(keyExtractor);
+}
+
+@Override
+public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+// UPSERT mode
+ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+for (RowKind kind : requestedMode.getContainedKinds()) {
+if (kind != RowKind.UPDATE_BEFORE) {

Review Comment:
   ~~This connector can support writing in both append-only and upsert modes. 
I'm not sure if an explicitly upsert will force a primary key to be defined.~~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032989565


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.MongoSink;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
+import 
org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters.RowDataToBsonConverter;
+import 
org.apache.flink.connector.mongodb.table.serialization.MongoRowDataSerializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.bson.BsonValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link DynamicTableSink} for MongoDB. */
+@Internal
+public class MongoDynamicTableSink implements DynamicTableSink {
+
+private final MongoConnectionOptions connectionOptions;
+private final MongoWriteOptions writeOptions;
+@Nullable private final Integer parallelism;
+private final DataType physicalRowDataType;
+private final SerializableFunction keyExtractor;
+
+public MongoDynamicTableSink(
+MongoConnectionOptions connectionOptions,
+MongoWriteOptions writeOptions,
+@Nullable Integer parallelism,
+DataType physicalRowDataType,
+SerializableFunction keyExtractor) {
+this.connectionOptions = checkNotNull(connectionOptions);
+this.writeOptions = checkNotNull(writeOptions);
+this.parallelism = parallelism;
+this.physicalRowDataType = checkNotNull(physicalRowDataType);
+this.keyExtractor = checkNotNull(keyExtractor);
+}
+
+@Override
+public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+// UPSERT mode
+ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+for (RowKind kind : requestedMode.getContainedKinds()) {
+if (kind != RowKind.UPDATE_BEFORE) {

Review Comment:
   We have added tests for these two scenarios in the E2E tests.
   Explicitly setting an upsert `ChangelogMode` won't force a primary key to be 
defined, so the `ChangelogMode` has been changed to upsert mode here.
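
   For reference, a minimal sketch of what an explicit upsert changelog mode looks 
like with the plain Table API builder (illustrative only, not necessarily the exact 
code in this PR):
   ```java
   import org.apache.flink.table.connector.ChangelogMode;
   import org.apache.flink.types.RowKind;

   class UpsertModeSketch {
       // Everything except UPDATE_BEFORE, which matches what the loop above produces
       // for an upsert sink.
       static final ChangelogMode UPSERT_MODE =
               ChangelogMode.newBuilder()
                       .addContainedKind(RowKind.INSERT)
                       .addContainedKind(RowKind.UPDATE_AFTER)
                       .addContainedKind(RowKind.DELETE)
                       .build();
   }
   ```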



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032983609


##
flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.mongodb;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end test for the MongoDB connectors. */
+@Testcontainers
+class MongoE2ECase {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoE2ECase.class);
+
+private static final String MONGODB_HOSTNAME = "mongodb";
+
+private static final String MONGO_4_0 = "mongo:4.0.10";

Review Comment:
   In a previous change we skipped the test-jar compilation of 
`flink-connector-mongodb`. 
   In order to reuse `MongoTestUtil` in the `e2e-tests` module, do we need to 
compile the test-jar?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032983150


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.reader.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+import org.apache.flink.util.CollectionUtil;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCursor;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.project;
+
+/** An split reader implements {@link SplitReader} for {@link 
MongoScanSourceSplit}. */
+@Internal
+public class MongoScanSourceSplitReader implements 
MongoSourceSplitReader {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoScanSourceSplitReader.class);
+
+private final MongoConnectionOptions connectionOptions;
+private final MongoReadOptions readOptions;
+private final SourceReaderContext readerContext;
+@Nullable private final List projectedFields;
+private final int limit;
+
+private boolean closed = false;
+private boolean finished = false;
+private MongoClient mongoClient;
+private MongoCursor currentCursor;
+private MongoScanSourceSplit currentSplit;
+
+public MongoScanSourceSplitReader(
+MongoConnectionOptions connectionOptions,
+MongoReadOptions readOptions,
+@Nullable List projectedFields,
+int limit,
+SourceReaderContext context) {
+this.connectionOptions = connectionOptions;
+this.readOptions = readOptions;
+this.projectedFields = projectedFields;
+this.limit = limit;
+this.readerContext = context;
+}
+
+@Override
+public RecordsWithSplitIds fetch() throws IOException {
+if (closed) {
+throw new IllegalStateException("Cannot fetch records from a 
closed split reader");
+}
+
+RecordsBySplits.Builder builder = new 
RecordsBySplits.Builder<>();
+
+// Return when no split registered to this reader.
+if (currentSplit == null) {
+return builder.build();
+}
+
+currentCursor = getOrCreateCursor();
+int fetchSize = readOptions.getFetchSize();
+
+try {
+for (int recordNum = 0; recordNum < fetchSize; recordNum++) {
+if (currentCursor.hasNext()) {
+builder.add(currentSplit, currentCursor.next());
+} else {
+builder.addFinishedSplit(currentSplit.splitId());
+finished = true;
+break;
+}
+}
+return builder.build();
+} catch (MongoException e) {
+throw new IOException("Scan records form MongoDB failed", e);
+} finally {
+if (finished) {
+currentSplit = null;
+releaseCursor();
+}
+}
+}
+
+@Override
+public void 

[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032977743


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoSplitAssigner.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Optional;
+
+/** The split assigner for {@link MongoSourceSplit}. */
+@Internal
+public interface MongoSplitAssigner extends Serializable {
+
+/**
+ * Called to open the assigner to acquire any resources, like threads or 
network connections.
+ */
+void open();
+
+/**
+ * Called to close the assigner, in case it holds on to any resources, 
like threads or network
+ * connections.
+ */
+void close() throws IOException;
+
+/** Gets the next split. */
+Optional getNext();

Review Comment:
   This covers the case where all splits have been assigned but the enumerator has 
not yet signaled "no more splits".
   When the enumerator gets back an empty Optional, it checks whether it should 
notify the readers that no more splits are coming.
   
   `MongoSourceEnumerator#assignSplits`
   ```java
   Optional<MongoSourceSplit> split = splitAssigner.getNext();
   if (split.isPresent()) {
       final MongoSourceSplit mongoSplit = split.get();
       context.assignSplit(mongoSplit, nextAwaiting);
       awaitingReader.remove();
       LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
       break;
   } else if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
       LOG.info("All splits have been assigned");
       context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
       break;
   } else {
       // there is no available splits by now, skip assigning
       break;
   }
   ```
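
   For context, a minimal sketch of an assigner honoring that contract (a 
simplified, hypothetical example, not the actual implementation in this PR):
   ```java
   import java.util.ArrayDeque;
   import java.util.Collection;
   import java.util.Optional;
   import java.util.Queue;

   /** Hands out split ids one by one; an empty Optional does not yet mean "no more splits". */
   class SimpleSplitAssignerSketch {
       private final Queue<String> remainingSplitIds = new ArrayDeque<>();
       private boolean discoveryFinished;

       void addSplits(Collection<String> splitIds, boolean lastBatch) {
           remainingSplitIds.addAll(splitIds);
           discoveryFinished = lastBatch;
       }

       Optional<String> getNext() {
           // Empty either because no splits were discovered yet or all were handed out.
           return Optional.ofNullable(remainingSplitIds.poll());
       }

       boolean noMoreSplits() {
           // Only true once discovery finished and the queue has drained.
           return discoveryFinished && remainingSplitIds.isEmpty();
       }
   }
   ```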



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032971595


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoClient;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.KEY_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MAX_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MIN_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.isShardedCollectionDropped;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.readChunks;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.readCollectionMetadata;
+
+/**
+ * Sharded Partitioner
+ *
+ * Uses the chunks collection and partitions the collection based on the 
sharded collections
+ * chunk ranges.
+ *
+ * The following config collections' read privilege is required.
+ *
+ * 
+ *   config.collections
+ *   config.chunks
+ * 
+ */
+@Internal
+public class MongoShardedSplitter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoShardedSplitter.class);
+
+public static final MongoShardedSplitter INSTANCE = new 
MongoShardedSplitter();
+
+private MongoShardedSplitter() {}
+
+public Collection split(MongoSplitContext 
splitContext) {
+MongoNamespace namespace = splitContext.getMongoNamespace();
+MongoClient mongoClient = splitContext.getMongoClient();
+
+List chunks;
+Optional collectionMetadata;
+try {
+collectionMetadata = readCollectionMetadata(mongoClient, 
namespace);
+if (!collectionMetadata.isPresent()) {
+LOG.error(
+"Do sharded split failed, collection {} does not 
appear to be sharded.",
+namespace);
+throw new FlinkRuntimeException(
+String.format(
+"Do sharded split failed, %s is not a sharded 
collection.",
+namespace));
+}
+
+if (isShardedCollectionDropped(collectionMetadata.get())) {
+LOG.error("Do sharded split failed, collection {} was 
dropped.", namespace);
+throw new FlinkRuntimeException(
+String.format("Do sharded split failed, %s was 
dropped.", namespace));
+}
+
+chunks = readChunks(mongoClient, collectionMetadata.get());
+if (chunks.isEmpty()) {
+LOG.error("Do sharded split failed, chunks of {} is empty.", 
namespace);
+throw new FlinkRuntimeException(
+String.format(
+"Do sharded split failed, chunks of %s is 
empty.", namespace));
+}
+} catch (MongoException e) {
+LOG.error(
+"Read chunks from {} failed with error message: {}", 
namespace, e.getMessage());
+throw new FlinkRuntimeException(e);
+}
+
+List sourceSplits = new 
ArrayList<>(chunks.size());
+for (int i = 0; i < chunks.size(); i++) {
+BsonDocument chunk = chunks.get(i);
+sourceSplits.add(
+new MongoScanSourceSplit(
+String.format("%s_%d", namespace, i),

Review Comment:
   How about we use the primary key (`_id` field mentioned above) of 
`config.chunks`?
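
   A minimal sketch of that idea, assuming the chunk document's `_id` is an ObjectId 
(illustrative only, not a concrete proposal for the final code):
   ```java
   import org.bson.BsonDocument;

   class ChunkSplitIdSketch {
       static String splitIdOf(BsonDocument chunk) {
           // config.chunks documents carry an ObjectId primary key in "_id".
           return chunk.getObjectId("_id").getValue().toHexString();
       }
   }
   ```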
  

[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032963748


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoClient;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.KEY_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MAX_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MIN_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.isShardedCollectionDropped;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.readChunks;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.readCollectionMetadata;
+
+/**
+ * Sharded Partitioner
+ *
+ * Uses the chunks collection and partitions the collection based on the 
sharded collections
+ * chunk ranges.
+ *
+ * The following config collections' read privilege is required.
+ *
+ * 
+ *   config.collections
+ *   config.chunks
+ * 
+ */
+@Internal
+public class MongoShardedSplitter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoShardedSplitter.class);
+
+public static final MongoShardedSplitter INSTANCE = new 
MongoShardedSplitter();
+
+private MongoShardedSplitter() {}
+
+public Collection split(MongoSplitContext 
splitContext) {
+MongoNamespace namespace = splitContext.getMongoNamespace();
+MongoClient mongoClient = splitContext.getMongoClient();
+
+List chunks;
+Optional collectionMetadata;
+try {
+collectionMetadata = readCollectionMetadata(mongoClient, 
namespace);
+if (!collectionMetadata.isPresent()) {
+LOG.error(
+"Do sharded split failed, collection {} does not 
appear to be sharded.",
+namespace);
+throw new FlinkRuntimeException(
+String.format(
+"Do sharded split failed, %s is not a sharded 
collection.",
+namespace));
+}
+
+if (isShardedCollectionDropped(collectionMetadata.get())) {
+LOG.error("Do sharded split failed, collection {} was 
dropped.", namespace);
+throw new FlinkRuntimeException(
+String.format("Do sharded split failed, %s was 
dropped.", namespace));
+}
+
+chunks = readChunks(mongoClient, collectionMetadata.get());
+if (chunks.isEmpty()) {
+LOG.error("Do sharded split failed, chunks of {} is empty.", 
namespace);
+throw new FlinkRuntimeException(
+String.format(
+"Do sharded split failed, chunks of %s is 
empty.", namespace));
+}
+} catch (MongoException e) {
+LOG.error(
+"Read chunks from {} failed with error message: {}", 
namespace, e.getMessage());
+throw new FlinkRuntimeException(e);
+}
+
+List sourceSplits = new 
ArrayList<>(chunks.size());
+for (int i = 0; i < chunks.size(); i++) {
+BsonDocument chunk = chunks.get(i);

Review Comment:
   Yes, there will be some extra fields. Do we need to use them to encode the 
split name of `MongoScanSourceSplit`?
   
   - `_id` field represents the primary key recorded in 

[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032957642


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator.splitter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoClient;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.KEY_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MAX_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoConstants.MIN_FIELD;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.isShardedCollectionDropped;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.readChunks;
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.readCollectionMetadata;
+
+/**
+ * Sharded Partitioner
+ *
+ * Uses the chunks collection and partitions the collection based on the 
sharded collections
+ * chunk ranges.
+ *
+ * The following config collections' read privilege is required.
+ *
+ * 
+ *   config.collections
+ *   config.chunks
+ * 
+ */
+@Internal
+public class MongoShardedSplitter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoShardedSplitter.class);
+
+public static final MongoShardedSplitter INSTANCE = new 
MongoShardedSplitter();
+
+private MongoShardedSplitter() {}
+
+public Collection split(MongoSplitContext 
splitContext) {
+MongoNamespace namespace = splitContext.getMongoNamespace();
+MongoClient mongoClient = splitContext.getMongoClient();
+
+List chunks;
+Optional collectionMetadata;
+try {
+collectionMetadata = readCollectionMetadata(mongoClient, 
namespace);
+if (!collectionMetadata.isPresent()) {
+LOG.error(
+"Do sharded split failed, collection {} does not 
appear to be sharded.",
+namespace);
+throw new FlinkRuntimeException(
+String.format(
+"Do sharded split failed, %s is not a sharded 
collection.",
+namespace));
+}
+
+if (isShardedCollectionDropped(collectionMetadata.get())) {
+LOG.error("Do sharded split failed, collection {} was 
dropped.", namespace);
+throw new FlinkRuntimeException(
+String.format("Do sharded split failed, %s was 
dropped.", namespace));
+}
+
+chunks = readChunks(mongoClient, collectionMetadata.get());
+if (chunks.isEmpty()) {
+LOG.error("Do sharded split failed, chunks of {} is empty.", 
namespace);
+throw new FlinkRuntimeException(
+String.format(
+"Do sharded split failed, chunks of %s is 
empty.", namespace));
+}

Review Comment:
   As long as we shard the collection, even if it is empty, a record will be 
generated in `config.chunks`.
   ```javascript
   {
   "_id" : ObjectId("63838c89ae7bc37861d753a7"),
   "uuid" : UUID("cce0b7c9-4c67-4d01-ad1f-ddc13d91dc49"),
   "min" : {
   "user_id" : { "$minKey" : 1 },
   "product_no" : { "$minKey" : 1 },
   "product_kind" : { "$minKey" : 1 }
   },
   "max" : {
   "user_id" : { "$maxKey" : 1 },
   "product_no" : { "$maxKey" : 1 },
   "product_kind" : { "$maxKey" : 1 }
   },
   "shard" : "rs0-shard",
   "lastmod" : 

[GitHub] [flink-connector-mongodb] Jiabao-Sun commented on a diff in pull request #1: [FLINK-6573][connectors/mongodb] Flink MongoDB Connector

2022-11-27 Thread GitBox


Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1032955663


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.reader.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+import org.apache.flink.util.CollectionUtil;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCursor;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.flink.connector.mongodb.common.utils.MongoUtils.project;
+
+/** An split reader implements {@link SplitReader} for {@link 
MongoScanSourceSplit}. */
+@Internal
+public class MongoScanSourceSplitReader implements 
MongoSourceSplitReader {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MongoScanSourceSplitReader.class);
+
+private final MongoConnectionOptions connectionOptions;
+private final MongoReadOptions readOptions;
+private final SourceReaderContext readerContext;
+@Nullable private final List projectedFields;
+private final int limit;
+
+private boolean closed = false;
+private boolean finished = false;
+private MongoClient mongoClient;
+private MongoCursor currentCursor;
+private MongoScanSourceSplit currentSplit;
+
+public MongoScanSourceSplitReader(
+MongoConnectionOptions connectionOptions,
+MongoReadOptions readOptions,
+@Nullable List projectedFields,
+int limit,
+SourceReaderContext context) {
+this.connectionOptions = connectionOptions;
+this.readOptions = readOptions;
+this.projectedFields = projectedFields;
+this.limit = limit;
+this.readerContext = context;
+}
+
+@Override
+public RecordsWithSplitIds fetch() throws IOException {
+if (closed) {
+throw new IllegalStateException("Cannot fetch records from a 
closed split reader");
+}
+
+RecordsBySplits.Builder builder = new 
RecordsBySplits.Builder<>();
+
+// Return when no split registered to this reader.
+if (currentSplit == null) {
+return builder.build();
+}
+
+currentCursor = getOrCreateCursor();
+int fetchSize = readOptions.getFetchSize();
+
+try {
+for (int recordNum = 0; recordNum < fetchSize; recordNum++) {
+if (currentCursor.hasNext()) {
+builder.add(currentSplit, currentCursor.next());
+} else {
+builder.addFinishedSplit(currentSplit.splitId());
+finished = true;
+break;
+}
+}

Review Comment:
   We use a 
[cursor](https://www.mongodb.com/docs/manual/reference/method/cursor.batchSize/#mongodb-method-cursor.batchSize)
 to request a batch of data from MongoDB; the size of each batch depends on the 
configuration of `scan.cursor.batch-size`. No request will be made to MongoDB 
until a batch of data in the cursor has 

[GitHub] [flink] flinkbot commented on pull request #21403: [FLINK-29984][flink-metrics] Prometheus histogram minmax

2022-11-27 Thread GitBox


flinkbot commented on PR #21403:
URL: https://github.com/apache/flink/pull/21403#issuecomment-1328234175

   
   ## CI report:
   
   * d916abd22d6bb510b39df1c51b04f74ad92c8d59 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] qingwei91 commented on a diff in pull request #21403: [FLINK-29984][flink-metrics] Prometheus histogram minmax

2022-11-27 Thread GitBox


qingwei91 commented on code in PR #21403:
URL: https://github.com/apache/flink/pull/21403#discussion_r1032920245


##
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java:
##
@@ -378,6 +390,22 @@ private void addSamples(
 addToList(labelValues, quantile.toString()),
 statistics.getQuantile(quantile)));
 }
+if (this.histogramMaxEnabled) {
+samples.add(
+new MetricFamilySamples.Sample(
+metricName,
+labelNamesWithQuantile,
+addToList(labelValues, "1.0"),
+statistics.getMax()));
+}
+if (this.histogramMinEnabled) {
+samples.add(
+new MetricFamilySamples.Sample(
+metricName,
+labelNamesWithQuantile,
+addToList(labelValues, "0.0"),
+statistics.getMin()));
+}

Review Comment:
   Core of the change: emit the max as quantile 1.0 and the min as quantile 0.0 when 
reporting to Prometheus.
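
   For illustration, a scrape of such a summary would then contain samples like 
these (the metric name and values are made up):
   ```
   # quantile="0.0" carries the histogram min, quantile="1.0" the max
   flink_taskmanager_job_task_operator_myHistogram{quantile="0.5"} 42.0
   flink_taskmanager_job_task_operator_myHistogram{quantile="0.999"} 250.0
   flink_taskmanager_job_task_operator_myHistogram{quantile="0.0"} 3.0
   flink_taskmanager_job_task_operator_myHistogram{quantile="1.0"} 310.0
   flink_taskmanager_job_task_operator_myHistogram_count 1024.0
   ```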



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29984) Flink Histogram not emitting min and max when using Prometheus Reporter

2022-11-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29984:
---
Labels: pull-request-available  (was: )

> Flink Histogram not emitting min and max when using Prometheus Reporter
> ---
>
> Key: FLINK-29984
> URL: https://issues.apache.org/jira/browse/FLINK-29984
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Affects Versions: 1.16.0, 1.15.3
>Reporter: Lim Qing Wei
>Assignee: Lim Qing Wei
>Priority: Major
>  Labels: pull-request-available
>
> The Flink Histogram, when using the Prometheus Metrics Reporter, only produces
>  * quantiles of 0.5, 0.75, 0.95, 0.98, 0.99, 0.999
>  * count
>  
> I think it would be a good idea to also produce min and max; as they are 
> already available in the state, we can model them as p0 and p1.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] qingwei91 opened a new pull request, #21403: [FLINK-29984][flink-metrics] Prometheus histogram minmax

2022-11-27 Thread GitBox


qingwei91 opened a new pull request, #21403:
URL: https://github.com/apache/flink/pull/21403

   ## What is the purpose of the change
   
   Expose the Histogram min and max when exporting to Prometheus; the data is 
already available, we just need to export it.
   This is enabled in both the Prometheus and the Prometheus Gateway reporters.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   * Added a test to make sure min and max are emitted by the Prometheus reporter.
   
   ## Does this pull request potentially affect one of the following parts
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30222) Suspended a job in last-state mode bug

2022-11-27 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-30222:
---
Priority: Blocker  (was: Major)

> Suspended a job in last-state mode bug
> --
>
> Key: FLINK-30222
> URL: https://issues.apache.org/jira/browse/FLINK-30222
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.16.0, kubernetes-operator-1.2.0
>Reporter: tanjialiang
>Priority: Blocker
> Fix For: kubernetes-operator-1.3.0
>
> Attachments: image-2022-11-27-16-48-08-445.png
>
>
> In Flink 1.16.0, Kubernetes HA can be enabled with the option value 'kubernetes', i.e. 
> 'high-availability: kubernetes'. But in Kubernetes Operator 1.2.0, when I try to 
> suspend a job in last-state mode, the validation fails with 'Job could not be 
> upgraded with last-state while Kubernetes HA disabled'.
>  
> I tried to use kubectl patch to suspend a job with last-state:
> {code:sh}
> kubectl -nbigdata-flink patch 
> flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink 
> --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'
> {code}
> It reported an error because it considers my Kubernetes HA to be disabled:
> {code:java}
> Error from server: admission webhook "flinkoperator.flink.apache.org" denied 
> the request: Job could not be upgraded with last-state while Kubernetes HA 
> disabled
> {code}
> But I did enable Kubernetes HA with the following options:
> {code:yaml}
> kubernetes.cluster-id: 
> high-availability: kubernetes
> high-availability.storageDir: hdfs:///flink/recovery
> {code}
> I found that Flink Kubernetes Operator 1.2.0 only recognizes Kubernetes HA via the old 
> option value:
> {code:yaml}
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
> {code}
> The check is probably done in 
> org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated.
> !image-2022-11-27-16-48-08-445.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30222) Suspended a job in last-state mode bug

2022-11-27 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-30222:
---
Fix Version/s: kubernetes-operator-1.3.0

> Suspended a job in last-state mode bug
> --
>
> Key: FLINK-30222
> URL: https://issues.apache.org/jira/browse/FLINK-30222
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.16.0, kubernetes-operator-1.2.0
>Reporter: tanjialiang
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
> Attachments: image-2022-11-27-16-48-08-445.png
>
>
> In Flink 1.16.0, Kubernetes HA can be enabled with the option value 
> 'kubernetes', i.e. 'high-availability: kubernetes'. But in Kubernetes 
> operator 1.2.0, when I try to suspend a job in last-state mode, validation 
> fails with 'Job could not be upgraded with last-state while Kubernetes HA 
> disabled'.
>  
> I tried to use kubectl patch to suspend a job with last-state:
> {code:sh}
> kubectl -nbigdata-flink patch 
> flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink 
> --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'
> {code}
> It fails with an error, because the operator thinks my Kubernetes HA is 
> disabled:
> {code:java}
> Error from server: admission webhook "flinkoperator.flink.apache.org" denied 
> the request: Job could not be upgraded with last-state while Kubernetes HA 
> disabled {code}
> But I did enable Kubernetes HA, with the following options:
> {code:yaml}
> kubernetes.cluster-id: 
> high-availability: kubernetes
> high-availability.storageDir: hdfs:///flink/recovery {code}
> And I found that Flink Kubernetes operator 1.2.0 only recognizes Kubernetes 
> HA configured with the old option value:
> {code:yaml}
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
> {code}
> The check is probably done in 
> org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated.
> !image-2022-11-27-16-48-08-445.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30222) Suspended a job in last-state mode bug

2022-11-27 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639640#comment-17639640
 ] 

Gyula Fora commented on FLINK-30222:


Good catch! Would you like to work on this ticket?

The current workaround is to set:
```
    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
```
instead of simply `kubernetes`.

> Suspended a job in last-state mode bug
> --
>
> Key: FLINK-30222
> URL: https://issues.apache.org/jira/browse/FLINK-30222
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.16.0, kubernetes-operator-1.2.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2022-11-27-16-48-08-445.png
>
>
> In Flink 1.16.0, Kubernetes HA can be enabled with the option value 
> 'kubernetes', i.e. 'high-availability: kubernetes'. But in Kubernetes 
> operator 1.2.0, when I try to suspend a job in last-state mode, validation 
> fails with 'Job could not be upgraded with last-state while Kubernetes HA 
> disabled'.
>  
> I tried to use kubectl patch to suspend a job with last-state:
> {code:sh}
> kubectl -nbigdata-flink patch 
> flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink 
> --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'
> {code}
> It fails with an error, because the operator thinks my Kubernetes HA is 
> disabled:
> {code:java}
> Error from server: admission webhook "flinkoperator.flink.apache.org" denied 
> the request: Job could not be upgraded with last-state while Kubernetes HA 
> disabled {code}
> But I did enable Kubernetes HA, with the following options:
> {code:yaml}
> kubernetes.cluster-id: 
> high-availability: kubernetes
> high-availability.storageDir: hdfs:///flink/recovery {code}
> And I found that Flink Kubernetes operator 1.2.0 only recognizes Kubernetes 
> HA configured with the old option value:
> {code:yaml}
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
> {code}
> The check is probably done in 
> org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated.
> !image-2022-11-27-16-48-08-445.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30222) Suspended a job in last-state mode bug

2022-11-27 Thread tanjialiang (Jira)
tanjialiang created FLINK-30222:
---

 Summary: Suspended a job in last-state mode bug
 Key: FLINK-30222
 URL: https://issues.apache.org/jira/browse/FLINK-30222
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.2.0, 1.16.0
Reporter: tanjialiang
 Attachments: image-2022-11-27-16-48-08-445.png

In Flink 1.16.0, Kubernetes HA can be enabled with the option value 
'kubernetes', i.e. 'high-availability: kubernetes'. But in Kubernetes operator 
1.2.0, when I try to suspend a job in last-state mode, validation fails with 
'Job could not be upgraded with last-state while Kubernetes HA disabled'.

 

I tried to use kubectl patch to suspend a job with last-state:
{code:sh}
kubectl -nbigdata-flink patch 
flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink 
--type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'
{code}
It fails with an error, because the operator thinks my Kubernetes HA is 
disabled:
{code:java}
Error from server: admission webhook "flinkoperator.flink.apache.org" denied 
the request: Job could not be upgraded with last-state while Kubernetes HA 
disabled {code}
But I did enable Kubernetes HA, with the following options:
{code:yaml}
kubernetes.cluster-id: 
high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery {code}
And I found that Flink Kubernetes operator 1.2.0 only recognizes Kubernetes HA 
configured with the old option value:
{code:yaml}
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory {code}
The check is probably done in 
org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated.

!image-2022-11-27-16-48-08-445.png!
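
For illustration, a minimal sketch of the kind of check that would accept both 
forms (the class KubernetesHaCheck and the method below are hypothetical and 
only assume Flink's Configuration / HighAvailabilityOptions API; the 
operator's actual fix may look different):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public final class KubernetesHaCheck {

    private static final String KUBERNETES_HA_FACTORY =
            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";

    // Accept both the legacy factory class name and the Flink 1.16
    // shorthand value 'kubernetes' as "Kubernetes HA is activated".
    static boolean isKubernetesHaActivated(Configuration conf) {
        String haMode = conf.get(HighAvailabilityOptions.HA_MODE);
        return KUBERNETES_HA_FACTORY.equals(haMode)
                || "kubernetes".equalsIgnoreCase(haMode);
    }
}
{code}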



--
This message was sent by Atlassian Jira
(v8.20.10#820010)