[GitHub] [flink] zzzzzzzs commented on pull request #20342: [FLINK-26939][Table SQL/API] Add TRANSLATE supported in SQL & Table API

2022-07-22 Thread GitBox


zzzzzzzs commented on PR #20342:
URL: https://github.com/apache/flink/pull/20342#issuecomment-1193063464

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] Recorvery closed pull request #26: fix the code of create a instance in the doc

2022-07-22 Thread GitBox


Recorvery closed pull request #26: fix the code of create a instance in the doc
URL: https://github.com/apache/flink-connector-elasticsearch/pull/26


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] Recorvery opened a new pull request, #26: fix the code of create a instance in the doc

2022-07-22 Thread GitBox


Recorvery opened a new pull request, #26:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/26

   In this file, line 41 is missing a right bracket, which may confuse the learner.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-27618) Support CumeDist

2022-07-22 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-27618.
---
Resolution: Fixed

Fixed in master: 8017a417515cb87b1d15ef1295ecac73126b40f8
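
For reference, CUME_DIST is a standard SQL window function that returns the 
cumulative distribution of a row within its partition. A minimal sketch of how 
it would be used in Flink SQL (table and column names are illustrative):

{code:sql}
SELECT name, score,
       CUME_DIST() OVER (PARTITION BY class ORDER BY score) AS cume_dist
FROM scores;
{code}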

> Support CumeDist
> 
>
> Key: FLINK-27618
> URL: https://issues.apache.org/jira/browse/FLINK-27618
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28451) HiveFunctionDefinitionFactory load udf using user classloader instead of thread context classloader

2022-07-22 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-28451.
---
Resolution: Fixed

Fixed in master: e13ae26e806eff403787d5b75e90008bcc13d8dc and 
9a4fd227a5824e707f4dbe52773a260fbb89854a

> HiveFunctionDefinitionFactory load udf using user classloader instead of 
> thread context classloader
> ---
>
> Key: FLINK-28451
> URL: https://issues.apache.org/jira/browse/FLINK-28451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: dalongliu
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, `HiveFunctionDefinitionFactory` loads user-defined functions using 
> the thread context classloader. After FLINK-27659 introduced a user classloader 
> for loading function classes, this may no longer work if the user jar is not on 
> the thread context classloader. The following exception will be thrown:
> {code:java}
> 23478 [main] WARN  org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse statement: SELECT id, func1(str) FROM (VALUES (1, 'Hello World')) AS T(id, str) ;
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) ~[classes/:?]
>     at org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) ~[classes/:?]
>     at org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) ~[classes/:?]
>     at org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) ~[jline-reader-3.21.0.jar:?]
>     at org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) ~[jline-reader-3.21.0.jar:?]
>     at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) ~[jline-reader-3.21.0.jar:?]
>     at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:296) [classes/:?]
>     at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:281) [classes/:?]
>     at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:229) [classes/:?]
>     at org.apache.flink.table.client.cli.CliClientITCase.runSqlStatements(CliClientITCase.java:172) [test-classes/:?]
>     at org.apache.flink.table.client.cli.CliClientITCase.testSqlStatements(CliClientITCase.java:137) [test-classes/:?]
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_291]
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_291]
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_291]
>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_291]
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) [junit-4.13.2.jar:4.13.2]
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) [junit-4.13.2.jar:4.13.2]
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.13.2.jar:4.13.2]
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.13.2.jar:4.13.2]
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) [junit-4.13.2.jar:4.13.2]
>     at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) [classes/:?]
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) [junit-4.13.2.jar:4.13.2]
>     at org.junit.runners.Paren
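
The gist of the fix, per the issue title, is to resolve the UDF class against 
the user classloader instead of the thread context classloader. A minimal 
standalone sketch of that distinction, with a hypothetical jar path and class 
name (this is not Flink's actual code):

{code:java}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class UserClassLoaderSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical user jar containing the UDF; it is not on the thread
        // context classloader.
        URL userJar = new File("/tmp/my-udfs.jar").toURI().toURL();
        ClassLoader userClassLoader =
                new URLClassLoader(
                        new URL[] {userJar},
                        UserClassLoaderSketch.class.getClassLoader());

        // This would throw ClassNotFoundException, because the jar is not
        // visible to the thread context classloader:
        //   Class.forName("com.example.MyUdf", true,
        //           Thread.currentThread().getContextClassLoader());

        // This works: the class is resolved against the user classloader.
        Class<?> udfClass = Class.forName("com.example.MyUdf", true, userClassLoader);
        System.out.println("Loaded " + udfClass.getName());
    }
}
{code}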

[GitHub] [flink] wuchong merged pull request #19727: [FLINK-27618][sql] Flink supports CUME_DIST function

2022-07-22 Thread GitBox


wuchong merged PR #19727:
URL: https://github.com/apache/flink/pull/19727


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wuchong closed pull request #20211: [FLINK-28451][hive] Use UserCodeClassloader instead of the current thread's classloader to load function

2022-07-22 Thread GitBox


wuchong closed pull request #20211: [FLINK-28451][hive] Use UserCodeClassloader 
instead of the current thread's classloader to load function
URL: https://github.com/apache/flink/pull/20211


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] Recorvery closed pull request #25: Update Elasticsearch7SinkBuilder.java

2022-07-22 Thread GitBox


Recorvery closed pull request #25: Update Elasticsearch7SinkBuilder.java
URL: https://github.com/apache/flink-connector-elasticsearch/pull/25


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] boring-cyborg[bot] commented on pull request #25: Update Elasticsearch7SinkBuilder.java

2022-07-22 Thread GitBox


boring-cyborg[bot] commented on PR #25:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/25#issuecomment-1193058526

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-elasticsearch] Recorvery opened a new pull request, #25: Update Elasticsearch7SinkBuilder.java

2022-07-22 Thread GitBox


Recorvery opened a new pull request, #25:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/25

   The example code is missing the closing parenthesis


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #20084: [FLINK-27988][table-planner] Let HiveTableSource extend from SupportStatisticsReport

2022-07-22 Thread GitBox


luoyuxia commented on code in PR #20084:
URL: https://github.com/apache/flink/pull/20084#discussion_r928066455


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java:
##
@@ -315,7 +315,7 @@ private RowType getProducedRowType() {
 return (RowType) 
producedSchema.toRowDataType().bridgedTo(RowData.class).getLogicalType();
 }
 
-private BulkFormat createDefaultBulkFormat() {
+public BulkFormat createDefaultBulkFormat() {

Review Comment:
   Can be `protected`.



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
 return source;
 }
 
+@Override
+public TableStats reportStatistics() {
+try {
+// only support BOUNDED source
+if (isStreamingSource()) {
+return TableStats.UNKNOWN;
+}
+if 
(flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+!= FileSystemConnectorOptions.FileStatisticsType.ALL) {
+return TableStats.UNKNOWN;
+}
+
+HiveSourceBuilder sourceBuilder =
+new HiveSourceBuilder(jobConf, flinkConf, tablePath, 
hiveVersion, catalogTable)
+.setProjectedFields(projectedFields)
+.setLimit(limit);
+int threadNum =
+
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+List hivePartitionsToRead =
+getAllPartitions(
+jobConf,
+hiveVersion,
+tablePath,
+catalogTable.getPartitionKeys(),
+remainingPartitions);
+BulkFormat defaultBulkFormat =
+sourceBuilder.createDefaultBulkFormat();
+List inputSplits =
+HiveSourceFileEnumerator.createInputSplits(
+0, hivePartitionsToRead, threadNum, jobConf);
+if (inputSplits.size() != 0) {
+TableStats tableStats;
+if (defaultBulkFormat instanceof 
FileBasedStatisticsReportableInputFormat) {
+// If HiveInputFormat's variable useMapRedReader is false, 
Hive using Flink's
+// InputFormat to read data.
+tableStats =
+((FileBasedStatisticsReportableInputFormat) 
defaultBulkFormat)
+.reportStatistics(
+inputSplits.stream()
+.map(FileSourceSplit::path)
+
.collect(Collectors.toList()),
+
catalogTable.getSchema().toRowDataType());
+} else {
+// If HiveInputFormat's variable useMapRedReader is true, 
Hive using MapRed

Review Comment:
   ```suggestion
   // If HiveInputFormat's variable useMapRedReader is 
true, Flink will use hadoop mapred record reader to read data.
   ```



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
 return source;
 }
 
+@Override
+public TableStats reportStatistics() {
+try {
+// only support BOUNDED source
+if (isStreamingSource()) {
+return TableStats.UNKNOWN;
+}
+if 
(flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+!= FileSystemConnectorOptions.FileStatisticsType.ALL) {
+return TableStats.UNKNOWN;
+}
+
+HiveSourceBuilder sourceBuilder =
+new HiveSourceBuilder(jobConf, flinkConf, tablePath, 
hiveVersion, catalogTable)
+.setProjectedFields(projectedFields)
+.setLimit(limit);
+int threadNum =
+
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+List hivePartitionsToRead =
+getAllPartitions(
+jobConf,
+hiveVersion,
+tablePath,
+catalogTable.getPartitionKeys(),
+remainingPartitions);
+BulkFormat defaultBulkFormat =
+sourceBuilder.createDefaultBulkFormat();
+List inputSplits =
+HiveSourceFileEnumerator.createInputSplits(
+0, hivePartitionsToRead, threadNum, jobConf);
+if (inp

[GitHub] [flink] DavidLiu001 commented on a diff in pull request #20338: [FLINK-27536][Connectors / Common] Rename method parameter in AsyncSi…

2022-07-22 Thread GitBox


DavidLiu001 commented on code in PR #20338:
URL: https://github.com/apache/flink/pull/20338#discussion_r927431507


##
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##
@@ -170,30 +170,30 @@
  * the valid limits of the destination). The logic then needs to create 
and execute the request
  * asynchronously against the destination (ideally by batching together 
multiple request entries
  * to increase efficiency). The logic also needs to identify individual 
request entries that
- * were not persisted successfully and resubmit them using the {@code 
requestResult} callback.
+ * were not persisted successfully and resubmit them using the {@code 
requestToRetry} callback.

Review Comment:
   For example, it is used to call the accept() method: 
requestToRetry.accept(retryableFailedRecords).
   @dannycranmer If you still think requestsToRetry is better, I will change it 
to requestsToRetry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zzzzzzzs commented on pull request #20323: [FLINK-26944][Table SQL/API] Add ADD_MONTHS supported in SQL and Table API

2022-07-22 Thread GitBox


zzzzzzzs commented on PR #20323:
URL: https://github.com/apache/flink/pull/20323#issuecomment-1193042878

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26047) Support usrlib in HDFS for YARN application mode

2022-07-22 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-26047:
--
Fix Version/s: 1.16.0

> Support usrlib in HDFS for YARN application mode
> 
>
> Key: FLINK-26047
> URL: https://issues.apache.org/jira/browse/FLINK-26047
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Biao Geng
>Assignee: Biao Geng
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> In YARN Application mode, we currently support using user jar and lib jar 
> from HDFS. For example, we can run commands like:
> {quote}./bin/flink run-application -t yarn-application \
>   -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
>   hdfs://myhdfs/jars/my-application.jar{quote}
> For {{usrlib}}, we currently only support a local directory. I propose adding 
> HDFS support for {{usrlib}} so it works better with CLASSPATH_INCLUDE_USER_JAR. 
> It can also benefit cases like using a notebook to submit Flink jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-26047) Support usrlib in HDFS for YARN application mode

2022-07-22 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang closed FLINK-26047.
-
Resolution: Fixed

Fixed via:

master: fc533b9d9c1252db124b4c1cb8365b3906b009cf

> Support usrlib in HDFS for YARN application mode
> 
>
> Key: FLINK-26047
> URL: https://issues.apache.org/jira/browse/FLINK-26047
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Biao Geng
>Assignee: Biao Geng
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> In YARN Application mode, we currently support using user jar and lib jar 
> from HDFS. For example, we can run commands like:
> {quote}./bin/flink run-application -t yarn-application \
>   -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
>   hdfs://myhdfs/jars/my-application.jar{quote}
> For {{usrlib}}, we currently only support a local directory. I propose adding 
> HDFS support for {{usrlib}} so it works better with CLASSPATH_INCLUDE_USER_JAR. 
> It can also benefit cases like using a notebook to submit Flink jobs.
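
For reference, with this change shipping a remote usrlib should look much like 
the provided lib dirs above; a hedged sketch (the yarn.provided.usrlib.dir 
option name follows the linked PR and should be treated as illustrative, as 
should the HDFS paths):

{code:bash}
./bin/flink run-application -t yarn-application \
  -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
  -Dyarn.provided.usrlib.dir="hdfs://myhdfs/usrlib" \
  hdfs://myhdfs/jars/my-application.jar
{code}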



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wangyang0918 merged pull request #19065: [FLINK-26047][yarn] Support remote usrlib in HDFS for YARN deployment

2022-07-22 Thread GitBox


wangyang0918 merged PR #19065:
URL: https://github.com/apache/flink/pull/19065


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zzzzzzzs commented on pull request #20342: [FLINK-26939][Table SQL/API] Add TRANSLATE supported in SQL & Table API

2022-07-22 Thread GitBox


zzzzzzzs commented on PR #20342:
URL: https://github.com/apache/flink/pull/20342#issuecomment-1193036846

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zstraw commented on pull request #17956: [FLINK-18779][table sql/planner]Support the SupportsFilterPushDown for LookupTableSource

2022-07-22 Thread GitBox


zstraw commented on PR #17956:
URL: https://github.com/apache/flink/pull/17956#issuecomment-1193036249

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27851) Join can't access the pk from source table

2022-07-22 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27851:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Join can't access the pk from source table
> --
>
> Key: FLINK-27851
> URL: https://issues.apache.org/jira/browse/FLINK-27851
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
>
> If the source table contains a pk and mini-batch is enabled, the join can't get 
> the pk information from the source table. The root cause is that 
> `FlinkRelMdUniqueKeys` does not override the function that takes a 
> MiniBatchAssigner argument.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28652) Amazon Linux 2 based Docker container for Flink

2022-07-22 Thread Alexey Petrenko (Jira)
Alexey Petrenko created FLINK-28652:
---

 Summary: Amazon Linux 2 based Docker container for Flink
 Key: FLINK-28652
 URL: https://issues.apache.org/jira/browse/FLINK-28652
 Project: Flink
  Issue Type: New Feature
  Components: flink-docker
 Environment: AWS
Reporter: Alexey Petrenko


Amazon Linux 2 is the Linux distribution widely used by projects running on AWS.
It would be nice to have an Amazon Linux 2 based container for Flink so that 
products running on AWS can standardize on a single Linux distribution.

I currently have a prototype of such a container which we are verifying, and I'm 
ready to donate the code as soon as it's completed.
It requires only minor changes related to package installation and the jemalloc 
library location in docker-entrypoint.sh.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28650) Flink SQL Parsing bug for METADATA

2022-07-22 Thread Jun Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570124#comment-17570124
 ] 

Jun Qin commented on FLINK-28650:
-

Thanks [~JasonLee]. Your example indeed works, and it is included in my 
description above. The JIRA is to fix the issue when column names are 
explicitly specified. 
[~jark] FYI

> Flink SQL Parsing bug for METADATA
> --
>
> Key: FLINK-28650
> URL: https://issues.apache.org/jira/browse/FLINK-28650
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.4
>Reporter: Jun Qin
>Priority: Major
>
> With the following source/sink tables:
> {code:sql}
> CREATE TABLE sourceTable ( 
> `key` INT, 
> `time` TIMESTAMP(3),
> `value` STRING NOT NULL, 
> id INT 
> ) 
> WITH ( 
> 'connector' = 'datagen', 
> 'rows-per-second'='10', 
> 'fields.id.kind'='sequence', 
> 'fields.id.start'='1', 
> 'fields.id.end'='100' 
> );
> CREATE TABLE sinkTable1 ( 
> `time` TIMESTAMP(3) METADATA FROM 'timestamp', 
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> CREATE TABLE sinkTable2 ( 
> `time` TIMESTAMP(3), -- without METADATA
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> {code}
> the following three pass the validation:
> {code:sql}
> INSERT INTO sinkTable1
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> but this one does not:
> {code:sql}
> INSERT INTO sinkTable1 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> It failed with 
> {code:java}
> Unknown target column 'time'
> {code}
> It seems that when column names are provided in the INSERT statement, the 
> METADATA column has an undesired effect. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28651) JarRunHandler ignores restore mode set in the configuration

2022-07-22 Thread Nuno Afonso (Jira)
Nuno Afonso created FLINK-28651:
---

 Summary: JarRunHandler ignores restore mode set in the 
configuration
 Key: FLINK-28651
 URL: https://issues.apache.org/jira/browse/FLINK-28651
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.15.1, 1.15.0
Reporter: Nuno Afonso


Hello all,

 

I started a Flink cluster with execution.savepoint-restore-mode set to CLAIM. 
Then, I submitted a job through the REST API without specifying restoreMode in 
the request. I would expect Flink to use CLAIM, but the job used NO_CLAIM as 
restore mode.

 

After looking into the source code, I believe the issue comes from the way 
JarRunHandler picks the default for restoreMode. It directly gets it from the 
default of execution.savepoint-restore-mode instead of looking into the 
existing configuration: 
[https://github.com/apache/flink/blob/release-1.15.1/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L150]

 

I think the fix can be achieved by getting execution.savepoint-restore-mode 
from the configuration.
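
A hedged sketch of the suggested fix (illustrative, not the actual JarRunHandler 
code; requestedRestoreMode stands in for the value parsed from the request body):

{code:java}
// Prefer the restore mode from the request body when present; otherwise fall
// back to the effective cluster configuration instead of the option's
// hard-coded default for execution.savepoint-restore-mode.
RestoreMode restoreMode =
        requestedRestoreMode != null
                ? requestedRestoreMode
                : configuration.get(SavepointConfigOptions.RESTORE_MODE);
{code}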

 

Looking forward to hearing your feedback.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

2022-07-22 Thread GitBox


hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r927824434


##
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+private final int batchSize;
+private final Instant requestStartTime;
+
+private RequestInfo(int batchSize, Instant requestStartTime) {
+this.batchSize = batchSize;
+this.requestStartTime = requestStartTime;
+}
+
+@PublicEvolving
+public static RequestInfoBuilder builder() {
+return new RequestInfoBuilder();
+}
+
+public int getBatchSize() {
+return batchSize;
+}
+
+public Instant getRequestStartTime() {
+return requestStartTime;
+}
+
+/** Builder for {@link RequestInfo} dataclass. */
+public static class RequestInfoBuilder {
+private int batchSize;
+private Instant requestStartTime;
+
+public RequestInfoBuilder setBatchSize(final int batchSize) {
+this.batchSize = batchSize;
+return this;
+}
+
+public RequestInfoBuilder setRequestStartTime(final Instant 
requestStartTime) {
+this.requestStartTime = requestStartTime;
+return this;
+}
+
+public RequestInfo build() {
+return new RequestInfo(batchSize, requestStartTime);
+}
+}

Review Comment:
   I think a fluent builder makes the code more readable, especially for a 
dataclass like this, which can be extended to have more information in the 
future.
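
   For illustration, usage with the fluent builder shown above would read like 
this (a sketch; `Instant.now()` stands in for the real request start time):
   ```java
   RequestInfo requestInfo =
           RequestInfo.builder()
                   .setBatchSize(100)
                   .setRequestStartTime(Instant.now())
                   .build();
   ```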



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] jeesmon opened a new pull request, #328: [FLINK-28637] Set explicit version for okhttp to fix vulnerability

2022-07-22 Thread GitBox


jeesmon opened a new pull request, #328:
URL: https://github.com/apache/flink-kubernetes-operator/pull/328

   ## What is the purpose of the change
   
   Setting an explicit version for okhttp until we can upgrade to a new version of 
JOSDK with the fix.
   
   ## Brief change log
   
 - Setting the okhttp version explicitly to 4.10.0 to fix PRISMA-2022-0239


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hlteoh37 commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

2022-07-22 Thread GitBox


hlteoh37 commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r927823525


##
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##
@@ -344,69 +352,69 @@ public void write(InputT element, Context context) throws 
IOException, Interrupt
  * 
  */
 private void nonBlockingFlush() throws InterruptedException {
-while (!isInFlightRequestOrMessageLimitExceeded()
+while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
 || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes)) {
 flush();
 }
 }
 
-/**
- * Determines if the sink should block and complete existing in flight 
requests before it may
- * prudently create any new ones. This is exactly determined by if the 
number of requests
- * currently in flight exceeds the maximum supported by the sink OR if the 
number of in flight
- * messages exceeds the maximum determined to be appropriate by the rate 
limiting strategy.
- */
-private boolean isInFlightRequestOrMessageLimitExceeded() {
-return inFlightRequestsCount >= maxInFlightRequests
-|| inFlightMessages >= rateLimitingStrategy.getRateLimit();
+private RequestInfo createRequestInfo() {
+int batchSize = getNextBatchSize();
+Instant requestStartTime = Instant.now();
+return RequestInfo.builder()
+.setBatchSize(batchSize)
+.setRequestStartTime(requestStartTime)
+.build();

Review Comment:
   Happy to remove `requestStartTime`. It is not used by the current 
implementation of `RateLimitingStrategy` at the moment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-28637:
---
Fix Version/s: kubernetes-operator-1.1.1
   kubernetes-operator-1.2.0

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0, kubernetes-operator-1.0.1
>Reporter: James Busche
>Assignee: Jeesmon Jacob
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0, kubernetes-operator-1.1.1
>
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable for sensitive information disclosure. An illegal character in a 
> header value will cause IllegalArgumentException which will include full 
> header value. This applies to Authorization, Cookie, Proxy-Authorization and 
> Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there's no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-28637:
---
Affects Version/s: kubernetes-operator-1.0.1

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0, kubernetes-operator-1.0.1
>Reporter: James Busche
>Assignee: Jeesmon Jacob
>Priority: Major
>  Labels: pull-request-available
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable for sensitive information disclosure. An illegal character in a 
> header value will cause IllegalArgumentException which will include full 
> header value. This applies to Authorization, Cookie, Proxy-Authorization and 
> Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there's no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-28637:
--

Assignee: Jeesmon Jacob

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: James Busche
>Assignee: Jeesmon Jacob
>Priority: Major
>  Labels: pull-request-available
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable for sensitive information disclosure. An illegal character in a 
> header value will cause IllegalArgumentException which will include full 
> header value. This applies to Authorization, Cookie, Proxy-Authorization and 
> Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there's no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28637:
---
Labels: pull-request-available  (was: )

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: James Busche
>Priority: Major
>  Labels: pull-request-available
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable for sensitive information disclosure. An illegal character in a 
> header value will cause IllegalArgumentException which will include full 
> header value. This applies to Authorization, Cookie, Proxy-Authorization and 
> Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there's no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] jeesmon opened a new pull request, #327: [FLINK-28637] Set explicit version for okhttp to fix vulnerability

2022-07-22 Thread GitBox


jeesmon opened a new pull request, #327:
URL: https://github.com/apache/flink-kubernetes-operator/pull/327

   ## What is the purpose of the change
   
   Setting an explicit version for okhttp until we can upgrade to a new version of 
JOSDK with the fix.
   
   ## Brief change log
   
 - Setting the okhttp version explicitly to 4.10.0 to fix PRISMA-2022-0239 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570097#comment-17570097
 ] 

Gyula Fora edited comment on FLINK-28637 at 7/22/22 3:53 PM:
-

Personally I am not extremely confident in simply swapping out a HttpClient 
implementation and releasing it with only minimal testing. The current JOSDK, 
fabric8, okhttp clients have been tested in various cloud environments for 
weeks/months.

It would be a real shame to introduce instability or any other problems while 
fixing a vulnerability that cannot reasonably surface for the operator, 
especially by making a last-minute change like that.

Please open a PR; we can merge it to the main/release-1.1 branches and publish a 
patch release after 1-2 weeks of testing.


was (Author: gyfora):
Personally I am not extremely confident in simply swapping out a HttpClient 
implementation and releasing it with only minimal testing. The current JOSDK, 
fabric8, okhttp clients have been tested in various cloud environments for 
weeks/months.

It would be a real shame to introduce unstability or any other problems for 
fixing a vulnaribility that cannot reasonable surface for the operator. 
Especially by doing a last minute change like that.

Please open a PR, we can merge this for the main/release-1.1 branches and 
release a patch release after 1-2 weeks of testing.

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: James Busche
>Priority: Major
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable for sensitive information disclosure. An illegal character in a 
> header value will cause IllegalArgumentException which will include full 
> header value. This applies to Authorization, Cookie, Proxy-Authorization and 
> Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there's no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570097#comment-17570097
 ] 

Gyula Fora commented on FLINK-28637:


Personally I am not extremely confident in simply swapping out a HttpClient 
implementation and releasing it with only minimal testing. The current JOSDK, 
fabric8, okhttp clients have been tested in various cloud environments for 
weeks/months.

It would be a real shame to introduce unstability or any other problems for 
fixing a vulnaribility that cannot reasonable surface for the operator. 
Especially by doing a last minute change like that.

Please open a PR, we can merge this for the main/release-1.1 branches and 
release a patch release after 1-2 weeks of testing.

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: James Busche
>Priority: Major
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable for sensitive information disclosure. An illegal character in a 
> header value will cause IllegalArgumentException which will include full 
> header value. This applies to Authorization, Cookie, Proxy-Authorization and 
> Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there's no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Jeesmon Jacob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570096#comment-17570096
 ] 

Jeesmon Jacob commented on FLINK-28637:
---

[~mbalassi] Since JOSDK explicitly sets the okhttp version, can we use the same 
approach in 1.1.0 until we can upgrade JOSDK? That way we don't need to ship a 
new version with the vulnerability. I'm using this approach locally in 1.0.1 and 
am happy to create a PR. All e2e tests pass with the okhttp version upgrade.

Just adding the diff here for reference, in case we decide not to fix it now and 
someone needs it to satisfy their internal security requirements.
{code:java}
diff --git a/flink-kubernetes-operator/pom.xml 
b/flink-kubernetes-operator/pom.xml
index 6e85b8c..e82a5e9 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -143,6 +143,28 @@ under the License.
             <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <!-- Explicit okhttp version until the JOSDK upgrade (PRISMA-2022-0239) -->
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>${okhttp.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>logging-interceptor</artifactId>
+            <version>${okhttp.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
 

 
diff --git a/pom.xml b/pom.xml
index 279f0b5..9f04d01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,8 @@ under the License.

 2.4.2
 true
+
+        <okhttp.version>4.10.0</okhttp.version>
 

 
{code}

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: James Busche
>Priority: Major
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable for sensitive information disclosure. An illegal character in a 
> header value will cause IllegalArgumentException which will include full 
> header value. This applies to Authorization, Cookie, Proxy-Authorization and 
> Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there's no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pnowojski commented on a diff in pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…

2022-07-22 Thread GitBox


pnowojski commented on code in PR #20245:
URL: https://github.com/apache/flink/pull/20245#discussion_r927747602


##
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/AIMDScalingStrategy.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * AIMDScalingStrategy scales up linearly and scales down multiplicatively. See
+ * https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease for 
more details
+ */
+@PublicEvolving
+public class AIMDScalingStrategy {
+private final int increaseRate;
+private final double decreaseFactor;
+private final int rateThreshold;
+
+public AIMDScalingStrategy(int increaseRate, double decreaseFactor, int 
rateThreshold) {
+Preconditions.checkArgument(increaseRate > 0, "increaseRate must be 
positive integer.");
+Preconditions.checkArgument(
+decreaseFactor < 1.0 && decreaseFactor > 0.0,
+"decreaseFactor must be strictly between 0.0 and 1.0.");
+Preconditions.checkArgument(rateThreshold > 0, "rateThreshold must be 
a positive integer.");
+Preconditions.checkArgument(
+rateThreshold >= increaseRate, "rateThreshold must be larger 
than increaseRate.");
+this.increaseRate = increaseRate;
+this.decreaseFactor = decreaseFactor;
+this.rateThreshold = rateThreshold;
+}
+
+public int scaleUp(int currentRate) {
+return Math.min(currentRate + increaseRate, rateThreshold);
+}
+
+public int scaleDown(int currentRate) {
+return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
+}
+
+@PublicEvolving
+public static AIMDScalingStrategyBuilder builder() {
+return new AIMDScalingStrategyBuilder();
+}
+
+/** Builder for {@link AIMDScalingStrategy}. */
+public static class AIMDScalingStrategyBuilder {
+
+private int increaseRate = 10;
+private double decreaseFactor = 0.5;
+private int rateThreshold;

Review Comment:
   nit: why does it have the default value `0` if you later do 
`checkArgument(rateThreshold > 0)` in the constructor? If it's an obligatory 
parameter without a good default value, then I think it would be cleaner to pass 
it through the constructor of the `AIMDScalingStrategyBuilder`:
   
   ```
   AIMDScalingStrategyBuilder
 .builder(myRateThreshold)
 .setOptionalParam1(foo)
 .setOptionalParam2(bar)
   ```
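
   As a concrete illustration of the `scaleUp`/`scaleDown` semantics shown above 
(a sketch using the public constructor, since the builder's setters are not 
visible in this excerpt):
   ```java
   // increaseRate = 10, decreaseFactor = 0.5, rateThreshold = 100
   AIMDScalingStrategy strategy = new AIMDScalingStrategy(10, 0.5, 100);
   strategy.scaleUp(95);   // min(95 + 10, 100)       -> 100
   strategy.scaleDown(95); // max(1, round(95 * 0.5)) -> 48
   ```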



##
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/strategy/RequestInfo.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer.strategy;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.time.Instant;
+
+/** Dataclass to encapsulate information about starting requests. */
+@PublicEvolving
+public class RequestInfo {
+private final int batchSize;
+private final Instant requestStartTime;
+
+private RequestInfo(int batchSize, Instant requestStartTime) {
+this.batchSize = batchSize;
+this.requestStartTime = requestStartTime;
+}
+
+@PublicEvolving
+public static RequestInfoBuilder bui

[jira] [Commented] (FLINK-28650) Flink SQL Parsing bug for METADATA

2022-07-22 Thread JasonLee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570073#comment-17570073
 ] 

JasonLee commented on FLINK-28650:
--

Hi [~qinjunjerry], maybe you can try the following:
{code:sql}
INSERT INTO sinkTable1
SELECT 
`time`, 
`value`
FROM sourceTable; {code}

> Flink SQL Parsing bug for METADATA
> --
>
> Key: FLINK-28650
> URL: https://issues.apache.org/jira/browse/FLINK-28650
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.4
>Reporter: Jun Qin
>Priority: Major
>
> With the following source/sink tables:
> {code:sql}
> CREATE TABLE sourceTable ( 
> `key` INT, 
> `time` TIMESTAMP(3),
> `value` STRING NOT NULL, 
> id INT 
> ) 
> WITH ( 
> 'connector' = 'datagen', 
> 'rows-per-second'='10', 
> 'fields.id.kind'='sequence', 
> 'fields.id.start'='1', 
> 'fields.id.end'='100' 
> );
> CREATE TABLE sinkTable1 ( 
> `time` TIMESTAMP(3) METADATA FROM 'timestamp', 
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> CREATE TABLE sinkTable2 ( 
> `time` TIMESTAMP(3), -- without METADATA
> `value` STRING NOT NULL
> ) 
> WITH ( 
>   'connector' = 'kafka',
> ...
> )
> {code}
> the following three pass the validation:
> {code:sql}
> INSERT INTO sinkTable1
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> INSERT INTO sinkTable2 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> but this one does not:
> {code:sql}
> INSERT INTO sinkTable1 (`time`,`value`)
> SELECT 
> `time`, 
> `value`
> FROM sourceTable;
> {code}
> It failed with 
> {code:java}
> Unknown target column 'time'
> {code}
> It seems that when column names are provided in the INSERT statement, the 
> METADATA column has an undesired effect. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zzzzzzzs commented on pull request #20323: [FLINK-26944][Table SQL/API] Add ADD_MONTHS supported in SQL and Table API

2022-07-22 Thread GitBox


zzzzzzzs commented on PR #20323:
URL: https://github.com/apache/flink/pull/20323#issuecomment-1192681902

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zzzzzzzs commented on pull request #20342: [FLINK-26939][Table SQL/API] Add TRANSLATE supported in SQL & Table API

2022-07-22 Thread GitBox


zzzzzzzs commented on PR #20342:
URL: https://github.com/apache/flink/pull/20342#issuecomment-1192681393

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zzzzzzzs commented on pull request #20342: [FLINK-26939][Table SQL/API] Add TRANSLATE supported in SQL & Table API

2022-07-22 Thread GitBox


zzzzzzzs commented on PR #20342:
URL: https://github.com/apache/flink/pull/20342#issuecomment-1192674314

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-28231) Support Apache Ozone FileSystem

2022-07-22 Thread Xin Wen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570064#comment-17570064
 ] 

Xin Wen edited comment on FLINK-28231 at 7/22/22 2:53 PM:
--

Hi [~bianqi], can you share the steps to substitute Ozone for Hadoop? I am 
facing the same problem. Maybe write a blog post before the official 
documentation is updated!


was (Author: JIRAUSER293262):
Hi bianqi,

> Support Apache Ozone FileSystem
> ---
>
> Key: FLINK-28231
> URL: https://issues.apache.org/jira/browse/FLINK-28231
> Project: Flink
>  Issue Type: New Feature
>  Components: fs
>Affects Versions: 1.12.1
> Environment: * Flink1.12.1
>  * Hadoop3.2.2
>  * Ozone1.2.1
>Reporter: bianqi
>Priority: Major
> Attachments: abc.png, image-2022-06-24-10-05-42-193.png, 
> image-2022-06-24-10-08-26-010.png, mapreduce-webui.png, 
> 微信图片_20220623224142.png
>
>
> h3. Background:
> Apache Ozone has become a new-generation object storage system with a promising 
> future.
> Currently, Flink already supports S3, aliyunOSS, and Google Cloud Storage, but 
> Apache Ozone is not supported.
> h3. Purpose:
>  
> Make Flink support the Apache Ozone file system.
> h3. Work content:
> We need to extend Flink to support the Apache Ozone FileSystem, and also improve 
> the documentation.
> [https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/filesystems/overview/]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28231) Support Apache Ozone FileSystem

2022-07-22 Thread Xin Wen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570064#comment-17570064
 ] 

Xin Wen commented on FLINK-28231:
-

Hi bianqi,

> Support Apache Ozone FileSystem
> ---
>
> Key: FLINK-28231
> URL: https://issues.apache.org/jira/browse/FLINK-28231
> Project: Flink
>  Issue Type: New Feature
>  Components: fs
>Affects Versions: 1.12.1
> Environment: * Flink1.12.1
>  * Hadoop3.2.2
>  * Ozone1.2.1
>Reporter: bianqi
>Priority: Major
> Attachments: abc.png, image-2022-06-24-10-05-42-193.png, 
> image-2022-06-24-10-08-26-010.png, mapreduce-webui.png, 
> 微信图片_20220623224142.png
>
>
> h3. Background:
> Apache Ozone has become a new-generation object storage system with a 
> promising future.
> Currently, Flink already supports S3, Aliyun OSS, and Google Cloud Storage, 
> but Apache Ozone is not supported.
> h3. Purpose:
>  
> Make Flink support the Apache Ozone file system.
> h3. Work content:
> We need to extend Flink to support the Apache Ozone FileSystem, and also 
> improve the documentation.
> [https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/filesystems/overview/]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570063#comment-17570063
 ] 

Márton Balassi commented on FLINK-28637:


Fortunately both the Fabric8 and the JOSDK communities were very responsive, 
which gives us a path for fixing this. However, given the following:
 
1. The HTTP client is internal to the operator, so this vulnerability is very 
unlikely to affect it,
2. We also need to bump the dependency within the Flink native k8s integration,
3. We need extensive testing to make sure the new dependency version behaves 
properly,
 
my suggestion is to release 1.1.0 with this as a known issue and fix it in 
1.1.1. That said, we can merge a fix for it to the release-1.1 branch as soon as 
possible, so folks who are prohibited from using the 1.1.0 version can roll 
their own image from source.
 
The JOSDK folks offered to produce a new patch release that we can depend on in 
1.1.1.

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: James Busche
>Priority: Major
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable to sensitive information disclosure. An illegal character in a 
> header value will cause an IllegalArgumentException which will include the 
> full header value. This applies to the Authorization, Cookie, 
> Proxy-Authorization and Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there are no plans to provide 
> this fix for the 3.x version.
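
For illustration, the disclosure pattern described above amounts to roughly the 
following (a sketch assuming the okhttp 3.x behavior reported in the linked 
issue, not code from the operator itself):
{code:java}
import okhttp3.Request;

/** Sketch of the PRISMA-2022-0239 leak pattern (assumes okhttp < 4.9.2 on the classpath). */
public class OkhttpHeaderLeakSketch {
    public static void main(String[] args) {
        try {
            new Request.Builder()
                    .url("https://example.invalid/")
                    // A control character is illegal in a header value and triggers validation.
                    .addHeader("Authorization", "Bearer my-secret-token\u0000")
                    .build();
        } catch (IllegalArgumentException e) {
            // Before 4.9.2 the exception message embeds the full header value, leaking the token.
            System.out.println(e.getMessage());
        }
    }
}
{code}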



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28650) Flink SQL Parsing bug for METADATA

2022-07-22 Thread Jun Qin (Jira)
Jun Qin created FLINK-28650:
---

 Summary: Flink SQL Parsing bug for METADATA
 Key: FLINK-28650
 URL: https://issues.apache.org/jira/browse/FLINK-28650
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.4
Reporter: Jun Qin


With the following source/sink tables:
{code:sql}
CREATE TABLE sourceTable ( 
`key` INT, 
`time` TIMESTAMP(3),
`value` STRING NOT NULL, 
id INT 
) 
WITH ( 
'connector' = 'datagen', 
'rows-per-second'='10', 
'fields.id.kind'='sequence', 
'fields.id.start'='1', 
'fields.id.end'='100' 
);

CREATE TABLE sinkTable1 ( 
`time` TIMESTAMP(3) METADATA FROM 'timestamp', 
`value` STRING NOT NULL
) 
WITH ( 
  'connector' = 'kafka',
...
);

CREATE TABLE sinkTable2 ( 
`time` TIMESTAMP(3), -- without METADATA
`value` STRING NOT NULL
) 
WITH ( 
  'connector' = 'kafka',
...
);
{code}
the following three pass the validation:
{code:sql}
INSERT INTO sinkTable1
SELECT 
`time`, 
`value`
FROM sourceTable;

INSERT INTO sinkTable2
SELECT 
`time`, 
`value`
FROM sourceTable;

INSERT INTO sinkTable2 (`time`,`value`)
SELECT 
`time`, 
`value`
FROM sourceTable;
{code}
but this one does not:
{code:sql}
INSERT INTO sinkTable1 (`time`,`value`)
SELECT 
`time`, 
`value`
FROM sourceTable;
{code}
It fails with 

{code:java}
Unknown target column 'time'
{code}

It seems that when column names are provided in INSERT, the METADATA columns 
have an undesired effect. 
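
A workaround implied by the passing cases above is to omit the explicit target 
column list when the sink declares persisted metadata columns:
{code:sql}
-- positional mapping, no target column list: passes validation (first case above)
INSERT INTO sinkTable1
SELECT 
    `time`, 
    `value`
FROM sourceTable;
{code}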



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28649) Allow adopting unmanaged jobs as FlinkSessionJob resource

2022-07-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570053#comment-17570053
 ] 

Gyula Fora commented on FLINK-28649:


cc [~jeesmon] 

> Allow adopting unmanaged jobs as FlinkSessionJob resource
> -
>
> Key: FLINK-28649
> URL: https://issues.apache.org/jira/browse/FLINK-28649
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.2.0
>
>
> We should allow users to create a FlinkSessionJob resource for already 
> existing jobs deployed to a managed session cluster.
> We should add a configuration option to specify the jobId of the running job, 
> and then the operator would assume that it is running according to the 
> initial spec provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28649) Allow adopting unmanaged jobs as FlinkSessionJob resource

2022-07-22 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-28649:
--

 Summary: Allow adopting unmanaged jobs as FlinkSessionJob resource
 Key: FLINK-28649
 URL: https://issues.apache.org/jira/browse/FLINK-28649
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.2.0


We should allow users to create a FlinkSessionJob resource for already existing 
jobs deployed to a managed session cluster.

We should add a configuration option to specify the jobId of the running job, 
and then the operator would assume that it is running according to the initial 
spec provided.
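
A purely hypothetical sketch of what such a resource could look like (the 
{{jobId}} field below is illustrative only and does not exist in the current 
spec):
{code:yaml}
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: adopted-job
spec:
  deploymentName: my-session-cluster     # managed session cluster running the job
  job:
    jarURI: https://example.com/job.jar  # initial spec the operator would assume
    parallelism: 2
    upgradeMode: stateless
    jobId: b1c2d3e4f5a6b7c8d9e0f1a2b3c4d5e6  # hypothetical: id of the running job to adopt
{code}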



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28648) Allow session deletion to block on any running job

2022-07-22 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-28648:
--

 Summary: Allow session deletion to block on any running job
 Key: FLINK-28648
 URL: https://issues.apache.org/jira/browse/FLINK-28648
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.2.0


Currently session FlinkDeployment deletion blocks on existing FlinkSessionJob-s 
for that cluster.

We could add an option to block on any running job, in case it is an unmanaged 
job deployed directly through the Flink CLI.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28648) Allow session deletion to block on any running job

2022-07-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570051#comment-17570051
 ] 

Gyula Fora commented on FLINK-28648:


cc [~jeesmon] 

> Allow session deletion to block on any running job
> --
>
> Key: FLINK-28648
> URL: https://issues.apache.org/jira/browse/FLINK-28648
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.2.0
>
>
> Currently session FlinkDeployment deletion blocks on existing 
> FlinkSessionJob-s for that cluster.
> We could add an option to block on any running job, in case it is an 
> unmanaged job deployed directly through the Flink CLI.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zzzzzzzs commented on pull request #20342: [FLINK-26939][Table SQL/API] Add TRANSLATE supported in SQL & Table API

2022-07-22 Thread GitBox


zzzs commented on PR #20342:
URL: https://github.com/apache/flink/pull/20342#issuecomment-1192603439

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #19907: [FLINK-27692][state] Support local recovery for materialized part of changelog

2022-07-22 Thread GitBox


fredia commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r927677335


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java:
##
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CollectionSink;
+import org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CountFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static org.apache.flink.configuration.ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE;
+import static org.apache.flink.configuration.ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+
+/**
+ * Local recovery IT case for changelog. It never fails because local recovery
+ * is nice but not necessary.
+ */
+@RunWith(Parameterized.class)
+public class ChangelogLocalRecoveryITCase {
+
+private static final int NUM_TASK_MANAGERS = 2;
+private static final int NUM_TASK_SLOTS = 1;
+
+@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+@Parameterized.Parameter public AbstractStateBackend delegatedStateBackend;
+
+@Parameterized.Parameters(name = "delegated state backend type = {0}")
+public static Collection parameter() {
+return Arrays.asList(
+new HashMapStateBackend(),
+new EmbeddedRocksDBStateBackend(false),
+new EmbeddedRocksDBStateBackend(true));
+}
+
+private MiniClusterWithClientResource cluster;
+private static String workingDir;
+
+@BeforeClass
+public static void setWorkingDir() throws IOException {
+workingDir = TEMPORARY_FOLDER.newFolder("work").getAbsolutePath();
+}
+
+@Before
+public void setup() throws Exception {
+Configuration configuration = new Configuration();
+configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
+
+configuration.setString(PROCESS_WORKING_DIR_BASE, workingDir);
+configuration.setString(JOB_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir);
+configuration.setString(TASK_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir);
+configuration.setBoolean(LOCAL_RECOVERY, true);
+FsStateChangelogStorageFactory.configure(
+configuration, TEMPORARY_FOLDE

[GitHub] [flink] fredia commented on a diff in pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …

2022-07-22 Thread GitBox


fredia commented on code in PR #20152:
URL: https://github.com/apache/flink/pull/20152#discussion_r927673437


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##
@@ -87,9 +87,14 @@ protected  CheckpointableKeyedStateBackend restore(
 String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
 ExecutionConfig executionConfig = env.getExecutionConfig();
 
+env.getAsyncOperationsThreadPool();
+
ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory();
 CheckpointableKeyedStateBackend keyedStateBackend =
 ChangelogBackendRestoreOperation.restore(
+env.getJobID(),
+env.getAsyncOperationsThreadPool(),
+env.getTaskManagerInfo().getConfiguration(),

Review Comment:
   I debugged this locally: there is only `ENABLE_CHANGE_LOG_FOR_APPLICATION` in 
`JobConfiguration`, and the `PERIODIC_MATERIALIZATION_INTERVAL` that 
`ChangelogHandleReaderWithCache` needs is not in `TaskManagerInfo` either. 
   
   I think this should be 
`env.getExecutionConfig().getPeriodicMaterializeIntervalMillis()`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zzzzzzzs commented on pull request #20323: [FLINK-26944][Table SQL/API] Add ADD_MONTHS supported in SQL and Table API

2022-07-22 Thread GitBox


zzzs commented on PR #20323:
URL: https://github.com/apache/flink/pull/20323#issuecomment-1192597362

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zzzzzzzs commented on pull request #20330: [FLINK-26940][Table SQL/API] Add SUBSTRING_INDEX supported in SQL & Table API

2022-07-22 Thread GitBox


zzzs commented on PR #20330:
URL: https://github.com/apache/flink/pull/20330#issuecomment-1192596952

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #20332: [FLINK-28624][csv] Accept mapper/schema factories

2022-07-22 Thread GitBox


zentol commented on PR #20332:
URL: https://github.com/apache/flink/pull/20332#issuecomment-1192596936

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-docker] nferrario commented on a diff in pull request #117: [FLINK-28057] Fix LD_PRELOAD on ARM images

2022-07-22 Thread GitBox


nferrario commented on code in PR #117:
URL: https://github.com/apache/flink-docker/pull/117#discussion_r927645868


##
1.15/scala_2.12-java11-debian/docker-entrypoint.sh:
##
@@ -91,7 +91,12 @@ prepare_configuration() {
 
 maybe_enable_jemalloc() {
 if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
-export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
+# Maybe use export LD_PRELOAD=$LD_PRELOAD:/usr/lib/$(uname -i)-linux-gnu/libjemalloc.so
+if [[ `uname -i` == 'aarch64' ]]; then
+export LD_PRELOAD=$LD_PRELOAD:/usr/lib/aarch64-linux-gnu/libjemalloc.so

Review Comment:
   I agree, but the test will never work unless we run a Dockerized test. We 
cannot fake a system library in a way that works for multiple OSes (Linux, macOS).
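   
   For illustration, a minimal sketch of an architecture-agnostic variant 
(assuming the Debian multiarch layout of these images; `uname -m` is used here 
since `uname -i` can report `unknown` on some systems):
   
   ```sh
   maybe_enable_jemalloc() {
       if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
           # Derive the multiarch lib dir from the machine type (x86_64, aarch64, ...).
           local jemalloc_path="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
           if [ -f "$jemalloc_path" ]; then
               export LD_PRELOAD="$LD_PRELOAD:$jemalloc_path"
           else
               echo "WARNING: $jemalloc_path not found; falling back to glibc malloc" >&2
           fi
       fi
   }
   ```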



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-docker] Myasuka commented on a diff in pull request #117: [FLINK-28057] Fix LD_PRELOAD on ARM images

2022-07-22 Thread GitBox


Myasuka commented on code in PR #117:
URL: https://github.com/apache/flink-docker/pull/117#discussion_r927644004


##
1.15/scala_2.12-java11-debian/docker-entrypoint.sh:
##
@@ -91,7 +91,12 @@ prepare_configuration() {
 
 maybe_enable_jemalloc() {
 if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
-export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
+# Maybe use export LD_PRELOAD=$LD_PRELOAD:/usr/lib/$(uname -i)-linux-gnu/libjemalloc.so
+if [[ `uname -i` == 'aarch64' ]]; then
+export LD_PRELOAD=$LD_PRELOAD:/usr/lib/aarch64-linux-gnu/libjemalloc.so

Review Comment:
   I don't think it's a good idea to just disable the test directly; we can 
check the environment to decide whether to run the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on pull request #20346: [FLINK-28647] Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread GitBox


rkhachatryan commented on PR #20346:
URL: https://github.com/apache/flink/pull/20346#issuecomment-1192564645

   Thanks for the review @fredia 
   @dawidwys would you like to take a look as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28390) Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.

2022-07-22 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570007#comment-17570007
 ] 

Yun Tang commented on FLINK-28390:
--

[~Ming Li] I don't think we should add such configuration options in Flink, to 
avoid users losing data unexpectedly; you can still set this option via a 
custom RocksDBOptionsFactory.
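
For reference, a minimal sketch of that approach (assuming the RocksJava FIFO 
classes bundled with flink-statebackend-rocksdb; the 10 GB bound is illustrative 
only, tune it to sit safely above the TTL'd state size):
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

import java.util.Collection;

/** Sketch: switch RocksDB column families to FIFO compaction. */
public class FifoCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // No DB-level changes are needed for FIFO compaction.
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Illustrative bound: drop the oldest SST files once total size exceeds 10 GB.
        CompactionOptionsFIFO fifo =
                new CompactionOptionsFIFO().setMaxTableFilesSize(10L * 1024 * 1024 * 1024);
        handlesToClose.add(fifo);
        return currentOptions
                .setCompactionStyle(CompactionStyle.FIFO)
                .setCompactionOptionsFIFO(fifo);
    }
}
{code}
The factory would then be registered on the state backend, e.g. via 
{{EmbeddedRocksDBStateBackend#setRocksDBOptions}}.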

> Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.
> ---
>
> Key: FLINK-28390
> URL: https://issues.apache.org/jira/browse/FLINK-28390
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: ming li
>Priority: Major
>
> We know that the FIFO compaction strategy may silently delete data and thus 
> lose data for the business. But in some scenarios, FIFO compaction can be a 
> very effective way to reduce CPU usage.
>  
> Flink's TaskManagers are usually small-scale processes, e.g. allocated 
> 4 CPUs and 16 GB of memory. When the state size is small, the CPU overhead 
> of RocksDB is not high, but as the state grows, RocksDB may 
> spend much of its time compacting, which occupies a large amount 
> of CPU and slows down the actual computation.
>  
> We usually configure a TTL for the state, so when using FIFO we can configure 
> it to be slightly longer than the TTL, so that the upper layer is the same as 
> before. 
>  
> Although the FIFO Compaction strategy may bring space amplification, the disk 
> is cheaper than the CPU after all, so the overall cost is reduced.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fredia commented on a diff in pull request #20103: [FLINK-28178][runtime-web] Show the delegated StateBackend and whethe…

2022-07-22 Thread GitBox


fredia commented on code in PR #20103:
URL: https://github.com/apache/flink/pull/20103#discussion_r927635877


##
flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html:
##
@@ -551,15 +551,15 @@
 
   
   
-Changelog state-backend
-Enabled
-Disabled
+State Changelog
+Enabled

Review Comment:
   Do we need to add this item into `CheckpointConfig` in `job-checkpoint.ts`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] amazhar1 commented on pull request #20327: [FLINK-28595][Connectors/kafka] KafkaSource should not read metadata of unmatched regex topics

2022-07-22 Thread GitBox


amazhar1 commented on PR #20327:
URL: https://github.com/apache/flink/pull/20327#issuecomment-1192553353

   I like it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28637) High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar

2022-07-22 Thread Jeesmon Jacob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570003#comment-17570003
 ] 

Jeesmon Jacob commented on FLINK-28637:
---

There is a new PR against the OSDK to bump up the okhttp version: 
https://github.com/fabric8io/kubernetes-client/issues/4290#issuecomment-1192511844

> High vulnerability in flink-kubernetes-operator-1.1.0-shaded.jar
> 
>
> Key: FLINK-28637
> URL: https://issues.apache.org/jira/browse/FLINK-28637
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: James Busche
>Priority: Major
>
> I noticed a high vulnerability in the 
> flink-kubernetes-operator-1.1.0-shaded.jar file.
> ===
> cvss: 7.5
> riskFactors: Has fix,High severity
> cve: PRISMA-2022-0239    
> link: https://github.com/square/okhttp/issues/6738
> status: fixed in 4.9.2
> packagePath: 
> /flink-kubernetes-operator/flink-kubernetes-operator-1.1.0-shaded.jar
> description: com.squareup.okhttp3_okhttp packages prior to version 4.9.2 are 
> vulnerable to sensitive information disclosure. An illegal character in a 
> header value will cause an IllegalArgumentException which will include the 
> full header value. This applies to the Authorization, Cookie, 
> Proxy-Authorization and Set-Cookie headers. 
> ===
> It looks like we're using version 3.12.12, and there are no plans to provide 
> this fix for the 3.x version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28640) Let BlocklistDeclarativeSlotPool accept duplicate slot offers

2022-07-22 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-28640.
---
Resolution: Done

Done via f6a22eaf99d4ba2ef03445bae54a0da7c39c4d1a

> Let BlocklistDeclarativeSlotPool accept duplicate slot offers
> -
>
> Key: FLINK-28640
> URL: https://issues.apache.org/jira/browse/FLINK-28640
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> BlocklistSlotPool should accept a duplicate (already accepted) slot offer, 
> even if it is from a currently blocked task manager



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zhuzhurk closed pull request #20341: [FLINK-28640][runtime] Let BlocklistDeclarativeSlotPool accept duplicate slot offers

2022-07-22 Thread GitBox


zhuzhurk closed pull request #20341: [FLINK-28640][runtime] Let 
BlocklistDeclarativeSlotPool accept duplicate slot offers
URL: https://github.com/apache/flink/pull/20341


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on pull request #20341: [FLINK-28640][runtime] Let BlocklistDeclarativeSlotPool accept duplicate slot offers

2022-07-22 Thread GitBox


zhuzhurk commented on PR #20341:
URL: https://github.com/apache/flink/pull/20341#issuecomment-1192544056

   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on pull request #20346: [FLINK-28647] Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread GitBox


fredia commented on PR #20346:
URL: https://github.com/apache/flink/pull/20346#issuecomment-1192540738

   Thanks for the PR, LGTM (non-binding).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] gyfora commented on pull request #560: Kubernetes Operator Release 1.1.0

2022-07-22 Thread GitBox


gyfora commented on PR #560:
URL: https://github.com/apache/flink-web/pull/560#issuecomment-1192537374

   Thanks @mbalassi, updated the post according to your suggestions :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] czy006 commented on pull request #308: [FLINK-28223] Add artifact-fetcher to the pod-template.yaml example

2022-07-22 Thread GitBox


czy006 commented on PR #308:
URL: https://github.com/apache/flink-kubernetes-operator/pull/308#issuecomment-1192537029

   @mbalassi sorry, I missed the Gmail notification. I am working on it now; 
this will be completed in two days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #20346: [FLINK-28647] Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread GitBox


flinkbot commented on PR #20346:
URL: https://github.com/apache/flink/pull/20346#issuecomment-1192530151

   
   ## CI report:
   
   * 1f802932319b3d68cfe4b02593ef714603acf1f5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #19727: [FLINK-27618][sql] Flink supports CUME_DIST function

2022-07-22 Thread GitBox


luoyuxia commented on code in PR #19727:
URL: https://github.com/apache/flink/pull/19727#discussion_r927605133


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java:
##
@@ -36,4 +36,10 @@ public interface AggsHandleFunction extends 
AggsHandleFunctionBase {
  * @return the final result (saved in a row) of the current accumulators.
  */
 RowData getValue() throws Exception;
+
+/**
+ * Sets the window size for the aggregate function. Some aggregate functions
+ * may require the size of the current window to do their calculation.
+ */

Review Comment:
   done.



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala:
##
@@ -320,6 +326,7 @@ class AggsHandlerCodeGenerator(
 initialAggregateInformation(aggInfoList)
 
 // generates all methods body first to add necessary reuse code to context
+val setWindowSizeCode = if (isWindowSizeNeeded) genSetWindowSize() else ""

Review Comment:
   Based on the current implementation, we will always call the method 
`setWindowSize`, so we can't generate code that throws an exception. But I 
added some comments for this method, `genSetWindowSize`.
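   
   For context, the reason the window size is needed at all: CUME_DIST returns 
(number of rows preceding or peer with the current row) / (total number of rows 
in the partition). A hand-written sketch of that computation (not the generated 
code):
   
   ```java
   /** Hand-written sketch of the CUME_DIST computation (not the generated code). */
   public final class CumeDistSketch {
   
       private long windowSize; // supplied via setWindowSize(...) before emitting results
   
       public void setWindowSize(long windowSize) {
           this.windowSize = windowSize;
       }
   
       /** @param peerRank number of rows preceding or peer with the current row */
       public double getValue(long peerRank) {
           return (double) peerRank / windowSize;
       }
   
       public static void main(String[] args) {
           CumeDistSketch cumeDist = new CumeDistSketch();
           cumeDist.setWindowSize(4);
           System.out.println(cumeDist.getValue(3)); // 3rd row of 4 -> 0.75
       }
   }
   ```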



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan closed pull request #20300: [WIP][FLINK-28597][state] Discard non-initial checkpoints without a delay

2022-07-22 Thread GitBox


rkhachatryan closed pull request #20300: [WIP][FLINK-28597][state] Discard 
non-initial checkpoints without a delay
URL: https://github.com/apache/flink/pull/20300


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on pull request #20300: [WIP][FLINK-28597][state] Discard non-initial checkpoints without a delay

2022-07-22 Thread GitBox


rkhachatryan commented on PR #20300:
URL: https://github.com/apache/flink/pull/20300#issuecomment-119259

   Superseded by #20313


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #20345: [FLINK-28610][runtime] Enable speculative execution for sources

2022-07-22 Thread GitBox


flinkbot commented on PR #20345:
URL: https://github.com/apache/flink/pull/20345#issuecomment-1192520232

   
   ## CI report:
   
   * 155bb0b49452195dd01bad4772cf2e5692eed958 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28647) Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28647:
---
Labels: pull-request-available  (was: )

> Remove separate error handling and adjust documentation for CLAIM mode + 
> RocksDB native savepoint
> -
>
> Key: FLINK-28647
> URL: https://issues.apache.org/jira/browse/FLINK-28647
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> After FLINK-25872, checkpoint folder deletion is not performed as long as 
> there is some state from that checkpoint used by other checkpoints.
> Therefore, the following changes could be reverted/adjusted:
> * FLINK-25745 e8bcbfd5a48fd8d3ca48ef7803867569214e0dbc Do not log exception
> * FLINK-25745 c1f5c5320150402fc0cb4fbf3a31f9a27b1e4d9a Document incremental 
> savepoints in CLAIM mode limitation
> cc: [~Yanfei Lei], [~dwysakowicz]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rkhachatryan opened a new pull request, #20346: [FLINK-28647] Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread GitBox


rkhachatryan opened a new pull request, #20346:
URL: https://github.com/apache/flink/pull/20346

   ## What is the purpose of the change
   
   Please see ticket description (FLINK-28647).
   
   ## Brief changelog
   
   - update the docs (checkpoint folder **is** deleted but with a delay)
   - remove additional error handling (just fail in case of folder deletion 
failure)
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28610) Enable speculative execution of sources

2022-07-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28610:
---
Labels: pull-request-available  (was: )

> Enable speculative execution of sources
> ---
>
> Key: FLINK-28610
> URL: https://issues.apache.org/jira/browse/FLINK-28610
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently speculative execution of sources is disabled. It can now be 
> enabled, given the improvements made to let InputFormat sources and new 
> sources work correctly with speculative execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zhuzhurk opened a new pull request, #20345: [FLINK-28610][runtime] Enable speculative execution for sources

2022-07-22 Thread GitBox


zhuzhurk opened a new pull request, #20345:
URL: https://github.com/apache/flink/pull/20345

   ## What is the purpose of the change
   
   This PR enables speculative execution for sources and added IT cases for 
source speculation.
   
   ## Verifying this change
   
 - *Added integration tests in SpeculativeSchedulerITCase*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28647) Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-28647:
--
Description: 
After FLINK-25872, checkpoint folder deletion is not performed as long as there 
is some state from that checkpoint used by other checkpoints.
Therefore, the following changes could be reverted/adjusted:
* FLINK-25745 e8bcbfd5a48fd8d3ca48ef7803867569214e0dbc Do not log exception
* FLINK-25745 c1f5c5320150402fc0cb4fbf3a31f9a27b1e4d9a Document incremental 
savepoints in CLAIM mode limitation

cc: [~Yanfei Lei], [~dwysakowicz]

  was:
After FLINK-25872, checkpoint folder deletion is not performed as long as there 
is some state from that checkpoint used by other checkpoints.
Therefore, the following changes could be reverted/adjusted:
* FLINK-25745 e8bcbfd5a48fd8d3ca48ef7803867569214e0dbc Do not log exception
* FLINK-25745 c1f5c5320150402fc0cb4fbf3a31f9a27b1e4d9a Document incremental 
savepoints in CLAIM mode limitation

cc: [~Yanfei Lei]


> Remove separate error handling and adjust documentation for CLAIM mode + 
> RocksDB native savepoint
> -
>
> Key: FLINK-28647
> URL: https://issues.apache.org/jira/browse/FLINK-28647
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
> Fix For: 1.16.0
>
>
> After FLINK-25872, checkpoint folder deletion is not performed as long as 
> there is some state from that checkpoint used by other checkpoints.
> Therefore, the following changes could be reverted/adjusted:
> * FLINK-25745 e8bcbfd5a48fd8d3ca48ef7803867569214e0dbc Do not log exception
> * FLINK-25745 c1f5c5320150402fc0cb4fbf3a31f9a27b1e4d9a Document incremental 
> savepoints in CLAIM mode limitation
> cc: [~Yanfei Lei], [~dwysakowicz]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28647) Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-28647:
-

 Summary: Remove separate error handling and adjust documentation 
for CLAIM mode + RocksDB native savepoint
 Key: FLINK-28647
 URL: https://issues.apache.org/jira/browse/FLINK-28647
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0


After FLINK-25872, checkpoint folder deletion is not performed as long as there 
is some state from that checkpoint used by other checkpoints.
Therefore, the following changes could be reverted/adjusted:
* FLINK-25745 e8bcbfd5a48fd8d3ca48ef7803867569214e0dbc Do not log exception
* FLINK-25745 c1f5c5320150402fc0cb4fbf3a31f9a27b1e4d9a Document incremental 
savepoints in CLAIM mode limitation

cc: [~Yanfei Lei]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28610) Enable speculative execution of sources

2022-07-22 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-28610:
---

Assignee: Zhu Zhu

> Enable speculative execution of sources
> ---
>
> Key: FLINK-28610
> URL: https://issues.apache.org/jira/browse/FLINK-28610
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Currently speculative execution of sources is disabled. It can now be 
> enabled, given the improvements made to let InputFormat sources and new 
> sources work correctly with speculative execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28146) Sync blocklist information between JobMaster & ResourceManager

2022-07-22 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-28146.
---
Fix Version/s: 1.16.0
   Resolution: Done

Done via 7b05a1b4c9a4ae664fb6b7c4bb85fb3ea6281505

> Sync blocklist information between JobMaster & ResourceManager
> --
>
> Key: FLINK-28146
> URL: https://issues.apache.org/jira/browse/FLINK-28146
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> The newly added/updated blocked nodes should be synchronized between JM and 
> RM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28640) Let BlocklistDeclarativeSlotPool accept duplicate slot offers

2022-07-22 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-28640:
---

Assignee: Lijie Wang

> Let BlocklistDeclarativeSlotPool accept duplicate slot offers
> -
>
> Key: FLINK-28640
> URL: https://issues.apache.org/jira/browse/FLINK-28640
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> BlocklistSlotPool should accept a duplicate (already accepted) slot offer, 
> even if it is from a currently blocked task manager



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zhuzhurk closed pull request #20264: [FLINK-28146][runtime] Sync blocklist information between JobMaster & ResourceManager

2022-07-22 Thread GitBox


zhuzhurk closed pull request #20264: [FLINK-28146][runtime] Sync blocklist 
information between JobMaster & ResourceManager
URL: https://github.com/apache/flink/pull/20264


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on pull request #20264: [FLINK-28146][runtime] Sync blocklist information between JobMaster & ResourceManager

2022-07-22 Thread GitBox


zhuzhurk commented on PR #20264:
URL: https://github.com/apache/flink/pull/20264#issuecomment-1192507409

   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28586) Speculative execution for new sources

2022-07-22 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-28586.
---
Resolution: Done

Done via
79d93f2512f6826baefb14c8dc9b59d419d7df0a
9af271f3108ce8af6b6972fabf5420b99e55fc71
bedcc3f7b5c0fc184953d3c1a969f03887db2cae
7129c2ee09ce7eb3959ce88383b5d8ea0987fcf5
863222e926df26fde4caa470c58b261174181719

> Speculative execution for new sources
> -
>
> Key: FLINK-28586
> URL: https://issues.apache.org/jira/browse/FLINK-28586
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Runtime / Coordination
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> This task enables new sources(FLIP-27) for speculative execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zhuzhurk closed pull request #20299: [FLINK-28586] Enable speculative execution for new sources

2022-07-22 Thread GitBox


zhuzhurk closed pull request #20299: [FLINK-28586] Enable speculative execution 
for new sources
URL: https://github.com/apache/flink/pull/20299


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on pull request #20299: [FLINK-28586] Enable speculative execution for new sources

2022-07-22 Thread GitBox


zhuzhurk commented on PR #20299:
URL: https://github.com/apache/flink/pull/20299#issuecomment-1192504215

   Also verified with TPC-DS benchmarks on a real cluster. Speculative execution 
is working and there are no suspicious errors.
   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liujiawinds commented on a diff in pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2022-07-22 Thread GitBox


liujiawinds commented on code in PR #19844:
URL: https://github.com/apache/flink/pull/19844#discussion_r927579106


##
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java:
##
@@ -75,8 +78,105 @@ public void testOrcBulkWriter() throws Exception {
 
 testHarness.snapshot(1, ++time);
 testHarness.notifyOfCompletedCheckpoint(1);
+}
+}
 
-OrcBulkWriterTestUtil.validate(outDir, input);
+@Test
+public void testOrcBulkWriter() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+final Properties writerProps = new Properties();
+writerProps.setProperty("orc.compress", CompressionKind.LZ4.name());
+
+OrcBulkWriterFactory writer =
+new OrcBulkWriterFactory<>(
+new RecordVectorizer(schema), writerProps, new Configuration());
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// validate records and compression kind
+OrcBulkWriterTestUtil.validate(outDir, input, CompressionKind.LZ4);
+}
+
+@Test
+public void testZstdCompression() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+final Properties writerProps = new Properties();
+writerProps.setProperty("orc.compress", CompressionKind.ZSTD.name());
+
+OrcBulkWriterFactory writer =
+new OrcBulkWriterFactory<>(
+new RecordVectorizer(schema), writerProps, new Configuration());
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// validate records and compression kind
+OrcBulkWriterTestUtil.validate(outDir, input, CompressionKind.ZSTD);
+}
+
+@Test
+public void testOrcColumnEncryption() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+
+// Simple configuration for column encryption.
+Configuration conf = new Configuration();
+conf.set("orc.encrypt", "pii:_col0");
+conf.set("orc.mask", "sha256:_col0");
+OrcFile.WriterOptions writerOptions =
+OrcFile.writerOptions(conf)
+.encrypt(OrcConf.ENCRYPTION.getString(conf))
+.masks(OrcConf.DATA_MASK.getString(conf))
+.setKeyProvider(new DummyInMemoryKeystore());
+
+OrcBulkWriterFactory writer =
+new OrcBulkWriterFactory<>(new RecordVectorizer(schema), writerOptions);
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// Validate bucket count and file count.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liujiawinds commented on a diff in pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2022-07-22 Thread GitBox


liujiawinds commented on code in PR #19844:
URL: https://github.com/apache/flink/pull/19844#discussion_r927578849


##
flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE:
##
@@ -6,13 +6,14 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.orc:orc-core:1.5.6
-- org.apache.orc:orc-shims:1.5.6
-- org.apache.hive:hive-storage-api:2.6.0
-- io.airlift:aircompressor:0.10
-- commons-lang:commons-lang:2.6
+- org.apache.orc:orc-core:1.7.5
+- org.apache.orc:orc-shims:1.7.5
+- org.apache.hive:hive-storage-api:2.8.1
+- io.airlift:aircompressor:0.21
+- org.apache.commons:commons-lang3:3.3.2
 
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details.
 
 - com.google.protobuf:protobuf-java:2.5.0
+- org.threeten:threeten-extra:1.5.0

Review Comment:
   Removed the redundant dependencies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liujiawinds commented on a diff in pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2022-07-22 Thread GitBox


liujiawinds commented on code in PR #19844:
URL: https://github.com/apache/flink/pull/19844#discussion_r927578181


##
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java:
##
@@ -75,8 +78,105 @@ public void testOrcBulkWriter() throws Exception {
 
 testHarness.snapshot(1, ++time);
 testHarness.notifyOfCompletedCheckpoint(1);
+}
+}
 
-OrcBulkWriterTestUtil.validate(outDir, input);
+@Test
+public void testOrcBulkWriter() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+final Properties writerProps = new Properties();
+writerProps.setProperty("orc.compress", CompressionKind.LZ4.name());
+
+OrcBulkWriterFactory writer =
+new OrcBulkWriterFactory<>(
+new RecordVectorizer(schema), writerProps, new Configuration());
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// validate records and compression kind
+OrcBulkWriterTestUtil.validate(outDir, input, CompressionKind.LZ4);
+}
+
+@Test
+public void testZstdCompression() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+final Properties writerProps = new Properties();
+writerProps.setProperty("orc.compress", CompressionKind.ZSTD.name());
+
+OrcBulkWriterFactory writer =
+new OrcBulkWriterFactory<>(
+new RecordVectorizer(schema), writerProps, new Configuration());
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// validate records and compression kind
+OrcBulkWriterTestUtil.validate(outDir, input, CompressionKind.ZSTD);
+}
+
+@Test
+public void testOrcColumnEncryption() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+
+// Simple configuration for column encryption.
+Configuration conf = new Configuration();
+conf.set("orc.encrypt", "pii:_col0");
+conf.set("orc.mask", "sha256:_col0");
+OrcFile.WriterOptions writerOptions =
+OrcFile.writerOptions(conf)
+.encrypt(OrcConf.ENCRYPTION.getString(conf))
+.masks(OrcConf.DATA_MASK.getString(conf))
+.setKeyProvider(new DummyInMemoryKeystore());
+
+OrcBulkWriterFactory writer =
+new OrcBulkWriterFactory<>(new RecordVectorizer(schema), writerOptions);
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// Validate bucket count and file count.
+final File[] buckets = outDir.listFiles();
+assertThat(buckets).isNotNull();
+assertThat(buckets.length).isEqualTo(1);
+final File[] partFiles = buckets[0].listFiles();
+assertThat(partFiles).isNotNull();
+
+// Validate data in orc and file schema attribute value.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liujiawinds commented on a diff in pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2022-07-22 Thread GitBox


liujiawinds commented on code in PR #19844:
URL: https://github.com/apache/flink/pull/19844#discussion_r927577889


##
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java:
##
@@ -75,8 +78,105 @@ public void testOrcBulkWriter() throws Exception {
 
 testHarness.snapshot(1, ++time);
 testHarness.notifyOfCompletedCheckpoint(1);
+}
+}
 
-OrcBulkWriterTestUtil.validate(outDir, input);
+@Test
+public void testOrcBulkWriter() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+final Properties writerProps = new Properties();
+writerProps.setProperty("orc.compress", CompressionKind.LZ4.name());
+
+OrcBulkWriterFactory<Record> writer =
+new OrcBulkWriterFactory<>(
+new RecordVectorizer(schema), writerProps, new Configuration());
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// validate records and compression kind
+OrcBulkWriterTestUtil.validate(outDir, input, CompressionKind.LZ4);
+}
+
+@Test
+public void testZstdCompression() throws Exception {
+final File outDir = TEMPORARY_FOLDER.newFolder();
+final Properties writerProps = new Properties();
+writerProps.setProperty("orc.compress", CompressionKind.ZSTD.name());
+
+OrcBulkWriterFactory<Record> writer =
+new OrcBulkWriterFactory<>(
+new RecordVectorizer(schema), writerProps, new Configuration());
+
+writeRecordsIntoOrcFile(outDir, writer);
+
+// validate records and compression kind
+OrcBulkWriterTestUtil.validate(outDir, input, CompressionKind.ZSTD);

Review Comment:
   done






[GitHub] [flink] liujiawinds commented on a diff in pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2022-07-22 Thread GitBox


liujiawinds commented on code in PR #19844:
URL: https://github.com/apache/flink/pull/19844#discussion_r927577711


##
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/EncryptionProvider.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.CryptoUtils;
+import org.apache.orc.impl.HadoopShims;
+import org.apache.orc.impl.KeyProvider;
+import org.apache.orc.impl.writer.WriterEncryptionKey;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Copies the encryption-variant generation code from org.apache.orc:orc-core:1.7.2 {@link
+ * org.apache.orc.impl.WriterImpl}. It is used to produce encryption variants identical to
+ * those generated by {@link org.apache.orc.impl.WriterImpl}.
+ *
+ * NOTE: This class will be removed after ORC-1200 is merged.
+ */
+public class EncryptionProvider {
+
+private final SortedMap<String, WriterEncryptionKey> keys = new TreeMap<>();
+
+private final OrcFile.WriterOptions opts;
+
+public EncryptionProvider(OrcFile.WriterOptions opts) {
+this.opts = opts;
+}
+
+public WriterEncryptionVariant[] getEncryptionVariants() throws IOException {

Review Comment:
   done






[GitHub] [flink] lsyldliu commented on a diff in pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax

2022-07-22 Thread GitBox


lsyldliu commented on code in PR #20252:
URL: https://github.com/apache/flink/pull/20252#discussion_r927563895


##
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##
@@ -1151,9 +1152,13 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
 [
 <LIKE>
 tableLike = SqlTableLike(getPos())
+|

Review Comment:
   ```suggestion
   {
   return new SqlCreateTableLike(startPos.plus(getPos()),
   tableName,
   columnList,
   constraints,
   propertyList,
   partitionColumns,
   watermark,
   comment,
   tableLike,
   isTemporary,
   ifNotExists);
   }
   |
   <AS>
   asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
   {
   return new SqlCreateTableAs(startPos.plus(getPos()),
   tableName,
   columnList,
   constraints,
   propertyList,
   partitionColumns,
   watermark,
   comment,
   asQuery,
   isTemporary,
   ifNotExists);
   }
   ]
   {
   return new SqlCreateTable(startPos.plus(getPos()),
   tableName,
   columnList,
   constraints,
   propertyList,
   partitionColumns,
   watermark,
   comment,
   isTemporary,
   ifNotExists);
   }
   ```



##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * {@link SqlNode} to describe the CREATE TABLE AS syntax. The CTAS syntax is different from
+ * the CREATE TABLE syntax in that it also creates the pipeline to sync data from the source
+ * to the derived table.
+ *
+ * Example: A DDL like the one below for creating a `derived_table`
+ *
+ * {@code
+ * CREATE TABLE base_table (
+ * id BIGINT,
+ * name STRING,
+ * tstmp TIMESTAMP,
+ * PRIMARY KEY(id)
+ * ) WITH (
+ * 'connector' = 'kafka',
+ * 'connector.starting-offset' = '12345',
+ * 'format' = 'json'
+ * )
+ *
+ * CREATE TABLE derived_table
+ * WITH (
+ *   'connector' = 'jdbc',
+ *   'url' = 'http://localhost:1',
+ *   'table-name' = 'syncedTable'
+ * )
+ * AS SELECT * FROM base_table_1;
+ * }
+ */
+public class SqlCreateTableAs extends SqlCreateTable {
+
+public static final SqlSpecialOperator OPERATOR =
+new SqlSpecialOperator("CREATE TABLE AS", SqlKind.CREATE_TABLE);
+
+private final SqlNode asQuery;
+
+public SqlCreateTableAs(
+SqlParserPos pos,
+SqlIdentifier tableName,
+SqlNodeList columnList,
+List<SqlTableConstraint> tableConstraints,
+SqlNodeList propertyList,
+SqlNodeList partitionKeyList,
+@Nullable SqlWatermark watermark,
+@Nullable SqlCharStringLiteral comment,
+SqlNode asQuery,
+boolean isTemporary,
+boolean ifNotExists) {
+super(
+OPERATOR,
+pos,
+tableName,
+columnList,
+tableConstraints,
+propertyList,
+partitionKeyList,
+watermark,
+c

[GitHub] [flink] liujiawinds commented on a diff in pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2022-07-22 Thread GitBox


liujiawinds commented on code in PR #19844:
URL: https://github.com/apache/flink/pull/19844#discussion_r927577474


##
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java:
##
@@ -46,16 +57,8 @@ public class OrcBulkWriterTest {
 private final List<Record> input =
 Arrays.asList(new Record("Shiv", 44), new Record("Jesse", 23), new Record("Walt", 50));
 
-@Test
-public void testOrcBulkWriter() throws Exception {
-final File outDir = TEMPORARY_FOLDER.newFolder();
-final Properties writerProps = new Properties();
-writerProps.setProperty("orc.compress", "LZ4");
-
-final OrcBulkWriterFactory<Record> writer =
-new OrcBulkWriterFactory<>(
-new RecordVectorizer(schema), writerProps, new Configuration());
-
+private void writeRecordsIntoOrcFile(File outDir, OrcBulkWriterFactory<Record> writer)

Review Comment:
   done






[GitHub] [flink] luoyuxia commented on pull request #20211: [FLINK-28451][hive] Use UserCodeClassloader instead of the current thread's classloader to load function

2022-07-22 Thread GitBox


luoyuxia commented on PR #20211:
URL: https://github.com/apache/flink/pull/20211#issuecomment-1192481882

   > LGTM. I have appended a commit to fix a minor problem. If you don't have 
concerns, I will merge it when CI passed.
   
   Thanks. That's fine. I rebased onto master and fixed the compile error just now with this [commit](https://github.com/apache/flink/pull/20211/commits/f923801f1d0df630cc2fff56ad4bf9e20c45629c)





[GitHub] [flink-web] mbalassi commented on a diff in pull request #560: Kubernetes Operator Release 1.1.0

2022-07-22 Thread GitBox


mbalassi commented on code in PR #560:
URL: https://github.com/apache/flink-web/pull/560#discussion_r927546290


##
_posts/2022-07-25-release-kubernetes-operator-1.1.0.md:
##
@@ -0,0 +1,114 @@
+---
+layout: post
+title:  "Apache Flink Kubernetes Operator 1.1.0 Release Announcement"
+subtitle: "Lifecycle management for Apache Flink deployments using native 
Kubernetes tooling"
+date: 2022-07-25T08:00:00.000Z
+categories: news
+authors:
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- matyas:
+  name: "Matyas Orhidi"
+
+---
+
+The community has continued to work hard on improving the Flink Kubernetes 
Operator capabilities since our [first production ready 
release](https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html)
 we launched about two months ago.
+
+With the release of Flink Kubernetes Operator 1.1.0 we are proud to announce a 
number of exciting new features that improves the overall experience of 
managing Flink resources and the operator itself in production environments.

Review Comment:
   nit: exciting new features _improving_ the overall



##
_posts/2022-07-25-release-kubernetes-operator-1.1.0.md:
##
@@ -0,0 +1,114 @@
+---
+layout: post
+title:  "Apache Flink Kubernetes Operator 1.1.0 Release Announcement"
+subtitle: "Lifecycle management for Apache Flink deployments using native 
Kubernetes tooling"
+date: 2022-07-25T08:00:00.000Z
+categories: news
+authors:
+- gyfora:
+  name: "Gyula Fora"
+  twitter: "GyulaFora"
+- matyas:
+  name: "Matyas Orhidi"
+
+---
+
+The community has continued to work hard on improving the Flink Kubernetes 
Operator capabilities since our [first production ready 
release](https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html)
 we launched about two months ago.
+
+With the release of Flink Kubernetes Operator 1.1.0 we are proud to announce a 
number of exciting new features that improves the overall experience of 
managing Flink resources and the operator itself in production environments.
+
+## Release Highlights
+
+A non-exhaustive list of some of the more exciting features added in the 
release:
+
+ * Kubernetes Events on application and job state changes
+ * New operator metrics
+ * Unified and more robust reconciliation flow
+ * Periodic savepoint triggering
+ * Custom Flink Resource Listeners
+ * Dynamic watched namespaces
+ * New built-in examples for submitting Flink SQL and Python jobs
+ * Experimental autoscaling support
+
+### Kubernetes Events for Application and Job State Changes
+
+The operator now emits native Kubernetes Events on relevant Flink Deployment 
and Job changes. This includes status changes, custom resource specification 
changes, deployment failures, etc.
+
+```
+Events:
+  TypeReason Age   From  Message
+  --     ---
+  Normal  Submit 53m   JobManagerDeployment  Starting deployment
+  Normal  StatusChanged  52m   Job   Job status changed from 
RECONCILING to CREATED
+  Normal  StatusChanged  52m   Job   Job status changed from 
CREATED to RUNNING
+```
+
+### New Operator Metrics
+
+The first version of the operator only came with basic system level metrics to 
monitor the JVM process.
+
+In 1.1.0 we have introduced a wide range of additional metrics related to 
lifecycle-management, Kubernetes API server access and the Java Operator SDK 
framework the operator itself is built on. These metrics allow operator 
administrators to get a comprehensive view of what’s happening in the 
environment.
+
+For details check 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/#metrics.
+
+### Unified and more robust reconciliation flow
+
+We have spent a considerable effort refactoring and streamlining the core 
reconciliation flow responsible for executing and tracking resource upgrades, 
savepoints, rollbacks and other operations.
+
+In the process we made a number of important improvements to tolerate operator 
failures and temporary kubernetes API outages more gracefully, which is 
critical in production environments.
+
+### Periodic Savepoints
+
+By popular demand we have introduced periodic savepointing for applications 
and session jobs using the following simple configuration option:
+
+```
+flinkConfiguration:
+  ...
+  kubernetes.operator.periodic.savepoint.interval: 6h
+```
+
+### Custom Flink Resource Listeners
+
+The operator allows users to listen to events and status updates triggered for 
the Flink Resources managed by the operator.
+
+This feature enables tighter integration with the user's own data platform. By 
implementing the `FlinkResourceListener` interface users can listen to both 
events and status updates per resource type (`FlinkDeployment` / 
`FlinkSessionJob`). The interface methods will be called after the respective 
events have been triggered by the system.

[GitHub] [flink] leozhangsr commented on pull request #20234: [FLINK-28475] [Connector/kafka] Stopping offset can be 0

2022-07-22 Thread GitBox


leozhangsr commented on PR #20234:
URL: https://github.com/apache/flink/pull/20234#issuecomment-1192461458

   @PatrickRen waiting for a review from you





[GitHub] [flink] Tartarus0zm commented on pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax

2022-07-22 Thread GitBox


Tartarus0zm commented on PR #20252:
URL: https://github.com/apache/flink/pull/20252#issuecomment-1192460451

   @beyond1920  @lsyldliu  Thank you for your advice. I have updated the PR; please take a look again!





[GitHub] [flink] Tartarus0zm commented on a diff in pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax

2022-07-22 Thread GitBox


Tartarus0zm commented on code in PR #20252:
URL: https://github.com/apache/flink/pull/20252#discussion_r927546446


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** CREATE TABLE AS SELECT DDL sql call. */
+public class SqlCreateTableAs extends SqlCreate implements ExtendedSqlNode {
+
+public static final SqlSpecialOperator OPERATOR =
+new SqlSpecialOperator("CREATE TABLE AS SELECT", SqlKind.OTHER_DDL);
+
+private final SqlCreateTable sqlCreateTable;
+private final SqlNode asQuery;
+
+public SqlCreateTableAs(SqlCreateTable sqlCreateTable, SqlNode asQuery) {
+super(
+OPERATOR,
+sqlCreateTable.getParserPosition(),
+sqlCreateTable.getReplace(),
+sqlCreateTable.isIfNotExists());
+this.sqlCreateTable = sqlCreateTable;
+this.asQuery = asQuery;
+}
+
+@Override
+public @Nonnull SqlOperator getOperator() {
+return OPERATOR;
+}
+
+@Override
+public @Nonnull List<SqlNode> getOperandList() {

Review Comment:
   Introduced `SqlCreateTableLike` and `SqlCreateTableAs` as subclasses,
   following the recommendations of @beyond1920 






[GitHub] [flink] wuchong commented on a diff in pull request #20211: [FLINK-28451][hive] Use UserCodeClassloader instead of the current thread's classloader to load function

2022-07-22 Thread GitBox


wuchong commented on code in PR #20211:
URL: https://github.com/apache/flink/pull/20211#discussion_r927497910


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java:
##
@@ -127,13 +125,26 @@ public Class<UDFType> getUDFClass() throws ClassNotFoundException {
  * @return the UDF deserialized
  */
 private UDFType deserializeUDF() {
-try {
-return (UDFType)
-SerializationUtilities.deserializeObject(
-udfSerializedString, (Class<UDFType>) getUDFClass());
-} catch (ClassNotFoundException e) {
-throw new FlinkHiveUDFException(
-String.format("Failed to deserialize function %s.", className), e);
-}
+return (UDFType)
+deserializeObjectFromKryo(udfSerializedBytes, (Class<Serializable>) getUDFClass());
+}
+
+private static byte[] serializeObjectToKryo(Serializable object) {
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+Output output = new Output(baos);
+Kryo kryo = new Kryo();

Review Comment:
   Kryo construction is heavy, we can borrow Kryo from the Hive util. 
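
   For anyone following along, a minimal sketch of the borrow/release pattern (this assumes Hive's `SerializationUtilities.borrowKryo()` / `releaseKryo()` pool is on the classpath; the shaded Kryo package name varies by Hive version, and the wrapper class here is illustrative):

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.Serializable;

   import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
   import org.apache.hive.com.esotericsoftware.kryo.Kryo;
   import org.apache.hive.com.esotericsoftware.kryo.io.Output;

   final class KryoPoolSketch {
       // Serialize with a pooled Kryo instance borrowed from Hive's utilities
       // instead of paying for "new Kryo()" on every call.
       static byte[] serializeObjectToKryo(Serializable object) {
           ByteArrayOutputStream baos = new ByteArrayOutputStream();
           Output output = new Output(baos);
           Kryo kryo = SerializationUtilities.borrowKryo();
           try {
               kryo.writeObject(output, object);
               output.close();
               return baos.toByteArray();
           } finally {
               // Always return the borrowed instance to the pool, even on failure.
               SerializationUtilities.releaseKryo(kryo);
           }
       }
   }
   ```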






[jira] [Comment Edited] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort

2022-07-22 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569936#comment-17569936
 ] 

Jinzhong Li edited comment on FLINK-28515 at 7/22/22 10:38 AM:
---

Thanks for the reply, [~roman].

I'd like to explain the reason for this issue in more detail. Please correct me 
if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread.
closeSnapshotIO() only closes the registry, and I don't see that the folder is 
registered with it. 

That's right: in this case, closeSnapshotIO() will close all the CheckpointStreams that belong to the folder, but it will not delete the folder.

 

1. But if the cleanup() method is skipped here, it will still be invoked in the finally block of [AsyncSnapshotCallable.call()|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87].

2. AsyncSnapshotCallable.cleanup() can only delete the ONGOING folder, not 
COMPLETED.(AsyncSnapshotTask.cleanup() -> 
IncrementalRocksDBSnapshotResources.release -> SnapshotDirectory.cleanUp)
{code:java}
 public boolean cleanup() throws IOException {
      if (state.compareAndSet(State.ONGOING, State.DELETED)) {
          FileUtils.deleteDirectory(directory.toFile());
      }
      return true;
 } {code}
3. AsyncSnapshotTask.callInternal() will invoke 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry 
snapshotCloseableRegistry), in which the folder status will be [transformed from ONGOING to COMPLETED|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L422].

If RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) encounters *no* 
exception after rocksdb AsyncSnapshotTask is cancelled,

(1) the localSnapshot folder can't be cleaned-up by 
AsyncSnapshotCallable.cleanup() because the folder status is COMPLETED;

(2) the localSnapshot folder can't be cleaned-up by the finally block of RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) because the completed flag is true.
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
      try{
          ...
          completed = true;
          return snapshotResult;
       } finally {
           if (!completed) {
              .
            }
       }
} {code}
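
To make the interaction concrete, here is a toy reproduction of the state-machine gap (names mirror the Flink classes, but this is an illustrative sketch, not the actual implementation):
{code:java}
import java.util.concurrent.atomic.AtomicReference;

public class LocalSnapshotLeakDemo {

    enum State { ONGOING, COMPLETED, DELETED }

    static class ToySnapshotDirectory {
        private final AtomicReference<State> state = new AtomicReference<>(State.ONGOING);

        // get() marks the directory COMPLETED when the snapshot finishes.
        void completeSnapshot() { state.compareAndSet(State.ONGOING, State.COMPLETED); }

        // cleanup() only deletes an ONGOING directory, mirroring the guard above.
        boolean cleanup() { return state.compareAndSet(State.ONGOING, State.DELETED); }

        State state() { return state.get(); }
    }

    public static void main(String[] args) {
        ToySnapshotDirectory dir = new ToySnapshotDirectory();
        dir.completeSnapshot();           // get() finished despite the cancellation
        boolean deleted = dir.cleanup();  // late cleanup from AsyncSnapshotCallable.call()
        // Prints "deleted=false, state=COMPLETED": the directory survives and leaks.
        System.out.println("deleted=" + deleted + ", state=" + dir.state());
    }
}
{code}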
 


was (Author: lijinzhong):
Thanks for the reply, [~roman].

I'd like to explain the reason for this issue in more detail. Please correct me 
if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread.
closeSnapshotIO() only closes the registry, and I don't see that the folder is 
registered with it. 

That's right: in this case, closeSnapshotIO() will close all the CheckpointStreams that belong to the folder, but it will not delete the folder.

 

1. But if the cleanup() method is skipped here, it will still be invoked in the finally block of [AsyncSnapshotCallable.call()|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87].

2. AsyncSnapshotCallable.cleanup() can only delete the ONGOING folder, not 
COMPLETED.(AsyncSnapshotTask.cleanup() -> 
IncrementalRocksDBSnapshotResources.release -> SnapshotDirectory.cleanUp)
{code:java}
 public boolean cleanup() throws IOException {
      if (state.compareAndSet(State.ONGOING, State.DELETED)) {
          FileUtils.deleteDirectory(directory.toFile());
      }
      return true;
 } {code}
3. AsyncSnapshotTask.callInternal() will invoke 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry 
snapshotCloseableRegistry), in which the folder status will be [transformed 
from ONGOING to COMPLETED|#L422].

If RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) encounters *no* 
exception after rocksdb AsyncSnapshotTask is cancelled,

(1) the localSnapshot folder can't be cleaned-up by 
AsyncSnapshotCallable.cleanup() because the folder status is COMPLETED;

(2) the localSnapshot folder can't be cleaned-up by the finally block of RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) because the completed flag is true.
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
      try{
          ...
          completed = true;
          return snapshotResult;
       } finally {
           if (!completed) {
              .
            }
       }
} {code}
 

> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> 
>
> 

[jira] [Comment Edited] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort

2022-07-22 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569936#comment-17569936
 ] 

Jinzhong Li edited comment on FLINK-28515 at 7/22/22 10:38 AM:
---

Thanks for the reply, [~roman].

I'd like to explain the reason for this issue in more detail. Please correct me 
if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread.
closeSnapshotIO() only closes the registry, and I don't see that the folder is 
registered with it. 

That's right: in this case, closeSnapshotIO() will close all the CheckpointStreams that belong to the folder, but it will not delete the folder.

 

1. But if the cleanup() method is skipped here, it will still be invoked in the finally block of [AsyncSnapshotCallable.call()|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87].

2. AsyncSnapshotCallable.cleanup() can only delete the ONGOING folder, not 
COMPLETED.(AsyncSnapshotTask.cleanup() -> 
IncrementalRocksDBSnapshotResources.release -> SnapshotDirectory.cleanUp)
{code:java}
 public boolean cleanup() throws IOException {
      if (state.compareAndSet(State.ONGOING, State.DELETED)) {
          FileUtils.deleteDirectory(directory.toFile());
      }
      return true;
 } {code}
3. AsyncSnapshotTask.callInternal() will invoke 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry 
snapshotCloseableRegistry), in which the folder status will be [transformed from ONGOING to COMPLETED|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L422].

If RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) encounters *no* 
exception after rocksdb AsyncSnapshotTask is cancelled,

(1) the localSnapshot folder can't be cleaned-up by 
AsyncSnapshotCallable.cleanup() because the folder status is COMPLETED;

(2) the localSnapshot folder can't be cleaned-up by the finally block of RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) because the completed flag is true.
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
      try{
          ...
          completed = true;
          return snapshotResult;
       } finally {
           if (!completed) {
              .
            }
       }
} {code}
 


was (Author: lijinzhong):
Thanks for the reply, [~roman].

I'd like to explain the reason for this issue in more detail. Please correct me 
if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread.
closeSnapshotIO() only closes the registry, and I don't see that the folder is 
registered with it. 

That's right: in this case, closeSnapshotIO() will close all the CheckpointStreams that belong to the folder, but it will not delete the folder.

 

1. But if the cleanup() method is skipped here, it will still be invoked in the finally block of [AsyncSnapshotCallable.call()|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87].

2. AsyncSnapshotCallable.cleanup() can only delete the ONGOING folder, not 
COMPLETED.(AsyncSnapshotTask.cleanup() -> 
IncrementalRocksDBSnapshotResources.release -> SnapshotDirectory.cleanUp)
{code:java}
 public boolean cleanup() throws IOException {
      if (state.compareAndSet(State.ONGOING, State.DELETED)) {
          FileUtils.deleteDirectory(directory.toFile());
      }
      return true;
 } {code}
3. AsyncSnapshotTask.callInternal() will invoke 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry 
snapshotCloseableRegistry), in which the folder status will be [transformed from ONGOING to COMPLETED|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L422].

If RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) encounters *no* 
exception after rocksdb AsyncSnapshotTask is cancelled,

(1) the localSnapshot folder can't be cleaned-up by 
AsyncSnapshotCallable.cleanup() because the folder status is COMPLETED;

(2) the localSnapshot folder can't be cleaned-up by the finally block of RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) because the completed flag is true.
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
      try{
          ...
          completed = true;
          return snapshotResult;
       } finally {
           if (!completed) {
              .
            

[jira] [Comment Edited] (FLINK-28515) The files in local recovery directory hasn't be clean up properly after checkpoint abort

2022-07-22 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569936#comment-17569936
 ] 

Jinzhong Li edited comment on FLINK-28515 at 7/22/22 10:36 AM:
---

Thanks for the reply, [~roman].

I'd like to explain the reason for this issue in more detail. Please correct me 
if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread.
closeSnapshotIO() only closes the registry, and I don't see that the folder is 
registered with it. 

That's right: in this case, closeSnapshotIO() will close all the CheckpointStreams that belong to the folder, but it will not delete the folder.

 

1. But if the cleanup() method is skipped here, it will still be invoked in the finally block of [AsyncSnapshotCallable.call()|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87].

2. AsyncSnapshotCallable.cleanup() can only delete the ONGOING folder, not 
COMPLETED.(AsyncSnapshotTask.cleanup() -> 
IncrementalRocksDBSnapshotResources.release -> SnapshotDirectory.cleanUp)
{code:java}
 public boolean cleanup() throws IOException {
      if (state.compareAndSet(State.ONGOING, State.DELETED)) {
          FileUtils.deleteDirectory(directory.toFile());
      }
      return true;
 } {code}
3. AsyncSnapshotTask.callInternal() will invoke 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry 
snapshotCloseableRegistry), in which the folder status will be [transformed 
from ONGOING to COMPLETED|#L422].

If RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) encounters *no* 
exception after rocksdb AsyncSnapshotTask is cancelled,

(1) the localSnapshot folder can't be cleaned-up by 
AsyncSnapshotCallable.cleanup() because the folder status is COMPLETED;

(2) the localSnapshot folder can't be cleaned-up by the finally block of RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) because the completed flag is true.
{code:java}
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
      try{
          ...
          completed = true;
          return snapshotResult;
       } finally {
           if (!completed) {
              .
            }
       }
} {code}
 


was (Author: lijinzhong):
Thanks for the reply, [~roman].

I'd like to explain the reason for this issue in more detail. Please correct me 
if anything is wrong.

>> So if it's already running, cleanup() can be skipped by the current thread.
closeSnapshotIO() only closes the registry, and I don't see that the folder is 
registered with it. 

That's right: in this case, closeSnapshotIO() will close all the CheckpointStreams that belong to the folder, but it will not delete the folder.

 

1. But if the cleanup() method is skipped here, it will still be invoked in the finally block of [AsyncSnapshotCallable.call()|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L87].

2. AsyncSnapshotCallable.cleanup() can only delete the ONGOING folder, not 
COMPLETED.(AsyncSnapshotTask.cleanup() -> 
IncrementalRocksDBSnapshotResources.release -> SnapshotDirectory.cleanUp)
{code:java}
 public boolean cleanup() throws IOException {
      if (state.compareAndSet(State.ONGOING, State.DELETED)) {
          FileUtils.deleteDirectory(directory.toFile());
      }
      return true;
 } {code}
3. AsyncSnapshotTask.callInternal() will invoke 
RocksDBIncrementalSnapshotOperation.get(CloseableRegistry 
snapshotCloseableRegistry), in which the folder status will be [transformed from ONGOING to COMPLETED|https://github.com/apache/flink/blob/81379a56495283020a5919e8115936c163b251ba/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L422].

(1) the localSnapshot folder can't be cleaned-up by 
AsyncSnapshotCallable.cleanup() because the folder status is COMPLETED;

(2) the localSnapshot folder can't be cleaned-up by the finally block of RocksDBIncrementalSnapshotOperation.get(CloseableRegistry) because the completed flag is true.
{code:java}
@Override
public SnapshotResult get(CloseableRegistry 
snapshotCloseableRegistry)  
 throws Exception {
      try{
          ...
          completed = true;
          return snapshotResult;
       } finally {
           if (!completed) {
              .
            }
       }
} {code}
 

> The files in local recovery directory hasn't be clean up properly after 
> checkpoint abort
> 
>
> Key: FLINK-28515
> URL: https://issues.apache.org/jira/browse/FLINK-28515
> Project: Flink
>  Issue Type: B
