(flink) branch master updated (39779829b88 -> 7c4dec63eb4)

2024-04-19 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 39779829b88 [FLINK-34954][core] Kryo Input bug fix
 add 7c4dec63eb4 [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource (#23777)

No new revisions were added by this update.

Summary of changes:
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e   | 118 +++
 flink-connectors/flink-connector-datagen/pom.xml   |  16 +++
 ...tion.java => IndexLookupGeneratorFunction.java} | 145 ---
 .../datagen/source/DataGeneratorSource.java|   2 +-
 ...ittingSourceReaderWithCheckpointsInBetween.java | 158 +
 .../datagen/source/TestDataGenerators.java |  83 +++
 flink-connectors/flink-connector-hive/pom.xml  |   8 ++
 .../flink/connectors/hive/HiveTableSinkITCase.java |  49 ---
 .../connectors/hive/HiveTableSourceITCase.java |  40 +++---
 .../source/lib/util/IteratorSourceReaderBase.java  |  10 +-
 flink-formats/flink-avro/pom.xml   |   8 ++
 .../formats/avro/AvroStreamingFileSinkITCase.java  |  25 +++-
 flink-formats/flink-compress/pom.xml   |   8 ++
 .../formats/compress/CompressionFactoryITCase.java |  10 +-
 flink-formats/flink-csv/pom.xml|   8 ++
 flink-formats/flink-hadoop-bulk/pom.xml|   8 ++
 .../bulk/HadoopPathBasedPartFileWriterITCase.java  |  15 +-
 flink-formats/flink-json/pom.xml   |   8 ++
 flink-formats/flink-orc/pom.xml|   8 ++
 .../flink/orc/writer/OrcBulkWriterITCase.java  |  10 +-
 flink-formats/flink-parquet/pom.xml|   8 ++
 .../avro/AvroParquetStreamingFileSinkITCase.java   |  21 ++-
 .../ParquetProtoStreamingFileSinkITCase.java   |  10 +-
 flink-formats/flink-sequence-file/pom.xml  |   8 ++
 .../SequenceStreamingFileSinkITCase.java   |  12 +-
 flink-table/flink-table-planner/pom.xml|  10 +-
 .../runtime/stream/sql/CompactionITCaseBase.java   |  20 +--
 flink-tests/pom.xml|   2 +
 .../flink/test/streaming/runtime/SinkITCase.java   |  75 +-
 29 files changed, 652 insertions(+), 251 deletions(-)
 copy flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/{FromElementsGeneratorFunction.java => IndexLookupGeneratorFunction.java} (61%)
 create mode 100644 flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DoubleEmittingSourceReaderWithCheckpointsInBetween.java
 create mode 100644 flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/TestDataGenerators.java



(flink) branch master updated: [hotfix][docs] config key for parquet int64 option (#23909)

2023-12-12 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 6334923af1e [hotfix][docs] config key for parquet int64 option (#23909)
6334923af1e is described below

commit 6334923af1e31f5e3d08425ecca27e0ee021dae8
Author: Thomas Weise 
AuthorDate: Tue Dec 12 10:36:16 2023 -0500

[hotfix][docs] config key for parquet int64 option (#23909)
---
 docs/content/docs/connectors/table/formats/parquet.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 0e53f4b919e..2b543c0eaab 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -110,7 +110,7 @@ Data Type Mapping
 Currently, Parquet format type mapping is compatible with Apache Hive, but by default not with Apache Spark:
 
 - Timestamp: mapping timestamp type to int96 whatever the precision is.
-- Spark compatibility requires int64 via config option `timestamp.time.unit` (see above).
+- Spark compatibility requires int64 via config option `write.int64.timestamp` (see above).
 - Decimal: mapping decimal type to fixed length byte array according to the precision.
 
 The following table lists the type mapping from Flink type to Parquet type.
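
The corrected key corresponds to the WRITE_INT64_TIMESTAMP option defined in ParquetFileFormatFactory by FLINK-25565 (see the commits below). A minimal, illustrative sketch of setting both options on a Flink Configuration; the surrounding wiring is assumed scaffolding, not part of this commit:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.parquet.ParquetFileFormatFactory;

public class ParquetInt64TimestampConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // int64/LogicalTypes timestamps instead of the int96 default (Spark-compatible).
        conf.set(ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP, true);
        // Time unit of the int64 representation: nanos, micros, or millis.
        conf.set(ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT, "micros");
        System.out.println(conf);
    }
}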



(flink) branch master updated: [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900)

2023-12-11 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 548e4b5188b [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900)
548e4b5188b is described below

commit 548e4b5188bb3f092206182d779a909756408660
Author: Thomas Weise 
AuthorDate: Mon Dec 11 08:05:53 2023 -0500

[FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900)
---
 docs/content/docs/connectors/table/formats/parquet.md   | 5 +++--
 .../flink/formats/parquet/vector/reader/TimestampColumnReader.java  | 6 +++---
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 75c524f238f..0e53f4b919e 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -107,9 +107,10 @@ For example, you can configure `parquet.compression=GZIP` to enable gzip compression
 Data Type Mapping
 
 
-Currently, Parquet format type mapping is compatible with Apache Hive, but different with Apache Spark:
+Currently, Parquet format type mapping is compatible with Apache Hive, but by default not with Apache Spark:
 
 - Timestamp: mapping timestamp type to int96 whatever the precision is.
+- Spark compatibility requires int64 via config option `timestamp.time.unit` (see above).
 - Decimal: mapping decimal type to fixed length byte array according to the precision.
 
 The following table lists the type mapping from Flink type to Parquet type.
@@ -185,7 +186,7 @@ The following table lists the type mapping from Flink type to Parquet type.
 
 
   TIMESTAMP
-  INT96
+  INT96 (or INT64)
   
 
 
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
index aa544f4e91c..7a36edc573b 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
@@ -159,8 +159,8 @@ public class TimestampColumnReader extends AbstractColumnReader

(flink) branch release-1.18 updated: [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)

2023-12-08 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 4379f2c39fb [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)
4379f2c39fb is described below

commit 4379f2c39fbdaeb78f874f6dd461d20b8a8961cd
Author: Thomas Weise 
AuthorDate: Fri Dec 8 13:49:04 2023 -0500

[FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)

Co-authored-by: Bo Cui 
---
 .../docs/connectors/table/formats/parquet.md   |  14 ++
 .../docs/connectors/table/formats/parquet.md   |  14 ++
 .../formats/parquet/ParquetFileFormatFactory.java  |  15 ++
 .../parquet/ParquetVectorizedInputFormat.java  |  13 +-
 .../formats/parquet/row/ParquetRowDataBuilder.java |  13 +-
 .../formats/parquet/row/ParquetRowDataWriter.java  |  67 -
 .../parquet/utils/ParquetSchemaConverter.java  |  50 +++-
 .../formats/parquet/vector/ParquetDictionary.java  |  20 +-
 .../parquet/vector/ParquetSplitReaderUtil.java |   3 +-
 .../vector/reader/AbstractColumnReader.java|   2 +-
 .../vector/reader/TimestampColumnReader.java   | 101 +++-
 .../formats/parquet/ParquetTimestampITCase.java| 280 +
 .../parquet/row/ParquetRowDataWriterTest.java  |  14 ++
 .../vector/ParquetInt64TimestampReaderTest.java|  69 +
 .../runtime/stream/FsStreamingSinkITCaseBase.scala |  65 +++--
 15 files changed, 683 insertions(+), 57 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/parquet.md b/docs/content.zh/docs/connectors/table/formats/parquet.md
index 92536d00f98..295b11ef41b 100644
--- a/docs/content.zh/docs/connectors/table/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format 参数
   Boolean
  使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。
 
+
+  timestamp.time.unit
+  optional
+  micros
+  String
+  根据TimeUnit在Timestamp和int64之间进行转换,可选值nanos/micros/millis。
+
+
+  write.int64.timestamp
+  optional
+  false
+  Boolean
+  以int64替代int96存储parquet Timestamp。 注意:Timestamp将于时区无关(从不转换为不同时区)。
+
 
 
 
diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 0b7ba9c42f5..75c524f238f 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format Options
   Boolean
  Use UTC timezone or local timezone to the conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone.
 
+
+  timestamp.time.unit
+  optional
+  micros
+  String
+  Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis.
+
+
+  write.int64.timestamp
+  optional
+  false
+  Boolean
+  Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone).
+
 
 
 
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
index 14f257899c1..8be727c7947 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
@@ -68,6 +68,21 @@ public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWr
 + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
 + " use UTC timezone");
 
+public static final ConfigOption<String> TIMESTAMP_TIME_UNIT =
+key("timestamp.time.unit")
+.stringType()
+.defaultValue("micros")
+.withDescription(
+"Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis");
+
+public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP =
+key("write.int64.timestamp")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. "
++ "Note: Timestamp will be time zone agnostic (NEVER converted to

(flink) branch master updated (43fec308b32 -> 978bff3bb90)

2023-12-06 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 43fec308b32 [FLINK-33599] Run restore tests with RocksDB state backend (#23883)
 add 978bff3bb90 [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304)

No new revisions were added by this update.

Summary of changes:
 .../docs/connectors/table/formats/parquet.md   |  14 ++
 .../docs/connectors/table/formats/parquet.md   |  14 ++
 .../formats/parquet/ParquetFileFormatFactory.java  |  15 ++
 .../parquet/ParquetVectorizedInputFormat.java  |  13 +-
 .../formats/parquet/row/ParquetRowDataBuilder.java |  13 +-
 .../formats/parquet/row/ParquetRowDataWriter.java  |  67 -
 .../parquet/utils/ParquetSchemaConverter.java  |  50 +++-
 .../formats/parquet/vector/ParquetDictionary.java  |  20 +-
 .../parquet/vector/ParquetSplitReaderUtil.java |   3 +-
 .../vector/reader/AbstractColumnReader.java|   2 +-
 .../vector/reader/TimestampColumnReader.java   | 101 +++-
 .../formats/parquet/ParquetTimestampITCase.java| 280 +
 .../parquet/row/ParquetRowDataWriterTest.java  |  14 ++
 .../vector/ParquetInt64TimestampReaderTest.java|  69 +
 .../runtime/stream/FsStreamingSinkITCaseBase.scala |  65 +++--
 15 files changed, 683 insertions(+), 57 deletions(-)
 create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java
 create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java



(flink) branch master updated (7eaa5db3024 -> acd3e7ab66c)

2023-12-04 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 7eaa5db3024 [FLINK-33581][python] Deprecate getter/setter methods related to state backend in python flink.
 add acd3e7ab66c [FLINK-27529] [connector/common] Fix Integer Comparison For HybridSource sourceIndex (#23703)

No new revisions were added by this update.

Summary of changes:
 .../source/hybrid/HybridSourceSplitEnumerator.java |  5 +-
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 56 ++
 2 files changed, 59 insertions(+), 2 deletions(-)
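
The bug class behind FLINK-27529 is plain Java boxing: == on boxed Integer values compares references, which only happens to work for values in the JVM's small-integer cache. A standalone illustration of the general pitfall (not the Flink code itself):

public class BoxedIntegerComparison {
    public static void main(String[] args) {
        Integer a = 127, b = 127;
        Integer c = 128, d = 128;
        System.out.println(a == b);      // true: -128..127 are interned by Integer.valueOf
        System.out.println(c == d);      // false: two distinct boxed objects
        System.out.println(c.equals(d)); // true: compares the int values
    }
}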



[flink] branch master updated: [FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452)

2023-09-26 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c5516533a57 [FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452)
c5516533a57 is described below

commit c5516533a5705297fe3cfd33860e59da30750f69
Author: Elkhan Dadash 
AuthorDate: Tue Sep 26 22:30:12 2023 -0700

[FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452)
---
 .../org/apache/flink/client/program/rest/RestClusterClient.java  | 2 +-
 .../org/apache/flink/client/program/rest/UrlPrefixDecorator.java | 9 +
 .../apache/flink/client/program/rest/RestClusterClientTest.java  | 6 ++
 .../flink/runtime/rest/messages/CustomHeadersDecorator.java  | 9 +
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index b05dc331853..cc5a6d2d833 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -1021,7 +1021,7 @@ public class RestClusterClient<T> implements ClusterClient<T> {
 restClient.sendRequest(
 webMonitorBaseUrl.getHost(),
 webMonitorBaseUrl.getPort(),
-messageHeaders,
+headers,
 messageParameters,
 request,
 filesToUpload);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
index bd6398176c5..52f1b8cf56e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
@@ -107,4 +107,13 @@ public class UrlPrefixDecorator<
 public Collection> getSupportedAPIVersions() {
 return decorated.getSupportedAPIVersions();
 }
+
+@Override
+public Collection<Class<?>> getResponseTypeParameters() {
+return decorated.getResponseTypeParameters();
+}
+
+public MessageHeaders<R, P, M> getDecorated() {
+return decorated;
+}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 4f7e2f0b08d..e1db5c6d1b0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
+import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -909,6 +910,11 @@ class RestClusterClientTest {
 final AtomicBoolean firstSubmitRequestFailed = new AtomicBoolean(false);
 failHttpRequest =
 (messageHeaders, messageParameters, requestBody) -> {
+messageHeaders =
+((UrlPrefixDecorator)
+((CustomHeadersDecorator) messageHeaders)
+.getDecorated())
+.getDecorated();
 if (messageHeaders instanceof JobExecutionResultHeaders) {
 return !firstExecutionResultPollFailed.getAndSet(true);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
index 979c849166c..05fa82

[flink] branch master updated: PyFlink remote execution should support URLs with paths and https scheme

2023-09-21 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 5682472e402 PyFlink remote execution should support URLs with paths and https scheme
5682472e402 is described below

commit 5682472e4029c25d4a2651a609c999029fa3281b
Author: Elkhan Dadashov 
AuthorDate: Tue Sep 12 21:32:21 2023 -0700

PyFlink remote execution should support URLs with paths and https scheme
---
 .../generated/common_host_port_section.html|  6 ++
 .../shortcodes/generated/rest_configuration.html   |  6 ++
 .../org/apache/flink/client/cli/DefaultCLI.java| 12 
 .../client/program/rest/RestClusterClient.java | 39 -
 .../apache/flink/client/cli/DefaultCLITest.java| 32 ++-
 .../client/program/rest/RestClusterClientTest.java | 66 +-
 .../apache/flink/configuration/RestOptions.java|  9 +++
 .../main/java/org/apache/flink/util/NetUtils.java  |  9 ++-
 .../java/org/apache/flink/util/NetUtilsTest.java   |  8 +++
 9 files changed, 170 insertions(+), 17 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/common_host_port_section.html b/docs/layouts/shortcodes/generated/common_host_port_section.html
index c2753d125e9..624278f7fb4 100644
--- a/docs/layouts/shortcodes/generated/common_host_port_section.html
+++ b/docs/layouts/shortcodes/generated/common_host_port_section.html
@@ -44,6 +44,12 @@
 String
 The port that the server binds itself. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Rest servers are running on the same machine.
 
+
+rest.path
+(none)
+String
The path that should be used by clients to interact to the server which is accessible via URL.
+
 
 rest.port
 8081
diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html b/docs/layouts/shortcodes/generated/rest_configuration.html
index 467b4275acd..af006be2037 100644
--- a/docs/layouts/shortcodes/generated/rest_configuration.html
+++ b/docs/layouts/shortcodes/generated/rest_configuration.html
@@ -104,6 +104,12 @@
 Long
 The maximum time in ms for a connection to stay idle before failing.
 
+
+rest.path
+(none)
+String
The path that should be used by clients to interact to the server which is accessible via URL.
+
 
 rest.port
 8081
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 273e007a8ae..d7f2fb25ce4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -21,6 +21,8 @@ package org.apache.flink.client.cli;
 import org.apache.flink.client.deployment.executors.RemoteExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.NetUtils;
 
@@ -29,6 +31,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 
 import java.net.InetSocketAddress;
+import java.net.URL;
 
import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
 
@@ -60,6 +63,11 @@ public class DefaultCLI extends AbstractCustomCommandLine {
 String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
 InetSocketAddress jobManagerAddress = NetUtils.parseHostPortAddress(addressWithPort);
 setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress);
+
+URL url = NetUtils.getCorrectHostnamePort(addressWithPort);
+resultingConfiguration.setString(RestOptions.PATH, url.getPath());
+resultingConfiguration.setBoolean(
+SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url));
 }
 resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME);
 
@@ -68,6 +76,10 @@ public class DefaultCLI extends AbstractCustomCommandLine {
 return resultingConfiguration;
 }
 
+private static boolean isHttpsProtocol(URL url) {
+return url.getProtocol() != null && (url.getProtocol().equalsIgnoreCase("https"));
+}
+
 @Override
 public String getId() {
 return ID;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterCli
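
What the new DefaultCLI logic amounts to for a user: an address such as https://gateway.example.com:443/flink-proxy now yields a REST path and SSL flag in the effective configuration. A minimal sketch of the resulting settings, with illustrative values (the option names RestOptions.PATH and SecurityOptions.SSL_REST_ENABLED come from the diff above):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;

public class RestPathConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Derived from the URL path component by DefaultCLI above (illustrative value).
        conf.setString(RestOptions.PATH, "/flink-proxy");
        // Derived from the https scheme of the supplied address.
        conf.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
        System.out.println(conf);
    }
}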

[flink] branch master updated: [FLINK-32885 ] Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution

2023-09-12 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e7eeea033a6 [FLINK-32885 ] Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
e7eeea033a6 is described below

commit e7eeea033a68e1ff6bf82132b5a59eb0a5a2d0ed
Author: Elkhan Dadashov 
AuthorDate: Fri Sep 8 14:43:03 2023 -0700

[FLINK-32885 ] Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
---
 .../java/org/apache/flink/client/ClientUtils.java  | 23 ++
 .../client/program/rest}/UrlPrefixDecorator.java   | 13 ---
 .../flink/table/client/gateway/ExecutorImpl.java   | 27 --
 3 files changed, 37 insertions(+), 26 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index e6a39a5f1a0..b828534bea5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.rest.HttpHeader;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.SerializedThrowable;
@@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
@@ -185,4 +188,24 @@ public enum ClientUtils {
 TimeUnit.MILLISECONDS);
 return scheduledExecutor;
 }
+
+public static Collection<HttpHeader> readHeadersFromEnvironmentVariable(String envVarName) {
+List<HttpHeader> headers = new ArrayList<>();
+String rawHeaders = System.getenv(envVarName);
+
+if (rawHeaders != null) {
+String[] lines = rawHeaders.split("\n");
+for (String line : lines) {
+String[] keyValue = line.split(":", 2);
+if (keyValue.length == 2) {
+headers.add(new HttpHeader(keyValue[0], keyValue[1]));
+} else {
+LOG.info(
+"Skipped a malformed header {} from 
FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in 
format header_name:header_value.",
+line);
+}
+}
+}
+return headers;
+}
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
similarity index 91%
rename from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java
rename to flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
index 9e905029fc4..bd6398176c5 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.gateway.rest.header.util;
+package org.apache.flink.client.program.rest;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.RestClient;
@@ -24,10 +24,12 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.util.Collection;
+
 import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -41,7 +43,7 @@ import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditio
  */
 public class UrlPrefixDecorator<
 R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
-implements SqlGatewayMessageHe
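
Usage sketch for the readHeadersFromEnvironmentVariable helper added above. The expected format, newline-separated name:value pairs, comes from the log message in the ClientUtils diff; the class and main method are illustrative scaffolding:

import java.util.Collection;

import org.apache.flink.client.ClientUtils;
import org.apache.flink.runtime.rest.HttpHeader;

public class RestClientHeadersSketch {
    public static void main(String[] args) {
        // e.g. export FLINK_REST_CLIENT_HEADERS=$'Authorization:Bearer abc\nX-Tenant:team-1'
        Collection<HttpHeader> headers =
                ClientUtils.readHeadersFromEnvironmentVariable("FLINK_REST_CLIENT_HEADERS");
        for (HttpHeader header : headers) {
            System.out.println(header.getName() + ": " + header.getValue());
        }
    }
}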

[flink] branch master updated (079cd60510d -> 2b14c3ae834)

2023-08-16 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 079cd60510d [FLINK-32730][table-planner] Fix the bug of Scan reuse after projection pushdown without considering the dpp pattern
 add 2b14c3ae834 [docs] Add documentation for SQL Client URL support

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/table/sqlClient.md | 20 
 docs/content/docs/dev/table/sqlClient.md| 19 +++
 2 files changed, 39 insertions(+)



[flink] branch master updated (d88ef0e355b -> 6e61b932a3e)

2023-07-07 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from d88ef0e355b [FLINK-32502][runtime] Remove the AbstractLeaderElectionService (#22959)
 add 6e61b932a3e [FLINK-32035][sql-client] Support HTTPS endpoints in SQL Client gateway mode (#22810)

No new revisions were added by this update.

Summary of changes:
 .../flink-end-to-end-tests-restclient}/pom.xml |  31 ++---
 .../flink/runtime/rest/RestClientITCase.java   | 142 +
 .../src/test/resources/log4j2-test.properties  |   0
 flink-end-to-end-tests/pom.xml |   1 +
 .../org/apache/flink/runtime/net/SSLUtils.java |  60 ++---
 .../org/apache/flink/runtime/rest/RestClient.java  |  37 +-
 .../flink/table/client/gateway/ExecutorImpl.java   |   7 +-
 7 files changed, 227 insertions(+), 51 deletions(-)
 copy {flink-state-backends/flink-statebackend-heap-spillable => flink-end-to-end-tests/flink-end-to-end-tests-restclient}/pom.xml (73%)
 create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
 copy {flink-architecture-tests/flink-architecture-tests-production => flink-end-to-end-tests/flink-end-to-end-tests-restclient}/src/test/resources/log4j2-test.properties (100%)



[flink] branch master updated: [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)

2023-07-06 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1354d2fae3f [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
1354d2fae3f is described below

commit 1354d2fae3fde2a448ce1fac5dee7859973a93e1
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Thu Jul 6 21:34:36 2023 +0200

[FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
---
 .../flink/configuration/ConfigConstants.java   |   6 ++
 .../org/apache/flink/runtime/rest/HttpHeader.java  |  83 ++
 .../org/apache/flink/runtime/rest/RestClient.java  |  19 ++--
 .../rest/messages/CustomHeadersDecorator.java  | 120 +
 .../runtime/rest/messages/MessageHeaders.java  |  14 +++
 .../flink/table/client/gateway/ExecutorImpl.java   |  57 --
 .../table/client/gateway/ExecutorImplITCase.java   |  19 
 7 files changed, 302 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 3c7a89c9e00..a4968db4550 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1756,6 +1756,12 @@ public final class ConfigConstants {
 /** The user lib directory name. */
 public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib";
 
+/**
+ * The environment variable name which contains a list of newline-separated HTTP headers for
+ * Flink's REST client.
+ */
+public static final String FLINK_REST_CLIENT_HEADERS = "FLINK_REST_CLIENT_HEADERS";
+
 //  Encoding --
 
 public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java
new file mode 100644
index 00000000000..06ee95bd451
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.util.Objects;
+
+/** Represents an HTTP header with a name and a value. */
+public class HttpHeader {
+
+/** The name of the HTTP header. */
+private final String name;
+
+/** The value of the HTTP header. */
+private final String value;
+
+/**
+ * Constructs an {@code HttpHeader} object with the specified name and value.
+ *
+ * @param name the name of the HTTP header
+ * @param value the value of the HTTP header
+ */
+public HttpHeader(String name, String value) {
+this.name = name;
+this.value = value;
+}
+
+/**
+ * Returns the name of this HTTP header.
+ *
+ * @return the name of this HTTP header
+ */
+public String getName() {
+return name;
+}
+
+/**
+ * Returns the value of this HTTP header.
+ *
+ * @return the value of this HTTP header
+ */
+public String getValue() {
+return value;
+}
+
+@Override
+public String toString() {
+return "HttpHeader{" + "name='" + name + '\'' + ", value='" + value + 
'\'' + '}';
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+if (other == null || getClass() != other.getClass()) {
+return false;
+}
+HttpHeader that = (HttpHeader) other;
+return Objects.equals(getName(), that.getName())
+&& Objects.equals(getValue(), that.getValue());
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(getName(), getValue()

[flink] branch master updated: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode (#22556)

2023-05-16 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4bd51ce122d [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode (#22556)
4bd51ce122d is described below

commit 4bd51ce122d03a13cfd6fdf69325630679cd5053
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Tue May 16 18:52:06 2023 +0200

[FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode (#22556)
---
 .../main/java/org/apache/flink/util/NetUtils.java  |  12 +++
 .../java/org/apache/flink/util/NetUtilsTest.java   |  11 +++
 .../org/apache/flink/runtime/rest/RestClient.java  |  14 ++-
 .../apache/flink/table/client/cli/CliOptions.java  |   7 +-
 .../flink/table/client/cli/CliOptionsParser.java   |  31 ++-
 .../flink/table/client/gateway/Executor.java   |   5 +
 .../flink/table/client/gateway/ExecutorImpl.java   |  49 +++---
 .../apache/flink/table/client/SqlClientTest.java   |  17 +++-
 .../rest/header/util/UrlPrefixDecorator.java   | 103 +
 9 files changed, 231 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index fc3be6cf9b4..bab4d9b3208 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -120,6 +120,18 @@ public class NetUtils {
 }
 }
 
+/**
+ * Converts an InetSocketAddress to a URL. This method assigns the "http://" schema to the URL
+ * by default.
+ *
+ * @param socketAddress the InetSocketAddress to be converted
+ * @return a URL object representing the provided socket address with "http://" schema
+ */
+public static URL socketToUrl(InetSocketAddress socketAddress) {
+String hostPort = socketAddress.getHostString() + ":" + socketAddress.getPort();
+return validateHostPortString(hostPort);
+}
+
 /**
  * Calls {@link ServerSocket#accept()} on the provided server socket, suppressing any thrown
  * {@link SocketTimeoutException}s. This is a workaround for the underlying JDK-8237858 bug in
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index a835e2bad7c..6168da4474e 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.util;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -32,6 +34,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
+import static org.apache.flink.util.NetUtils.socketToUrl;
 import static org.hamcrest.core.IsCollectionContaining.hasItems;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
@@ -343,4 +346,12 @@ public class NetUtilsTest extends TestLogger {
 NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
 }
 }
+
+@Test
+public void testSocketToUrl() throws MalformedURLException {
+InetSocketAddress socketAddress = new InetSocketAddress("foo.com", 8080);
+URL expectedResult = new URL("http://foo.com:8080");
+
+Assertions.assertThat(socketToUrl(socketAddress)).isEqualTo(expectedResult);
+}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2ab8fdac579..03a5c7b0742 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -123,6 +123,8 @@ public class RestClient implements AutoCloseableAsync {
 
 private final AtomicBoolean isRunning = new AtomicBoolean(true);
 
+public static final String VERSION_PLACEHOLDER = "{{VERSION}}";
+
 @VisibleForTesting List<OutboundChannelHandlerFactory> outboundChannelHandlerFactories;
 
 public RestClient(Configuration configuration, Executor executor)
@@ -353,7 +355,7 @@ public class RestClient implements AutoCloseableAsync {
 }
 
 String versionedHandlerURL =
-"/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
+constructVersionedHandlerUrl(messageHeaders, apiVersion.getURLVersionPrefix
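
The diff is cut off here. The idea behind VERSION_PLACEHOLDER, reconstructed as an assumption (the helper name and exact behavior are not confirmed by the truncated hunk): a target URL containing {{VERSION}} gets the negotiated API version substituted in place, while a plain URL keeps the old "/" + version + endpoint form.

public class VersionPlaceholderSketch {
    static final String VERSION_PLACEHOLDER = "{{VERSION}}";

    // Assumed shape of constructVersionedHandlerUrl; not the committed code.
    static String versionedHandlerUrl(String targetUrl, String urlVersionPrefix) {
        if (targetUrl.contains(VERSION_PLACEHOLDER)) {
            return targetUrl.replace(VERSION_PLACEHOLDER, urlVersionPrefix);
        }
        return "/" + urlVersionPrefix + targetUrl;
    }

    public static void main(String[] args) {
        System.out.println(versionedHandlerUrl("/prefix/{{VERSION}}/jobs", "v1")); // /prefix/v1/jobs
        System.out.println(versionedHandlerUrl("/jobs", "v1"));                    // /v1/jobs
    }
}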

[flink] branch master updated: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)

2022-11-28 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 61834da8298 [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
61834da8298 is described below

commit 61834da82984323484718e551c15e31ce023e026
Author: Seth Saperstein <99828679+sethsaperstein-l...@users.noreply.github.com>
AuthorDate: Mon Nov 28 15:35:01 2022 -0800

[FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
---
 .../connectors/kinesis/internals/KinesisDataFetcher.java   | 14 --
 .../kinesis/util/JobManagerWatermarkTracker.java   |  5 +
 .../kinesis/util/JobManagerWatermarkTrackerTest.java   |  1 +
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 4fcc80a250e..d6c7b296da8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -1174,6 +1174,14 @@ public class KinesisDataFetcher {
 }
 }
 
+LOG.debug(
+"WatermarkEmitter subtask: {}, last watermark: {}, potential 
watermark: {}"
++ ", potential next watermark: {}",
+indexOfThisConsumerSubtask,
+lastWatermark,
+potentialWatermark,
+potentialNextWatermark);
+
 // advance watermark if possible (watermarks can only be ascending)
 if (potentialWatermark == Long.MAX_VALUE) {
 if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
@@ -1265,11 +1273,11 @@ public class KinesisDataFetcher {
 public void onProcessingTime(long timestamp) {
 if (nextWatermark != Long.MIN_VALUE) {
 long globalWatermark = lastGlobalWatermark;
-// TODO: refresh watermark while idle
 if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
 globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
 propagatedLocalWatermark = nextWatermark;
 } else {
+globalWatermark = watermarkTracker.updateWatermark(Long.MIN_VALUE);
 LOG.info(
 "WatermarkSyncCallback subtask: {} is idle",
 indexOfThisConsumerSubtask);
@@ -1279,12 +1287,14 @@ public class KinesisDataFetcher {
 lastLogged = System.currentTimeMillis();
 LOG.info(
 "WatermarkSyncCallback subtask: {} local 
watermark: {}"
-+ ", global watermark: {}, delta: {} 
timeouts: {}, emitter: {}",
++ ", global watermark: {}, delta: {} 
timeouts: {}, idle: {}"
++ ", emitter: {}",
 indexOfThisConsumerSubtask,
 nextWatermark,
 globalWatermark,
 nextWatermark - globalWatermark,
 watermarkTracker.getUpdateTimeoutCount(),
+isIdle,
 recordEmitter.printInfo());
 
 // Following is for debugging non-reproducible issue with stalled watermark
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index b4c78438dca..51d055e2c96 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -129,6 +129,11 @@ public class JobManagerWatermarkTracker extends WatermarkTracker {
 } catch (Exception e) {
 throw new RuntimeException(e);
 }
+// no op to get global watermark without updating it
+if (value.watermark == Long.MIN_VALUE) {
+addCount--;
+return accumulator;
+  
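
For orientation, the tracker touched here is wired into the consumer like this. A minimal sketch: stream name, region, and schema are illustrative, while JobManagerWatermarkTracker and setWatermarkTracker are existing Kinesis connector APIs:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;

public class KinesisGlobalWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1"); // illustrative

        FlinkKinesisConsumer<String> consumer =
                new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), props);
        // Global watermark shared via the JobManager; with this fix, idle subtasks
        // keep reading it (Long.MIN_VALUE no-op update) instead of stalling it.
        consumer.setWatermarkTracker(new JobManagerWatermarkTracker("global-watermark"));

        env.addSource(consumer).print();
        env.execute();
    }
}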

[flink-kubernetes-operator] branch main updated: [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15

2022-11-15 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 7db0eb1e [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15
7db0eb1e is described below

commit 7db0eb1e02a1430128f436bdecea9748ce81fe14
Author: Thomas Weise 
AuthorDate: Sun Nov 13 15:56:41 2022 -0500

[FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15
---
 .../operator/service/AbstractFlinkService.java|  1 +
 .../operator/service/NativeFlinkService.java  |  7 ++-
 .../operator/service/NativeFlinkServiceTest.java  | 19 +--
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 38b32d04..2208cf80 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -326,6 +326,7 @@ public abstract class AbstractFlinkService implements FlinkService {
 exception);
 }
 if (deleteClusterAfterSavepoint) {
+LOG.info("Cleaning up deployment after 
stop-with-savepoint");
 deleteClusterDeployment(deployment.getMetadata(), 
deploymentStatus, true);
 }
 break;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index f40322ff..1402c8d3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -27,6 +27,7 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
@@ -80,7 +81,11 @@ public class NativeFlinkService extends AbstractFlinkService {
 public void cancelJob(
 FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
 throws Exception {
-cancelJob(deployment, upgradeMode, configuration, false);
+// prior to Flink 1.15, ensure removal of orphaned config maps
+// https://issues.apache.org/jira/browse/FLINK-30004
+boolean deleteClusterAfterSavepoint =
+!deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
+cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint);
 }
 
 @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index c79663ef..fb204c1a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -56,6 +56,8 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -114,8 +116,9 @@ public class NativeFlinkServiceTest {
 assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
 }
 
-@Test
-public void testCancelJobWithSavepointUpgradeMode() throws Exception {
+@ParameterizedTest
+@EnumSource(FlinkVersion.class)
+public void testCancelJobWithSavepointUpgradeMode(FlinkVersion flinkVersion) throws Exception {
 final TestingClusterClient testingClusterClient =
 new TestingClusterClient

[flink] branch master updated: [FLINK-27479] add logic to manage the availability future with respect to the underlying reader future

2022-10-09 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9dad2029461 [FLINK-27479] add logic to manage the availability future with respect to the underlying reader future
9dad2029461 is described below

commit 9dad2029461e6f48a29f25c50d642239bc2e5721
Author: Mason Chen 
AuthorDate: Wed Aug 31 18:04:02 2022 -0700

[FLINK-27479] add logic to manage the availability future with respect to the underlying reader future

formatting
---
 .../base/source/hybrid/HybridSourceReader.java |  27 +---
 .../base/source/hybrid/HybridSourceReaderTest.java | 166 +
 2 files changed, 173 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index 855f85ddd3f..2f113078fe3 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -96,12 +96,8 @@ public class HybridSourceReader implements SourceReader
 new SourceReaderFinishedEvent(currentSourceIndex));
 if (!isFinalSource) {
 // More splits may arrive for a subsequent reader.
-// InputStatus.NOTHING_AVAILABLE suspends poll, requires completion of the
-// availability future after receiving more splits to resume.
-if (availabilityFuture.isDone()) {
-// reset to avoid continued polling
-availabilityFuture = new CompletableFuture();
-}
+// InputStatus.NOTHING_AVAILABLE suspends poll, complete the
+// availability future in the switchover to the next reader
 return InputStatus.NOTHING_AVAILABLE;
 }
 }
@@ -133,6 +129,10 @@ public class HybridSourceReader implements SourceReader
 
 @Override
 public CompletableFuture isAvailable() {
+if (availabilityFuture.isDone()) {
+availabilityFuture = currentReader.isAvailable();
+}
+
 return availabilityFuture;
 }
 
@@ -185,10 +185,6 @@ public class HybridSourceReader implements SourceReader
 switchedSources.put(sse.sourceIndex(), sse.source());
 setCurrentReader(sse.sourceIndex());
 isFinalSource = sse.isFinalSource();
-if (!availabilityFuture.isDone()) {
-// continue polling
-availabilityFuture.complete(null);
-}
 } else {
 currentReader.handleSourceEvents(sourceEvent);
 }
@@ -231,16 +227,7 @@ public class HybridSourceReader implements SourceReader
 reader.start();
 currentSourceIndex = index;
 currentReader = reader;
-currentReader
-.isAvailable()
-.whenComplete(
-(result, ex) -> {
-if (ex == null) {
-availabilityFuture.complete(result);
-} else {
-availabilityFuture.completeExceptionally(ex);
-}
-});
+availabilityFuture.complete(null);
 LOG.debug(
 "Reader started: subtask={} sourceIndex={} {}",
 readerContext.getIndexOfSubtask(),
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index b9c2f566115..02231378a36 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -24,7 +24,14 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.connector.base.source.reader.mocks.MockBaseSo

[flink-kubernetes-operator] branch main updated: [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16)

2022-09-26 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 6f0914bb [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16)
6f0914bb is described below

commit 6f0914bbd296c9daf2664afe0a77d1df4f2e157e
Author: Thomas Weise 
AuthorDate: Sun Sep 25 13:24:19 2022 -0400

[FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16)
---
 .../deployment/ApplicationReconciler.java  | 33 ++
 .../kubernetes/operator/TestingFlinkService.java   |  4 +++
 .../deployment/ApplicationReconcilerTest.java  | 29 ---
 3 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index a4384edd..c0f53519 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -17,13 +17,16 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
@@ -172,6 +175,9 @@ public class ApplicationReconciler
 
flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true);
 flinkService.waitForClusterShutdown(deployConfig);
 }
+
+setJobIdIfNecessary(spec, status, deployConfig);
+
 eventRecorder.triggerEvent(
 relatedResource,
 EventRecorder.Type.Normal,
@@ -186,6 +192,33 @@ public class ApplicationReconciler
 relatedResource.getMetadata(), spec, deployConfig, kubernetesClient);
 }
 
+private void setJobIdIfNecessary(
+FlinkDeploymentSpec spec, FlinkDeploymentStatus status, Configuration deployConfig) {
+// https://issues.apache.org/jira/browse/FLINK-19358
+// https://issues.apache.org/jira/browse/FLINK-29109
+if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) {
+return;
+}
+
+if (deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null) {
+// user managed, don't touch
+return;
+}
+
+// generate jobId initially or rotate on every deployment when mode is stateless
+if (status.getJobStatus().getJobId() == null
+|| spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
+String jobId = JobID.generate().toHexString();
+// record before first deployment to ensure we use it on any retry
+status.getJobStatus().setJobId(jobId);
+LOG.info("Assigning JobId override to {}", jobId);
+}
+
+String jobId = status.getJobStatus().getJobId();
+LOG.debug("Setting {} to {}", 
PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
+deployConfig.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
+}
+
 @Override
 protected void cancelJob(
 FlinkDeployment deployment,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index f97e5fa7..27b453d0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.

[flink-kubernetes-operator] branch main updated: [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener

2022-09-14 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 84b08d00 [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener
84b08d00 is described below

commit 84b08d00f67552dbd4bd97d49ab6d39f1cd87f7d
Author: Matyas Orhidi 
AuthorDate: Mon Sep 12 11:03:50 2022 +0200

[FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener
---
 .../controller/FlinkDeploymentController.java  |  9 +-
 .../controller/FlinkSessionJobController.java  |  9 +-
 .../operator/observer/JobStatusObserver.java   |  2 +-
 .../reconciler/deployment/SessionReconciler.java   |  2 +-
 .../kubernetes/operator/utils/EventRecorder.java   |  3 +-
 .../kubernetes/operator/utils/StatusRecorder.java  |  4 +++
 .../controller/FlinkDeploymentControllerTest.java  | 16 +--
 .../listener/FlinkResourceListenerTest.java| 32 --
 8 files changed, 50 insertions(+), 27 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index a0b0a161..0026ac54 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -84,7 +84,14 @@ public class FlinkDeploymentController
 
 @Override
 public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-LOG.info("Deleting FlinkDeployment");
+String msg = "Cleaning up " + FlinkDeployment.class.getSimpleName();
+LOG.info(msg);
+eventRecorder.triggerEvent(
+flinkApp,
+EventRecorder.Type.Normal,
+EventRecorder.Reason.Cleanup,
+EventRecorder.Component.Operator,
+msg);
 statusRecorder.updateStatusFromCache(flinkApp);
 try {
 observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index d037571e..61dc9512 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -107,7 +107,14 @@ public class FlinkSessionJobController
 
 @Override
 public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
-LOG.info("Deleting FlinkSessionJob");
+String msg = "Cleaning up " + FlinkSessionJob.class.getSimpleName();
+LOG.info(msg);
+eventRecorder.triggerEvent(
+sessionJob,
+EventRecorder.Type.Normal,
+EventRecorder.Reason.Cleanup,
+EventRecorder.Component.Operator,
+msg);
 statusRecorder.removeCachedStatus(sessionJob);
 return reconciler.cleanup(sessionJob, context);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 2c6b4bc6..51cf72ad 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -147,7 +147,7 @@ public abstract class JobStatusObserver {
 eventRecorder.triggerEvent(
 resource,
 EventRecorder.Type.Normal,
-EventRecorder.Reason.StatusChanged,
+EventRecorder.Reason.JobStatusChanged,
 EventRecorder.Component.Job,
 message);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 1131a5e9..43a62e38 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

[flink-kubernetes-operator] branch main updated: [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present

2022-08-31 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 6d7c8cee [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
6d7c8cee is described below

commit 6d7c8ceebb9ee0b83eb7036c57003177b7f2fd82
Author: Thomas Weise 
AuthorDate: Tue Aug 30 21:17:33 2022 -0400

[FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
---
 .../deployment/ApplicationReconciler.java  | 41 +++---
 .../deployment/ApplicationReconcilerTest.java  | 38 
 2 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 42aabaad..43520df3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -98,12 +99,19 @@ public class ApplicationReconciler
 
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
 && FlinkUtils.isKubernetesHAActivated(deployConfig)
 && FlinkUtils.isKubernetesHAActivated(observeConfig)
-&& flinkService.isHaMetadataAvailable(deployConfig)
 && !flinkVersionChanged(
ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
-LOG.info(
-"Job is not running but HA metadata is available for last 
state restore, ready for upgrade");
-return Optional.of(UpgradeMode.LAST_STATE);
+
+if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
+// initial deployment failure, reset to allow for spec change to proceed
+return resetOnMissingStableSpec(deployment, deployConfig);
+}
+} else {
+LOG.info(
+"Job is not running but HA metadata is available for 
last state restore, ready for upgrade");
+return Optional.of(UpgradeMode.LAST_STATE);
+}
 }
 
 if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
@@ -120,6 +128,31 @@ public class ApplicationReconciler
 return Optional.empty();
 }
 
+private Optional<UpgradeMode> resetOnMissingStableSpec(
+FlinkDeployment deployment, Configuration deployConfig) {
+// initial deployment failure, reset to allow for spec change to proceed
+flinkService.deleteClusterDeployment(
+deployment.getMetadata(), deployment.getStatus(), false);
+flinkService.waitForClusterShutdown(deployConfig);
+if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+LOG.info(
+"Job never entered stable state. Clearing previous spec to 
reset for initial deploy");
+// TODO: lastSpecWithMeta.f1.isFirstDeployment() is false
+// ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
+deployment.getStatus().getReconciliationStatus().setLastReconciledSpec(null);
+// UPGRADING triggers immediate reconciliation
+deployment
+.getStatus()
+.getReconciliationStatus()
+.setState(ReconciliationState.UPGRADING);
+return Optional.empty();
+} else {
+// proceed with upgrade if deployment succeeded between check and delete
+LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
+return Optional.of(UpgradeMode.LAST_STATE);
+}
+}
+
 @Override
 p
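Editor's note: the decision this change introduces can be summarized in one function. A hedged sketch with illustrative names only (the booleans stand in for the operator's actual spec/status lookups, and it glosses over the re-check after deletion):

    // Last-state fallback requires HA metadata; without it, a deployment that
    // never produced a stable spec is reset so the new spec can deploy cleanly.
    static String decideUpgradePath(boolean haMetadataAvailable, boolean hasLastStableSpec) {
        if (haMetadataAvailable) {
            return "LAST_STATE";          // stateful upgrade from HA metadata
        }
        if (!hasLastStableSpec) {
            return "RESET_AND_REDEPLOY";  // initial deploy failed; clear last spec
        }
        return "NO_DECISION";             // stable spec exists but HA data is gone
    }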

[flink] branch master updated: [FLINK-28977] NullPointerException in HybridSourceSplitEnumerator.close (#20587)

2022-08-23 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new ee4d27411b3 [FLINK-28977] NullPointerException in HybridSourceSplitEnumerator.close (#20587)
ee4d27411b3 is described below

commit ee4d27411b3bf1cfcc4171aab777eb8bf08f1550
Author: Michael <89284672+mbene...@users.noreply.github.com>
AuthorDate: Tue Aug 23 05:28:13 2022 -0700

[FLINK-28977] NullPointerException in HybridSourceSplitEnumerator.close (#20587)
---
 .../connector/base/source/hybrid/HybridSourceSplitEnumerator.java| 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index 61baabeb941..c09653ec71b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -246,7 +246,10 @@ public class HybridSourceSplitEnumerator
 
 @Override
 public void close() throws IOException {
-currentEnumerator.close();
+// close may be called before currentEnumerator was initialized
+if (currentEnumerator != null) {
+currentEnumerator.close();
+}
 }
 
 private void switchEnumerator() {
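Editor's note: the pattern behind this one-line guard is general. A resource created lazily (here, currentEnumerator is only assigned once the first source starts) must be null-checked in close(), because the framework may close the enumerator before it was ever started. A standalone illustration, not Flink code:

    class LazyCloseable implements AutoCloseable {
        private AutoCloseable delegate; // assigned in start(); null until then

        void start() {
            delegate = () -> System.out.println("closed");
        }

        @Override
        public void close() throws Exception {
            if (delegate != null) { // same guard as the fix above
                delegate.close();
            }
        }
    }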



[flink] branch release-1.15 updated: [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint

2022-08-13 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new e5570e3e33a [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint
e5570e3e33a is described below

commit e5570e3e33ac33fd1b31d38c86ac6a291e7bc47e
Author: Qishang Zhong 
AuthorDate: Sat Aug 13 11:21:58 2022 +0800

[FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint
---
 .../source/hybrid/HybridSourceSplitEnumerator.java|  6 +-
 .../connector/base/source/hybrid/SwitchedSources.java |  8 +++-
 .../hybrid/HybridSourceSplitEnumeratorTest.java   | 19 +++
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index d27de221af8..61baabeb941 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -218,7 +218,11 @@ public class HybridSourceSplitEnumerator
 }
 
 if (subtaskSourceIndex < currentSourceIndex) {
-subtaskSourceIndex++;
+// find initial or next index for the reader
+subtaskSourceIndex =
+subtaskSourceIndex == -1
+? switchedSources.getFirstSourceIndex()
+: ++subtaskSourceIndex;
 sendSwitchSourceEvent(subtaskId, subtaskSourceIndex);
 return;
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
index 7911612d258..68128d1faed 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
@@ -25,10 +25,12 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 /** Sources that participated in switching with cached serializers. */
 class SwitchedSources {
-private final Map sources = new HashMap<>();
+private final SortedMap sources = new TreeMap<>();
 private final Map> cachedSerializers = new HashMap<>();
 
@@ -45,4 +47,8 @@ class SwitchedSources {
 public void put(int sourceIndex, Source source) {
 sources.put(sourceIndex, Preconditions.checkNotNull(source));
 }
+
+public int getFirstSourceIndex() {
+return sources.firstKey();
+}
 }
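Editor's note: why the switch from HashMap to TreeMap matters here: a reader that restores with no splits reports index -1, and the enumerator must then find the lowest registered source index; a SortedMap answers that directly via firstKey(). Standalone demonstration (not Flink code):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class FirstSourceIndexDemo {
        public static void main(String[] args) {
            SortedMap<Integer, String> sources = new TreeMap<>();
            sources.put(1, "kafka");
            sources.put(0, "file");
            // firstKey() returns the smallest key regardless of insertion order.
            System.out.println(sources.firstKey()); // 0: the first source index
        }
    }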
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index 7bcf69c5e72..8b5cb096586 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -182,6 +182,25 @@ public class HybridSourceSplitEnumeratorTest {
 Matchers.iterableWithSize(1));
 }
 
+@Test
+public void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits() throws Exception {
+setupEnumeratorAndTriggerSourceSwitch();
+HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0);
+MockSplitEnumerator underlyingEnumerator = getCurrentEnumerator(enumerator);
+Assert.assertThat(
+(List) Whitebox.getInternalState(underlyingEnumerator, "splits"),
+Matchers.iterableWithSize(0));
+enumerator =
+(HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState);
+enumerator.start();
+// subtask starts at -1 since it has no splits after restore
+enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
+underlyingEnumerator = getCurrentEnumerator(enumerator);
+Assert.assertThat(
+   

[flink] branch master updated (1455e51c7cd -> 6e80d90b161)

2022-08-12 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 1455e51c7cd [FLINK-26771][hive] Fix incomparable exception between boolean type and numeric type in Hive dialect
 add 6e80d90b161 [FLINK-28817] NullPointerException in HybridSource when restoring from checkpoint (#20530)

No new revisions were added by this update.

Summary of changes:
 .../source/hybrid/HybridSourceSplitEnumerator.java  |  6 +-
 .../base/source/hybrid/SwitchedSources.java |  8 +++-
 .../hybrid/HybridSourceSplitEnumeratorTest.java | 21 +
 3 files changed, 33 insertions(+), 2 deletions(-)



[flink] branch release-1.15 updated: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645)

2022-05-13 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new 20768b2f53f [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645)
20768b2f53f is described below

commit 20768b2f53fb8c126d984bc85be2d34fdaa487c1
Author: Haizhou Zhao 
AuthorDate: Tue May 10 16:18:38 2022 -0700

[FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645)

Co-authored-by: Haizhou Zhao 
---
 .../avro/typeutils/GenericRecordAvroTypeInfo.java  | 10 -
 .../avro/typeutils/SerializableAvroSchema.java | 11 +++--
 .../typeutils/AvroGenericRecordTypeInfoTest.java   | 50 ++
 .../typeutils/AvroSerializerGenericRecordTest.java |  7 +--
 ...a => AvroSerializerLargeGenericRecordTest.java} | 11 ++---
 .../flink/formats/avro/utils/AvroTestUtils.java| 30 +
 6 files changed, 102 insertions(+), 17 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
index 387014077d2..28332db5a46 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,10 +105,15 @@ public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
 }
 
 private void writeObject(ObjectOutputStream oos) throws IOException {
-oos.writeUTF(schema.toString());
+byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8);
+oos.writeInt(schemaStrInBytes.length);
+oos.write(schemaStrInBytes);
 }
 
 private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-this.schema = new Schema.Parser().parse(ois.readUTF());
+int len = ois.readInt();
+byte[] content = new byte[len];
+ois.readFully(content);
+this.schema = new Schema.Parser().parse(new String(content, StandardCharsets.UTF_8));
 }
 }
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
index 32714951a61..12458752428 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 
 /** A wrapper for Avro {@link Schema}, that is Java serializable. */
 @Internal
@@ -52,14 +53,18 @@ final class SerializableAvroSchema implements Serializable {
 oos.writeBoolean(false);
 } else {
 oos.writeBoolean(true);
-oos.writeUTF(schema.toString(false));
+byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8);
+oos.writeInt(schemaStrInBytes.length);
+oos.write(schemaStrInBytes);
 }
 }
 
 private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
 if (ois.readBoolean()) {
-String schema = ois.readUTF();
-this.schema = new Parser().parse(schema);
+int len = ois.readInt();
+byte[] content = new byte[len];
+ois.readFully(content);
+this.schema = new Parser().parse(new String(content, StandardCharsets.UTF_8));
 } else {
 this.schema = null;
 }
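Editor's note on the root cause this length-prefixed encoding works around: ObjectOutputStream.writeUTF stores the encoded length in an unsigned 16-bit field, so any schema whose modified-UTF-8 form exceeds 65535 bytes cannot be written at all. A minimal standalone reproduction (Java 11+, not Flink code):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;

    public class WriteUtfLimitDemo {
        public static void main(String[] args) throws Exception {
            String bigSchema = "x".repeat(70_000); // stand-in for a large Avro schema
            try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
                oos.writeUTF(bigSchema); // throws java.io.UTFDataFormatException
            }
        }
    }

Writing an int length followed by the raw UTF-8 bytes, as the patch does, has no such limit.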
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java
new file mode 100644
index 000..dd2b6c98907
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE f

[flink] branch release-1.15 updated: [FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema

2022-05-13 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new 50d280daaa8 [FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema
50d280daaa8 is described below

commit 50d280daaa8a814e2a102dce622e84640875150a
Author: Thomas Weise 
AuthorDate: Mon May 2 21:50:18 2022 -0700

[FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema
---
 .../org/apache/flink/formats/avro/AvroRowDeserializationSchema.java  | 5 +
 1 file changed, 5 insertions(+)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index cbbd956d067..8f72d02499d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -349,6 +349,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 long seconds = micros / MICROS_PER_SECOND - offsetMillis / 1000;
 int nanos =
((int) (micros % MICROS_PER_SECOND)) * 1000 - offsetMillis % 1000 * 1000;
+if (nanos < 0) {
+// can't set negative nanos on timestamp
+seconds--;
+nanos = (int) (MICROS_PER_SECOND * 1000 + nanos);
+}
 Timestamp timestamp = new Timestamp(seconds * 1000L);
 timestamp.setNanos(nanos);
 return timestamp;
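Editor's note on the arithmetic behind the fix: Java's / and % truncate toward zero, so negative microsecond values produce negative nanos, which java.sql.Timestamp#setNanos rejects. Borrowing one second repairs the result. A worked standalone example, ignoring the offset handling in the real code:

    public class NegativeMicrosDemo {
        public static void main(String[] args) {
            long micros = -1_500_000L;                       // 1.5s before the epoch
            long seconds = micros / 1_000_000L;              // -1 (truncated toward zero)
            int nanos = (int) (micros % 1_000_000L) * 1000;  // -500_000_000
            if (nanos < 0) {
                seconds--;                                   // borrow one second, as the fix does
                nanos += 1_000_000_000;                      // now 500_000_000
            }
            // seconds == -2, nanos == 500_000_000: the same instant, now representable
            System.out.println(seconds + "s + " + nanos + "ns");
        }
    }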



[flink] branch release-1.14 updated: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705)

2022-05-11 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
 new d3be0d0154a [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705)
d3be0d0154a is described below

commit d3be0d0154aad2a870448c49ce764a7f49d8bb7e
Author: Haizhou Zhao 
AuthorDate: Wed May 11 17:41:32 2022 -0700

[FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705)

Co-authored-by: Haizhou Zhao 
---
 .../avro/typeutils/GenericRecordAvroTypeInfo.java  | 10 -
 .../avro/typeutils/SerializableAvroSchema.java | 11 +++--
 .../typeutils/AvroGenericRecordTypeInfoTest.java   | 50 ++
 .../typeutils/AvroSerializerGenericRecordTest.java |  7 +--
 ...a => AvroSerializerLargeGenericRecordTest.java} | 11 ++---
 .../flink/formats/avro/utils/AvroTestUtils.java| 30 +
 6 files changed, 102 insertions(+), 17 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
index 387014077d2..28332db5a46 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,10 +105,15 @@ public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
 }
 
 private void writeObject(ObjectOutputStream oos) throws IOException {
-oos.writeUTF(schema.toString());
+byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8);
+oos.writeInt(schemaStrInBytes.length);
+oos.write(schemaStrInBytes);
 }
 
 private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-this.schema = new Schema.Parser().parse(ois.readUTF());
+int len = ois.readInt();
+byte[] content = new byte[len];
+ois.readFully(content);
+this.schema = new Schema.Parser().parse(new String(content, StandardCharsets.UTF_8));
 }
 }
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
index 32714951a61..12458752428 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 
 /** A wrapper for Avro {@link Schema}, that is Java serializable. */
 @Internal
@@ -52,14 +53,18 @@ final class SerializableAvroSchema implements Serializable {
 oos.writeBoolean(false);
 } else {
 oos.writeBoolean(true);
-oos.writeUTF(schema.toString(false));
+byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8);
+oos.writeInt(schemaStrInBytes.length);
+oos.write(schemaStrInBytes);
 }
 }
 
 private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
 if (ois.readBoolean()) {
-String schema = ois.readUTF();
-this.schema = new Parser().parse(schema);
+int len = ois.readInt();
+byte[] content = new byte[len];
+ois.readFully(content);
+this.schema = new Parser().parse(new String(content, StandardCharsets.UTF_8));
 } else {
 this.schema = null;
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java
new file mode 100644
index 000..b91053f60c8
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE f

[flink] branch master updated (1dcad2272e9 -> 610164d57dd)

2022-05-10 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 1dcad2272e9 [hotfix][docs-zh] Fix two inaccessible links. (#19678)
 add 610164d57dd [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645)

No new revisions were added by this update.

Summary of changes:
 .../avro/typeutils/GenericRecordAvroTypeInfo.java  | 10 ++--
 .../avro/typeutils/SerializableAvroSchema.java | 11 +---
 ...est.java => AvroGenericRecordTypeInfoTest.java} | 21 ---
 .../typeutils/AvroSerializerGenericRecordTest.java |  7 ++---
 ...a => AvroSerializerLargeGenericRecordTest.java} | 11 +++-
 .../flink/formats/avro/utils/AvroTestUtils.java| 30 ++
 6 files changed, 64 insertions(+), 26 deletions(-)
 copy flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/{AvroTypeInfoTest.java => AvroGenericRecordTypeInfoTest.java} (63%)
 copy flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/{AvroSerializerGenericRecordTest.java => AvroSerializerLargeGenericRecordTest.java} (76%)



[flink] branch master updated (a112465efe0 -> 4c8f323d9ec)

2022-05-04 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from a112465efe0 [FLINK-27442][Formats][Avro Confluent] Add Confluent repo to module flink-sql-avro-confluent-registry
 add 4c8f323d9ec [FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/formats/avro/AvroRowDeserializationSchema.java  | 5 +
 1 file changed, 5 insertions(+)



[flink] branch master updated (2e632af6a75 -> c65899f7790)

2022-04-25 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 2e632af6a75 [FLINK-27369][table-planner] Fix the type mismatch error in RemoveUnreachableCoalesceArgumentsRule
 add c65899f7790 [FLINK-27381][connector/common] Use Arrays.hashcode for HybridSourceSplit wrappedSplitBytes

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/connector/base/source/hybrid/HybridSourceSplit.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
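Editor's note on this one-line change: Java arrays inherit identity-based hashCode from Object, so two splits wrapping byte-equal arrays would hash differently; Arrays.hashCode hashes the contents. Standalone illustration:

    import java.util.Arrays;

    public class ArrayHashDemo {
        public static void main(String[] args) {
            byte[] a = {1, 2, 3};
            byte[] b = {1, 2, 3};
            System.out.println(a.hashCode() == b.hashCode());             // false in practice (identity hash)
            System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true (content hash)
        }
    }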



[flink-web] branch asf-site updated: [FLINK-26781] Link Flink Kubernetes operator doc site to Flink Website

2022-03-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 9b9c20f  [FLINK-26781] Link Flink Kubernetes operator doc site to Flink Website
9b9c20f is described below

commit 9b9c20fadfb8a5508be3889e1b1ce54210768bc0
Author: czy006 <944989...@qq.com>
AuthorDate: Tue Mar 22 21:31:30 2022 +0800

[FLINK-26781] Link Flink Kubernetes operator doc site to Flink Website
---
 _config.yml   | 11 +++
 _includes/navbar.html |  3 +++
 2 files changed, 14 insertions(+)

diff --git a/_config.yml b/_config.yml
index 7ba6b8b..ca0c310 100644
--- a/_config.yml
+++ b/_config.yml
@@ -29,6 +29,13 @@ FLINK_ML_STABLE_SHORT: "2.0"
 FLINK_ML_GITHUB_URL: https://github.com/apache/flink-ml
 FLINK_ML_GITHUB_REPO_NAME: flink-ml
 
+#TODO If kubernetes operator stable version complete,replace it to stable version
+FLINK_KUBERNETES_OPERATOR_VERSION_STABLE: 1.0
+FLINK_KUBERNETES_OPERATOR_STABLE_SHORT: "1.0"
+
+FLINK_KUBERNETES_OPERATOR_URL: https://github.com/apache/flink-kubernetes-operator
+FLINK_KUBERNETES_OPERATOR_GITHUB_REPO_NAME: flink-kubernetes-operator
+
 # Example for scala dependent component:
 #  -
 #name: "Prometheus MetricReporter"
@@ -616,6 +623,10 @@ docs-statefun-snapshot: "https://nightlies.apache.org/flink/flink-statefun-docs-
 docs-ml-stable: "https://nightlies.apache.org/flink/flink-ml-docs-release-2.0"
 docs-ml-snapshot: "https://nightlies.apache.org/flink/flink-ml-docs-master"
 
+#TODO If kubernetes operator stable version complete,replace it to stable url
+docs-kubernetes-operator-stable: "https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main"
+docs-kubernetes-operator-snapshot: "https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main"
+
 # Used by the gh_link plugin
 jira: "https://issues.apache.org/jira/browse/FLINK"
 stackoverflow: "https://stackoverflow.com/search?q=flink"
diff --git a/_includes/navbar.html b/_includes/navbar.html
index f00f19e..e991d69 100755
--- a/_includes/navbar.html
+++ b/_includes/navbar.html
@@ -76,6 +76,7 @@
 With Flink 
 With Flink Stateful 
Functions 
 With Flink ML 
+With Flink 
Kubernetes Operator 
 {{ 
site.data.i18n[page.language].training_course }}
   
 
@@ -90,6 +91,8 @@
 Flink Stateful Functions Master (Latest Snapshot) 
 Flink 
ML {{site.FLINK_ML_STABLE_SHORT}} (Latest stable release) 
 Flink ML Master (Latest Snapshot) 
+Flink Kubernetes Operator 
{{site.FLINK_KUBERNETES_OPERATOR_STABLE_SHORT}} (Latest stable release) 

+Flink Kubernetes Operator Master (Latest Snapshot) 
   
 
 


[flink-kubernetes-operator] branch main updated: [FLINK-26473] Check for deployment errors when listJobs fails

2022-03-16 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 4b7ffaa  [FLINK-26473] Check for deployment errors when listJobs fails
4b7ffaa is described below

commit 4b7ffaa47082a5baaa4150d6315a9307087e64c6
Author: Thomas Weise 
AuthorDate: Sun Mar 13 21:41:11 2022 -0700

[FLINK-26473] Check for deployment errors when listJobs fails
---
 .../exception/DeploymentFailedException.java   | 31 +++--
 .../kubernetes/operator/observer/BaseObserver.java | 37 --
 .../kubernetes/operator/observer/JobObserver.java  | 15 +++-
 .../kubernetes/operator/service/FlinkService.java  | 12 
 .../kubernetes/operator/utils/FlinkUtils.java  | 16 +++--
 .../flink/kubernetes/operator/TestUtils.java   | 37 +++---
 .../kubernetes/operator/TestingFlinkService.java   | 12 
 .../controller/FlinkDeploymentControllerTest.java  | 79 ++
 8 files changed, 210 insertions(+), 29 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
index b0ea7f3..572f065 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.exception;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
+import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
 import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.api.model.EventBuilder;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
@@ -26,15 +27,33 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 /** Exception to signal terminal deployment failure. */
 public class DeploymentFailedException extends RuntimeException {
 public static final String COMPONENT_JOBMANAGER = "JobManagerDeployment";
+public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
+
 private static final long serialVersionUID = -1070179896083579221L;
 
 public final String component;
-public final DeploymentCondition deployCondition;
+public final String type;
+public final String reason;
+public final String lastTransitionTime;
+public final String lastUpdateTime;
 
 public DeploymentFailedException(String component, DeploymentCondition deployCondition) {
 super(deployCondition.getMessage());
 this.component = component;
-this.deployCondition = deployCondition;
+this.type = deployCondition.getType();
+this.reason = deployCondition.getReason();
+this.lastTransitionTime = deployCondition.getLastTransitionTime();
+this.lastUpdateTime = deployCondition.getLastUpdateTime();
+}
+
+public DeploymentFailedException(
+String component, String type, ContainerStateWaiting stateWaiting) {
+super(stateWaiting.getMessage());
+this.component = component;
+this.type = type;
+this.reason = stateWaiting.getReason();
+this.lastTransitionTime = null;
+this.lastUpdateTime = null;
 }
 
 public static Event asEvent(DeploymentFailedException dfe, FlinkDeployment flinkApp) {
@@ -47,10 +66,10 @@ public class DeploymentFailedException extends RuntimeException {
 .withNamespace(flinkApp.getMetadata().getNamespace())
 .withUid(flinkApp.getMetadata().getUid())
 .endInvolvedObject()
-.withType(dfe.deployCondition.getType())
-.withReason(dfe.deployCondition.getReason())
-.withFirstTimestamp(dfe.deployCondition.getLastTransitionTime())
-.withLastTimestamp(dfe.deployCondition.getLastUpdateTime())
+.withType(dfe.type)
+.withReason(dfe.reason)
+.withFirstTimestamp(dfe.lastTransitionTime)
+.withLastTimestamp(dfe.lastUpdateTime)
 .withMessage(dfe.getMessage())
 .withNewMetadata()
 .withGenerateName(flinkApp.getMetadata().getName())
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index 180cb6f..a694f6d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java

[flink-kubernetes-operator] branch main updated: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

2022-03-02 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 4bac805  [FLINK-26432] Cleanly separate validator, observer and reconciler modules
4bac805 is described below

commit 4bac8059ec5d3c68da19beec22f29c434562516e
Author: Gyula Fora 
AuthorDate: Mon Feb 28 06:17:11 2022 +0100

[FLINK-26432] Cleanly separate validator, observer and reconciler modules

closes #26
---
 .../flink/kubernetes/operator/FlinkOperator.java   |   3 +
 .../controller/FlinkDeploymentController.java  |  20 ++-
 .../operator/crd/status/FlinkDeploymentStatus.java |   6 +-
 .../JobManagerDeploymentStatus.java|   4 +-
 .../operator/observer/JobStatusObserver.java   | 115 ---
 .../kubernetes/operator/observer/Observer.java | 161 +
 .../operator/reconciler/BaseReconciler.java|  74 +-
 .../operator/reconciler/JobReconciler.java |  27 +---
 .../operator/reconciler/SessionReconciler.java |   7 -
 .../flink/kubernetes/operator/TestUtils.java   |  17 +++
 .../kubernetes/operator/TestingFlinkService.java   |   7 +-
 .../controller/FlinkDeploymentControllerTest.java  |  47 +++---
 .../operator/observer/JobStatusObserverTest.java   |  77 --
 .../kubernetes/operator/observer/ObserverTest.java | 135 +
 .../operator/reconciler/JobReconcilerTest.java |   4 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |   7 +
 16 files changed, 394 insertions(+), 317 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8fa72b9..fd9d60c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,6 +21,7 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -55,6 +56,7 @@ public class FlinkOperator {
 
 FlinkService flinkService = new FlinkService(client);
 
+Observer observer = new Observer(flinkService);
 JobReconciler jobReconciler = new JobReconciler(client, flinkService);
 SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
 
@@ -66,6 +68,7 @@ public class FlinkOperator {
 client,
 namespace,
 validator,
+observer,
 jobReconciler,
 sessionReconciler);
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 5dff8f4..b9d5cd4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -64,6 +65,7 @@ public class FlinkDeploymentController
 private final String operatorNamespace;
 
 private final FlinkDeploymentValidator validator;
+private final Observer observer;
 private final JobReconciler jobReconciler;
 private final SessionReconciler sessionReconciler;
 private final DefaultConfig defaultConfig;
@@ -73,12 +75,14 @@ public class FlinkDeploymentController
 KubernetesClient kubernetesClient,
 String operatorNamespace

[flink-kubernetes-operator] 02/02: [FLINK-26261] Refactor to simplify reconciliation logic

2022-02-27 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 684b4597764daefe53fb1399298324bec5bc738e
Author: Thomas Weise 
AuthorDate: Sat Feb 26 14:55:43 2022 -0800

[FLINK-26261] Refactor to simplify reconciliation logic
---
 .../flink/kubernetes/operator/FlinkOperator.java   |   3 -
 .../controller/FlinkDeploymentController.java  |  95 ---
 .../operator/observer/JobStatusObserver.java   |   4 -
 .../operator/reconciler/BaseReconciler.java| 103 +
 .../operator/reconciler/JobReconciler.java |  36 ++-
 .../operator/reconciler/SessionReconciler.java |  25 -
 .../kubernetes/operator/service/FlinkService.java  |   2 +-
 .../kubernetes/operator/TestingFlinkService.java   |   2 +-
 .../controller/FlinkDeploymentControllerTest.java  |  38 ++--
 .../operator/reconciler/JobReconcilerTest.java |  45 +++--
 10 files changed, 218 insertions(+), 135 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 7e8edf5..8fa72b9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,7 +21,6 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -56,7 +55,6 @@ public class FlinkOperator {
 
 FlinkService flinkService = new FlinkService(client);
 
-JobStatusObserver observer = new JobStatusObserver(flinkService);
 JobReconciler jobReconciler = new JobReconciler(client, flinkService);
 SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
 
@@ -68,7 +66,6 @@ public class FlinkOperator {
 client,
 namespace,
 validator,
-observer,
 jobReconciler,
 sessionReconciler);
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index a6c6072..2e79926 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,16 +23,15 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -50,10 +49,8 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 /** Controller that runs the main reconcile loop for Flink deployments. */
 @ControllerConfiguration(generationAwareEventProcessing = false)
@@ -63,33

[flink-kubernetes-operator] 01/02: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API

2022-02-27 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit becf19c82616e8f3a148c2ce27141b742f019ab3
Author: Thomas Weise 
AuthorDate: Thu Feb 24 14:26:30 2022 -0800

[FLINK-26261] Wait for JobManager deployment ready before accessing REST API
---
 .../controller/FlinkDeploymentController.java  | 76 +++---
 .../operator/observer/JobStatusObserver.java   |  7 ++
 .../kubernetes/operator/service/FlinkService.java  | 25 +++
 .../kubernetes/operator/TestingFlinkService.java   |  5 ++
 .../controller/FlinkDeploymentControllerTest.java  | 58 +
 5 files changed, 151 insertions(+), 20 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index f97d5fd..a6c6072 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -30,7 +30,11 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -41,10 +45,12 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -58,6 +64,7 @@ public class FlinkDeploymentController
 private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
 
 public static final int REFRESH_SECONDS = 60;
+public static final int PORT_READY_DELAY_SECONDS = 10;
 
 private final KubernetesClient kubernetesClient;
 
@@ -68,6 +75,7 @@ public class FlinkDeploymentController
 private final JobReconciler jobReconciler;
 private final SessionReconciler sessionReconciler;
 private final DefaultConfig defaultConfig;
+private final HashSet jobManagerDeployments = new HashSet<>();
 
 public FlinkDeploymentController(
 DefaultConfig defaultConfig,
@@ -96,6 +104,7 @@ public class FlinkDeploymentController
 operatorNamespace,
 kubernetesClient,
 true);
+jobManagerDeployments.remove(flinkApp.getMetadata().getSelfLink());
 return DeleteControl.defaultDelete();
 }
 
@@ -113,8 +122,11 @@ public class FlinkDeploymentController
 Configuration effectiveConfig =
 FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
 try {
-boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-if (successfulObserve) {
+// only check job status when the JM deployment is ready
+boolean shouldReconcile =
+!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())
+|| observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
+if (shouldReconcile) {
 reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
 updateForReconciliationSuccess(flinkApp);
 }
@@ -125,6 +137,49 @@ public class FlinkDeploymentController
 } catch (Exception e) {
 throw new ReconciliationException(e);
 }
+
+return checkJobManagerDeployment(flinkApp, context, effectiveConfig);
+}
+
+private UpdateControl<FlinkDeployment> checkJobManagerDeployment(
+FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSe

[flink-kubernetes-operator] branch main updated (e2ac688 -> 684b459)

2022-02-27 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git.


from e2ac688  [hotfix] copy base config in config builder
 new becf19c  [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
 new 684b459  [FLINK-26261] Refactor to simplify reconciliation logic

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/kubernetes/operator/FlinkOperator.java   |   3 -
 .../controller/FlinkDeploymentController.java  |  57 ++--
 .../operator/observer/JobStatusObserver.java   |   3 +
 .../operator/reconciler/BaseReconciler.java| 103 +
 .../operator/reconciler/JobReconciler.java |  36 ++-
 .../operator/reconciler/SessionReconciler.java |  25 -
 .../kubernetes/operator/service/FlinkService.java  |  25 +
 .../kubernetes/operator/TestingFlinkService.java   |   5 +
 .../controller/FlinkDeploymentControllerTest.java  |  42 +
 .../operator/reconciler/JobReconcilerTest.java |  45 +++--
 10 files changed, 279 insertions(+), 65 deletions(-)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java


[flink-kubernetes-operator] branch main updated: [hotfix] Fix observer sort logic for job status merge

2022-02-23 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 3e6a00f  [hotfix] Fix observer sort logic for job status merge
3e6a00f is described below

commit 3e6a00f0eb528ad85b91a99424258925ea9304d5
Author: zeus1ammon <45544051+zeus1am...@users.noreply.github.com>
AuthorDate: Wed Feb 23 11:16:55 2022 -0500

[hotfix] Fix observer sort logic for job status merge
---
 .../apache/flink/kubernetes/operator/observer/JobStatusObserver.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index c0dabcb..0740dea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -89,7 +89,7 @@ public class JobStatusObserver {
 JobStatus newStatus = oldStatus;
 Collections.sort(
 clusterJobStatuses,
-(j1, j2) -> -1 * Long.compare(j1.getStartTime(), j1.getStartTime()));
+(j1, j2) -> -1 * Long.compare(j1.getStartTime(), j2.getStartTime()));
 JobStatusMessage newJob = clusterJobStatuses.get(0);
 
 if (newStatus == null) {
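Editor's note: the bug fixed above compared j1's start time with itself, so the "newest first" sort was a no-op and an older job could be picked as current. A comparator built from a key extractor avoids this copy-paste trap; a standalone sketch with an illustrative record standing in for Flink's JobStatusMessage:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class NewestFirstDemo {
        record Job(String name, long startTime) {}

        public static void main(String[] args) {
            List<Job> jobs = new ArrayList<>(List.of(new Job("old", 100L), new Job("new", 200L)));
            // Descending by start time; equivalent to the fixed lambda above.
            jobs.sort(Comparator.comparingLong(Job::startTime).reversed());
            System.out.println(jobs.get(0).name()); // "new"
        }
    }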


[flink-kubernetes-operator] branch main updated: [FLINK-26260] Support watching specific namespaces only

2022-02-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new d8f19bc  [FLINK-26260] Support watching specific namespaces only
d8f19bc is described below

commit d8f19bcea6d493d4cb5a30448c9f2865e6fef56b
Author: Gyula Fora 
AuthorDate: Mon Feb 21 11:09:02 2022 +0100

[FLINK-26260] Support watching specific namespaces only
---
 README.md  |  6 +++
 .../flink/kubernetes/operator/FlinkOperator.java   | 18 
 .../operator/controller/FlinkControllerConfig.java | 51 ++
 helm/flink-operator/templates/flink-operator.yaml  |  2 +
 helm/flink-operator/templates/webhook.yaml | 10 -
 helm/flink-operator/values.yaml|  3 ++
 6 files changed, 81 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 26dd8fc..aecad31 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,8 @@ The operator is managed helm chart. To install run:
  helm install flink-operator .
 ```
 
+### Validating webhook
+
 In order to use the webhook for FlinkDeployment validation, you must install the cert-manager on the Kubernetes cluster:
 ```
 kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
@@ -17,6 +19,10 @@ kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.7
 
 The webhook can be disabled during helm install by passing the `--set webhook.create=false` parameter or editing the `values.yaml` directly.
 
+### Watching only specific namespaces
+
+The operator supports watching a specific list of namespaces for FlinkDeployment resources. You can enable it by setting the `--set watchNamespaces={flink-test}` parameter.
+
 ## User Guide
 ### Create a new Flink deployment
 The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR is applied.
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8cfdef1..9484bea 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator;
 
+import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
@@ -27,7 +28,6 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
-import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory;
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
 private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
-private static final String ENV_FLINK_OPERATOR_CONF_DIR = "FLINK_OPERATOR_CONF_DIR";
+
+public static final String ENV_FLINK_OPERATOR_CONF_DIR = "FLINK_OPERATOR_CONF_DIR";
 
 public static void main(String... args) {
 
@@ -48,11 +49,9 @@ public class FlinkOperator {
 if (namespace == null) {
 namespace = "default";
 }
-Operator operator =
-new Operator(
-client,
-new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
-.build());
+
+DefaultConfigurationService configurationService = DefaultConfigurationService.instance();
+Operator operator = new Operator(client, configurationService);
 
 FlinkService flinkService = new FlinkService(client);
 
@@ -64,7 +63,10 @@ public class FlinkOperator {
 new FlinkDeploymentController(
 client, namespace, observer, jobReconciler, sessionReconciler);
 
-operator.register(controller);
+FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+controllerConfig.setConfigurationService(configurationService);
+
+operator.register(controller, controllerConfig);
 operator.installShutdownHook();
 operator.start();
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/

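For context on the feature: the README change above documents a `watchNamespaces` helm value, which the new `FlinkControllerConfig` presumably turns into the set of namespaces the controller registers for, with an empty value keeping the default of watching all namespaces. A minimal sketch of parsing such a value, assuming a comma-separated form like `flink-test,flink-prod`; the class and method below are illustrative, not the actual `FlinkControllerConfig` implementation:

```java
import java.util.LinkedHashSet;
import java.util.Set;

public final class WatchedNamespaces {

    /** Parses e.g. "flink-test,flink-prod" into an ordered set; an empty
     *  result is interpreted as "watch all namespaces". */
    public static Set<String> parse(String raw) {
        Set<String> namespaces = new LinkedHashSet<>();
        if (raw == null) {
            return namespaces;
        }
        for (String part : raw.split(",")) {
            String ns = part.trim();
            if (!ns.isEmpty()) {
                namespaces.add(ns);
            }
        }
        return namespaces;
    }
}
```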
[flink-kubernetes-operator] branch main updated (b5d71f1 -> 0737641)

2022-02-12 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git.


 discard b5d71f1  Seed commit to be removed
 new 0737641  Seed commit

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (b5d71f1)
\
 N -- N -- N   refs/heads/main (0737641)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[flink-kubernetes-operator] 01/01: Seed commit

2022-02-12 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 07376419b83770c6f946d419263b376c7797c673
Author: Thomas Weise 
AuthorDate: Thu Feb 10 19:12:35 2022 -0800

Seed commit
---
 README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 000..4c9483b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,2 @@
+# flink-kubernetes-operator
+Apache Flink Kubernetes Operator


[flink-kubernetes-operator] branch main created (now b5d71f1)

2022-02-10 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git.


  at b5d71f1  Seed commit to be removed

This branch includes the following new commits:

 new b5d71f1  Seed commit to be removed

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink-kubernetes-operator] 01/01: Seed commit to be removed

2022-02-10 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit b5d71f10c88bebc82519656f2be63631535737d8
Author: Thomas Weise 
AuthorDate: Thu Feb 10 19:12:35 2022 -0800

Seed commit to be removed
---
 README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 000..4c9483b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,2 @@
+# flink-kubernetes-operator
+Apache Flink Kubernetes Operator


svn commit: r52332 - /release/flink/flink-1.14.2/

2022-01-30 Thread thw
Author: thw
Date: Mon Jan 31 04:50:36 2022
New Revision: 52332

Log:
remove flink-1.14.2

Removed:
release/flink/flink-1.14.2/



[flink-web] branch asf-site updated: Rebuild website

2022-01-19 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new e1657ab  Rebuild website
e1657ab is described below

commit e1657ab3ded5fcc17d0d4b69974839c95e15d6e8
Author: Thomas Weise 
AuthorDate: Wed Jan 19 17:32:53 2022 -0800

Rebuild website
---
 content/blog/feed.xml   | 469 +++
 content/blog/index.html |  38 +-
 content/blog/page10/index.html  |  38 +-
 content/blog/page11/index.html  |  40 +-
 content/blog/page12/index.html  |  38 +-
 content/blog/page13/index.html  |  36 +-
 content/blog/page14/index.html  |  37 +-
 content/blog/page15/index.html  |  38 +-
 content/blog/page16/index.html  |  44 ++-
 content/blog/page17/index.html  |  45 ++-
 content/blog/page18/index.html  |  25 ++
 content/blog/page2/index.html   |  40 +-
 content/blog/page3/index.html   |  40 +-
 content/blog/page4/index.html   |  38 +-
 content/blog/page5/index.html   |  36 +-
 content/blog/page6/index.html   |  38 +-
 content/blog/page7/index.html   |  40 +-
 content/blog/page8/index.html   |  38 +-
 content/blog/page9/index.html   |  36 +-
 content/downloads.html  |  31 +-
 content/index.html  |   8 +-
 content/news/2022/01/17/release-1.14.3.html | 566 
 content/q/gradle-quickstart.sh  |   2 +-
 content/q/quickstart-scala.sh   |   2 +-
 content/q/quickstart.sh |   2 +-
 content/q/sbt-quickstart.sh |   2 +-
 content/zh/downloads.html   |  35 +-
 content/zh/index.html   |   8 +-
 28 files changed, 1377 insertions(+), 433 deletions(-)

diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index 024dca6..24fd2a2 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,324 @@
 <atom:link href="https://flink.apache.org/blog/feed.xml" rel="self" type="application/rss+xml" />
 
 
+<title>Apache Flink 1.14.3 Release Announcement</title>
+<p>The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series.
+The first bugfix release was 1.14.2, being an emergency release due to an Apache Log4j Zero Day (CVE-2021-44228). Flink 1.14.1 was abandoned.
+That means that this Flink release is the first bugfix release of the Flink 1.14 series which contains bugfixes not related to the mentioned CVE.</p>
+
+<p>This release includes 164 fixes and minor improvements for Flink 1.14.0. The list below includes bugfixes and improvements. For a complete list of all changes see:
+<a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351075&projectId=12315522">JIRA</a>.</p>
+
+<p>We highly recommend all users to upgrade to Flink 1.14.3.</p>
+
+<p>Updated Maven dependencies:</p>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.14.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_2.11</artifactId>
+  <version>1.14.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_2.11</artifactId>
+  <version>1.14.3</version>
+</dependency>
+
+<p>You can find the binaries on the updated <a href="/downloads.html">Downloads page</a>.</p>
+
+Release Notes - Flink - Version 1.14.3
+
+<h2>Sub-task</h2>
+<ul>
+<li>[<a href="https://issues.apache.org/jira/browse/FLINK-24355">FLINK-24355</a>] - Expose the flag for enabling checkpoints after tasks finish in the Web UI</li>
+</ul>
+
+<h2>Bug</h2>
+<ul>
+<li>[<a href="https://issues.apache.org/jira/browse/FLINK-15987">FLINK-15987</a>] - SELECT 1.0e0 / 0.0e0 throws NumberFormatException</li>
+<li>[<a href="https://issues.apache.org/jira/browse/FLINK-17914">FLINK-17914</a>] - HistoryServer deletes cached archives if archive listing fails</li>
+<li>[<a href="https://issues.apache.org/jira/browse/FLINK-19142">FLINK-19142</a>] - Local

[flink-web] branch asf-site updated: fixup: Add Flink release 1.14.3

2022-01-19 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 309ccfe  fixup: Add Flink release 1.14.3
309ccfe is described below

commit 309ccfed5a6ee832cfd62a0a5e7750f54137c2b9
Author: Thomas Weise 
AuthorDate: Wed Jan 19 17:29:51 2022 -0800

fixup: Add Flink release 1.14.3
---
 _posts/2022-01-17-release-1.14.3.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/_posts/2022-01-17-release-1.14.3.md 
b/_posts/2022-01-17-release-1.14.3.md
index 09bcb76..89bfa5e 100644
--- a/_posts/2022-01-17-release-1.14.3.md
+++ b/_posts/2022-01-17-release-1.14.3.md
@@ -6,11 +6,12 @@ categories: news
 authors:
 - tweise:
   name: "Thomas Weise"
+  twitter: "thweise"
 - MartijnVisser:
   name: "Martijn Visser"
   twitter: "martijnvisser82"
 
-The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series.
+excerpt: The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series.
 
 ---
 
@@ -328,4 +329,4 @@ You can find the binaries on the updated [Downloads page]({{ site.baseurl }}/dow
 [FLINK-25472] - Update to Log4j 2.17.1
 
 
-
\ No newline at end of file
+


[flink] branch release-1.14 updated: Update japicmp configuration for 1.14.3

2022-01-19 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
 new bd1cd34  Update japicmp configuration for 1.14.3
bd1cd34 is described below

commit bd1cd343a97e9bb9cff53ffe3baae37fde1034c5
Author: Thomas Weise 
AuthorDate: Wed Jan 19 16:31:04 2022 -0800

Update japicmp configuration for 1.14.3
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 9b556cb..6bbfd62 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,7 +155,7 @@ under the License.
For Hadoop 2.7, the minor Hadoop version supported for flink-shaded-hadoop-2-uber is 2.7.5
-->

2.7.5
-   <japicmp.referenceVersion>1.14.0</japicmp.referenceVersion>
+   <japicmp.referenceVersion>1.14.3</japicmp.referenceVersion>
tools/japicmp-output
2.4.2
 


[flink-web] branch asf-site updated: Add Flink release 1.14.3

2022-01-19 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new c9d07f1  Add Flink release 1.14.3
c9d07f1 is described below

commit c9d07f112643f7670f65550818e0785e07c52024
Author: martijnvisser 
AuthorDate: Wed Jan 12 19:44:49 2022 +0100

Add Flink release 1.14.3
---
 _config.yml |  28 +--
 _posts/2022-01-17-release-1.14.3.md | 331 
 q/gradle-quickstart.sh  |   2 +-
 q/quickstart-scala.sh   |   2 +-
 q/quickstart.sh |   2 +-
 q/sbt-quickstart.sh |   2 +-
 6 files changed, 351 insertions(+), 16 deletions(-)

diff --git a/_config.yml b/_config.yml
index 96224b6..41464de 100644
--- a/_config.yml
+++ b/_config.yml
@@ -9,7 +9,7 @@ url: https://flink.apache.org
 
 DOCS_BASE_URL: https://nightlies.apache.org/flink/
 
-FLINK_VERSION_STABLE: 1.14.2
+FLINK_VERSION_STABLE: 1.14.3
 FLINK_VERSION_STABLE_SHORT: "1.14"
 
 FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
@@ -64,23 +64,23 @@ flink_releases:
   -
 version_short: "1.14"
 binary_release:
-  name: "Apache Flink 1.14.2"
+  name: "Apache Flink 1.14.3"
   scala_211:
 id: "1142-download_211"
-url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz"
-asc_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.asc"
-sha512_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.sha512"
+url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz"
+asc_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.asc"
+sha512_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.sha512"
   scala_212:
 id: "1142-download_212"
-url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz"
-asc_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz.asc"
-sha512_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz.sha512"
+url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz"
+asc_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz.asc"
+sha512_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz.sha512"
 source_release:
-  name: "Apache Flink 1.14.2"
+  name: "Apache Flink 1.14.3"
   id: "1142-download-source"
-  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.2/flink-1.14.2-src.tgz"
-  asc_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-src.tgz.asc"
-  sha512_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-src.tgz.sha512"
+  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-src.tgz"
+  asc_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-src.tgz.asc"
+  sha512_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-src.tgz.sha512"
 release_notes_url: "https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14"
   -
 version_short: "1.13"
@@ -205,6 +205,10 @@ release_archive:
 flink:
   -
 version_short: "1.14"
+version_long: 1.14.3
+release_date: 2022-01-17
+  -
+version_short: "1.14"
 version_long: 1.14.2
 release_date: 2021-12-16
   -
diff --git a/_posts/2022-01-17-release-1.14.3.md 
b/_posts/2022-01-17-release-1.14.3.md
new file mode 100644
index 000..09bcb76
--- /dev/null
+++ b/_posts/2022-01-17-release-1.14.3.md
@@ -0,0 +1,331 @@
+---
+layout: post
+title:  "Apache Flink 1.14.3 Release Announcement"
+date: 2022-01-17T08:00:00.000Z
+categories: news
+authors:
+- tweise:
+  name: "Thomas Weise"
+- MartijnVisser:
+  name: "Martijn Visser"
+  twitter: "martijnvisser82"
+
+The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series.
+
+---
+
+The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series.
+The first bugfix release was 1.14.2, being an emergency release due to an Apache Log4j Zero Day (CVE-2021-44228). Flink 1.14.1 was abandoned.
+That means that this Flink release is the first bugfix release of the Flink 1.14 series which

[flink-docker] branch master updated: Add 1.14.3 release

2022-01-18 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 312e9af  Add 1.14.3 release
312e9af is described below

commit 312e9af1313817d78e18c9d5b6dbdff3c4e408d5
Author: Thomas Weise 
AuthorDate: Mon Jan 17 13:26:17 2022 -0800

Add 1.14.3 release
---
 1.14/scala_2.11-java11-debian/Dockerfile   | 6 +++---
 1.14/scala_2.11-java11-debian/release.metadata | 2 +-
 1.14/scala_2.11-java8-debian/Dockerfile| 6 +++---
 1.14/scala_2.11-java8-debian/release.metadata  | 2 +-
 1.14/scala_2.12-java11-debian/Dockerfile   | 6 +++---
 1.14/scala_2.12-java11-debian/release.metadata | 2 +-
 1.14/scala_2.12-java8-debian/Dockerfile| 6 +++---
 1.14/scala_2.12-java8-debian/release.metadata  | 2 +-
 8 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/1.14/scala_2.11-java11-debian/Dockerfile 
b/1.14/scala_2.11-java11-debian/Dockerfile
index 9c57ecc..b5228a2 100644
--- a/1.14/scala_2.11-java11-debian/Dockerfile
+++ b/1.14/scala_2.11-java11-debian/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz \
-    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.asc \
-    GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \
+ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz \
+    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.asc \
+    GPG_KEY=10409A66C7C2F297C8581C2A12DEE3E4D920A98C \
     CHECK_GPG=true
 
 # Prepare environment
diff --git a/1.14/scala_2.11-java11-debian/release.metadata 
b/1.14/scala_2.11-java11-debian/release.metadata
index cda57d1..84a6d6d 100644
--- a/1.14/scala_2.11-java11-debian/release.metadata
+++ b/1.14/scala_2.11-java11-debian/release.metadata
@@ -1,2 +1,2 @@
-Tags: 1.14.2-scala_2.11-java11, 1.14-scala_2.11-java11, scala_2.11-java11
+Tags: 1.14.3-scala_2.11-java11, 1.14-scala_2.11-java11, scala_2.11-java11
 Architectures: amd64
diff --git a/1.14/scala_2.11-java8-debian/Dockerfile 
b/1.14/scala_2.11-java8-debian/Dockerfile
index 76af456..6aff4df 100644
--- a/1.14/scala_2.11-java8-debian/Dockerfile
+++ b/1.14/scala_2.11-java8-debian/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz \
-    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.asc \
-    GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \
+ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz \
+    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.asc \
+    GPG_KEY=10409A66C7C2F297C8581C2A12DEE3E4D920A98C \
     CHECK_GPG=true
 
 # Prepare environment
diff --git a/1.14/scala_2.11-java8-debian/release.metadata 
b/1.14/scala_2.11-java8-debian/release.metadata
index c20e599..f405f57 100644
--- a/1.14/scala_2.11-java8-debian/release.metadata
+++ b/1.14/scala_2.11-java8-debian/release.metadata
@@ -1,2 +1,2 @@
-Tags: 1.14.2-scala_2.11-java8, 1.14-scala_2.11-java8, scala_2.11-java8, 
1.14.2-scala_2.11, 1.14-scala_2.11, scala_2.11
+Tags: 1.14.3-scala_2.11-java8, 1.14-scala_2.11-java8, scala_2.11-java8, 
1.14.3-scala_2.11, 1.14-scala_2.11, scala_2.11
 Architectures: amd64
diff --git a/1.14/scala_2.12-java11-debian/Dockerfile 
b/1.14/scala_2.12-java11-debian/Dockerfile
index 0b22c21..1bb36d3 100644
--- a/1.14/scala_2.12-java11-debian/Dockerfile
+++ b/1.14/scala_2.12-java11-debian/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz \
-    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz.asc \
-    GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \
+ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz \
+    FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz.asc \
+    GPG_KEY=10409A66C7C2F297C8581C2A12DEE3E4D920A98C \
     CHECK_GPG=true
 
 # Prepare environment
diff --git a/1.14/scala_2.12-java11-debian/release.metadata 
b/1.14/scala_2.12-java11-debian/release.metadata
index 6bed731..23f2f57 100644
--- a/1.14/scala_2.12-java11-debian/release.metadata
+++ b/1.14/scala_2.12-java11-debian/release.metadata
@@ -1,2 +1,2 @@
-Tags: 1.14.2-scala_2.12-java11, 1.14

[flink-docker] branch dev-1.14 updated (dd51e81 -> a676953)

2022-01-17 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch dev-1.14
in repository https://gitbox.apache.org/repos/asf/flink-docker.git.


from dd51e81  Run tests for 1.14.2
 add 9362050  Add GPG key for 1.14.3 release
 add a676953  update run_travis_tests.sh

No new revisions were added by this update.

Summary of changes:
 add-version.sh  | 2 ++
 testing/run_travis_tests.sh | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)


[flink] annotated tag release-1.14.3 updated (98997ea -> b36cd46)

2022-01-17 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to annotated tag release-1.14.3
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.14.3 was modified! ***

from 98997ea  (commit)
  to b36cd46  (tag)
 tagging 98997ea37ba08eae0f9aa6dd34823238097d8e0d (commit)
 replaces pre-apache-rename
  by Thomas Weise
  on Mon Jan 17 10:07:20 2022 -0800

- Log -
Release Flink 1.14.3
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHlsFgACgkQEt7j5Nkg
qYwY7A/+JH10viigIExKRdbel9V54UXLcFQdplaRYtt84hgq0yAcHfOOQ/ucMOAV
ltyzwwDrbyrC0TKUM8o5U+WaL3T5j5YWyLEd6Md6T3a9vCN2a4XdVlDCZiEIXwJR
oi4ACAXpiqFJKoGwE+gH/bp5bFYnq9v8s3sMM6SD9Wi1sYUPeFym64zf0eSAEuh3
vaUJkotKKE9/xuwo3YsuFM8pdQnOYjOGDEkDOuOiOnl4/uf1FLmBJH2gvMVsmryz
AGxiGAz4A9XSEEpLBFZRVsZv1AyK3p+mis3ewTA/Wn5HSYBzDBhjWeQHOdbDGkcb
rZCXu44FGRkGfGiOCSvaNR3HxrehuyJliJw+QV+vYVgMgXvXpmlCDDcZIxf8z0V+
akgzQPKZVrSEwV+J7RzAecr8jqvOJCiNgSuorkuSmZ1li0iAK+k3yJdDOmhMjuOq
hsrYY2+sW+o035p+kN8aoazSJPzFF+Yg/DHBgAxy7Nw/lfyJgHP7+b/jXvFPqsRK
rO83hUbtJXmtuIlhrm+UWEryQCBVrvndCOP9Oxgu1aVr1n6ZnRx3Ge4ytX+GCJGb
3JzERfNB0iPloQn6ASJaNFGa5c+EUhXlwFLOea+f8jMuJ4lnIqiM2r4D84jGIsRm
CD1QmHqLxZ+EAnYyfSp8Zzr8cuN6H5556h7E7ZKTIYUoFeTLoS0=
=x8Q4
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


svn commit: r52111 - /dev/flink/flink-1.14.3-rc1/ /release/flink/flink-1.14.3/

2022-01-17 Thread thw
Author: thw
Date: Mon Jan 17 18:05:00 2022
New Revision: 52111

Log:
Release Flink 1.14.3

Added:
release/flink/flink-1.14.3/
  - copied from r52110, dev/flink/flink-1.14.3-rc1/
Removed:
dev/flink/flink-1.14.3-rc1/



[flink] annotated tag release-1.14.3-rc1 updated (98997ea -> d32b95f)

2022-01-10 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to annotated tag release-1.14.3-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.14.3-rc1 was modified! ***

from 98997ea  (commit)
  to d32b95f  (tag)
 tagging 98997ea37ba08eae0f9aa6dd34823238097d8e0d (commit)
 replaces pre-apache-rename
  by Thomas Weise
  on Mon Jan 10 17:05:11 2022 -0800

- Log -
release-1.14.3-rc1
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHc18cACgkQEt7j5Nkg
qYx4MBAAr3nARCydSDTLjb/HV2aJ+b9rVcwvtfu2nT5+6IE7XW2JPFhe5CGufwL+
kpEvkDi14mIhpl9LNo2jRil8ejUyUYB5uRVXd0lEw9ZS4zfzCN4/lN62nlRBY3WE
Qs1vLT+BH/jIGsl8unGLzJOV+ZRxSW7MkT+sOrWjMY32KBVfKneP3DouGex06msP
x1czJDoNNziYe3QNHyu0foNdTanvSW8FEKfKKQkqOAPhCswfphr4Tnj3grRMRDiv
U8y6TW5cik4ytRnyrWWLrO8Ou9lYFxn2NVDCPLdd03+guEvamCa0iPSZzTlmNdON
+YxyvAwoV3Jjb2lcjLNNd4F9qy/c3b9KWYZBbjVRyH/oIySTmgycZfu30EPM3Vht
ttpkUlPL6X0R27aA4qSZp5qtjhIIj+XNI2NPXXmyGMYP330bw8Ble0XSbECBuLoH
hOllMdamdwPzTyvqGR7cbg9Sw2L0j+OXCDDgUOp7ebn9CpSssSr0zxK4OwxeSl9r
BCRnl3BQ14N+cHto3Q0cNXYFMfGajYsENqgXQSCNzYFFBfWL56D5coBqaF7jdr+9
hZNNlMXfni/YCG3GRdRZwEwTiW7B7SivvOgcSkzscrMn2NKlDS5KviM5WltyVDDS
gaeXVz7DVFXUd8GILP+4OoWDT/vxmRt+SyWNx2Lkff8Y1W2IrOc=
=XzQk
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


[flink] annotated tag release-1.14.3-rc1 updated (98997ea -> 217984e)

2022-01-10 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to annotated tag release-1.14.3-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.14.3-rc1 was modified! ***

from 98997ea  (commit)
  to 217984e  (tag)
 tagging 98997ea37ba08eae0f9aa6dd34823238097d8e0d (commit)
 replaces pre-apache-rename
  by Thomas Weise
  on Sat Jan 8 14:24:11 2022 -0800

- Log -
release-1.14.3-rc1
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHaDwsACgkQEt7j5Nkg
qYyejA//TW228QDDvgINrwKnjR3XWDu6CBgMbHEzT0a4pcpCq8OZE48Wa/XIyouh
8Q04Z2T4Ov1/vJqe5SSaqfrSpB6FD/bHQYE0pyxiTICj2AEevRKMOgTA2dccEO8g
zBiWjwI+yZFRVFb7spXuggLfkDagdWC0da2SUn7Uthf7nXUrGyk0WKvsaexgK1OM
cmF0y+9ov4yNPRqYBATGlhfjeBs/qHfZixmkoBxbjqo3B1hXzrKWd48YDk9bTA/l
Sd9RHD0maXmacicUcNmJ0X+Z43eGwbAhiHDj3VmzkiGoGiBrhStv3hHTFsVByKAb
ln1Fq1kpm03+K5HDViPIsI2h68hYyFDsIzJTIJgrkQQwRNhi19UxjANuRQsAXUii
yJyrdOQSobrKX8aNyo7AMJII8lxp9O2w2UxccwAYy6oVtua5tL/rNKERiiOOBy6X
Hejr/Tb/N16fIXL3BCDYq1JYV7ivPMsjhEAoqw4IR369En587R1K1NTRzc2s5573
vOIn830Q2gekt8S68KaHY5C1tox8kvtTvoGwufYEOHbVJmddti35eFiz0t4pF3DH
mOeLhOghOgDrRhZPiznXZAlpZHezzcoNYMWquyXJtICxgZSjnlKlw3EW/1yp9458
BY51/wzFZvRp3nWmSIhpPV7a9AkGwYZ+579ymUFMxwFbtRV4024=
=E6Mw
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


svn commit: r52020 - in /dev/flink/flink-1.14.3-rc1: ./ python/

2022-01-10 Thread thw
Author: thw
Date: Tue Jan 11 00:56:27 2022
New Revision: 52020

Log:
Add flink-1.14.3-rc1

Added:
dev/flink/flink-1.14.3-rc1/
dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz   (with props)
dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc
dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512
dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.12.tgz   (with props)
dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.12.tgz.asc
dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.12.tgz.sha512
dev/flink/flink-1.14.3-rc1/flink-1.14.3-src.tgz   (with props)
dev/flink/flink-1.14.3-rc1/flink-1.14.3-src.tgz.asc
dev/flink/flink-1.14.3-rc1/flink-1.14.3-src.tgz.sha512
dev/flink/flink-1.14.3-rc1/python/
dev/flink/flink-1.14.3-rc1/python/apache-flink-1.14.3.tar.gz   (with props)
dev/flink/flink-1.14.3-rc1/python/apache-flink-1.14.3.tar.gz.asc
dev/flink/flink-1.14.3-rc1/python/apache-flink-1.14.3.tar.gz.sha512
dev/flink/flink-1.14.3-rc1/python/apache-flink-libraries-1.14.3.tar.gz   
(with props)
dev/flink/flink-1.14.3-rc1/python/apache-flink-libraries-1.14.3.tar.gz.asc

dev/flink/flink-1.14.3-rc1/python/apache-flink-libraries-1.14.3.tar.gz.sha512

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-macosx_10_9_x86_64.whl
   (with props)

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-macosx_10_9_x86_64.whl.asc

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-macosx_10_9_x86_64.whl.sha512

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-manylinux1_x86_64.whl
   (with props)

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-manylinux1_x86_64.whl.asc

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-manylinux1_x86_64.whl.sha512

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-macosx_10_9_x86_64.whl
   (with props)

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-macosx_10_9_x86_64.whl.asc

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-macosx_10_9_x86_64.whl.sha512

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-manylinux1_x86_64.whl
   (with props)

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-manylinux1_x86_64.whl.asc

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-manylinux1_x86_64.whl.sha512

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-macosx_10_9_x86_64.whl
   (with props)

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-macosx_10_9_x86_64.whl.asc

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-macosx_10_9_x86_64.whl.sha512

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-manylinux1_x86_64.whl
   (with props)

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-manylinux1_x86_64.whl.asc

dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-manylinux1_x86_64.whl.sha512

Added: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz
--
svn:mime-type = application/octet-stream

Added: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc
==
--- dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc (added)
+++ dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc Tue Jan 11 
00:56:27 2022
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHcyIAACgkQEt7j5Nkg
+qYzbbQ/8CTxLjr9D3AXGGKeqCeBnIIxR7s17483QDqIOa7QLIew7MYpvqrBYK66B
+yHisA7XxL7K7a6zOGV5sO06pLpT9EjYjmxw41UkWAIwMP7GpyFMiu4LhN1LDcCTV
+xK7mSxgZ86NjUvoOIexM/pSkoVqSAiMHOa/Q5OzSWPPRo/byXyjUQ5eFb6R937LR
+t/Yj3V7wHPLQ30vpu2a+reHRNlmAiH2NKH4oRWVtBx0vIFkmqx2qWqdWevdrUlia
+aLrNfPqbw7hRhQEDofbjF9EoViTfvIjC/XuI8Xo1gA+A0YklaMy0HIsh31iNj6Mg
+VBxP0/Sh5wlwLsV6Zvj0eZ4bjLvmmv0NWRxZB0Oti4BpZQPz4iZ0T6j7YdmMBBNz
+KLE65C3E3jQBz0HnUKPIrqcTWVt3ad0YIk8+6v/t1eqxZculIJbHlH9nLlMc+hdW
+UgDGGlGY3pi644k08oi8hsEI5thckQOIRUjGx+3IvajU9lZL7+lqtFvPU8W7Ictg
+wcLm1cssKJRPvuIBN15pweFygsRvENpi2iQeo9YSg/ZySfymu66C2wm4oeUtxjoS
+69RvjDao6VgvS9Ld7w/9fACOTHFZTViq8IsEDkYLub+LRaanFjuc7XOQpxDNeJwB
+l6T9MPb029io5tBV0IDDYElG2+J7muqSHAAngYuV9uFj72MdeQc=
+=QqHN
+-END PGP SIGNATURE-

Added: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512
==
--- dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512 (added)
+++ dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512 Tue Jan 
11 00:56:27 2022
@@ -0,0 +1

[flink] 01/01: Commit for release 1.14.3

2022-01-09 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.14.3-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98997ea37ba08eae0f9aa6dd34823238097d8e0d
Author: Thomas Weise 
AuthorDate: Sat Jan 8 14:23:54 2022 -0800

Commit for release 1.14.3
---
 docs/config.toml  | 2 +-
 flink-annotations/pom.xml | 2 +-
 flink-clients/pom.xml | 2 +-
 flink-connectors/flink-connector-base/pom.xml | 2 +-
 flink-connectors/flink-connector-cassandra/pom.xml| 2 +-
 flink-connectors/flink-connector-elasticsearch-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch5/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-connector-files/pom.xml| 2 +-
 flink-connectors/flink-connector-gcp-pubsub/pom.xml   | 2 +-
 flink-connectors/flink-connector-hbase-1.4/pom.xml| 2 +-
 flink-connectors/flink-connector-hbase-2.2/pom.xml| 2 +-
 flink-connectors/flink-connector-hbase-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-hive/pom.xml | 2 +-
 flink-connectors/flink-connector-jdbc/pom.xml | 2 +-
 flink-connectors/flink-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/flink-connector-nifi/pom.xml | 2 +-
 flink-connectors/flink-connector-pulsar/pom.xml   | 2 +-
 flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +-
 flink-connectors/flink-connector-twitter/pom.xml  | 2 +-
 flink-connectors/flink-file-sink-common/pom.xml   | 2 +-
 flink-connectors/flink-hadoop-compatibility/pom.xml   | 2 +-
 flink-connectors/flink-hcatalog/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hbase-1.4/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-hbase-2.2/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-hive-1.2.2/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-2.3.6/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/pom.xml  | 2 +-
 flink-container/pom.xml   | 2 +-
 flink-contrib/flink-connector-wikiedits/pom.xml   | 2 +-
 flink-contrib/pom.xml | 2 +-
 flink-core/pom.xml| 2 +-
 flink-dist/pom.xml| 2 +-
 flink-docs/pom.xml| 2 +-
 flink-dstl/flink-dstl-dfs/pom.xml | 2 +-
 flink-dstl/pom.xml| 2 +-
 flink-end-to-end-tests/flink-batch-sql-test/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-cli-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +-
 .../flink-connector-gcp-pubsub-emulator-tests/pom.xml | 2 +-
 flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +-
 .../flink-dataset-fine-grained-recovery-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml| 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-hbase/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-file-sink-test/pom.xml

[flink] branch release-1.14.3-rc1 created (now 98997ea)

2022-01-09 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch release-1.14.3-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git.


  at 98997ea  Commit for release 1.14.3

This branch includes the following new commits:

 new 98997ea  Commit for release 1.14.3

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] branch release-1.14 updated: [FLINK-24064][connector/common] HybridSource restore from savepoint

2021-09-06 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
 new cb71c37  [FLINK-24064][connector/common] HybridSource restore from 
savepoint
cb71c37 is described below

commit cb71c3721efa129972c8d9e6ace347b691ee80e1
Author: Thomas Weise 
AuthorDate: Tue Aug 31 05:48:40 2021 -0700

[FLINK-24064][connector/common] HybridSource restore from savepoint
---
 .../connector/base/source/hybrid/HybridSource.java | 19 ++
 .../source/hybrid/HybridSourceEnumeratorState.java | 17 --
 .../HybridSourceEnumeratorStateSerializer.java | 37 ++--
 .../base/source/hybrid/HybridSourceReader.java | 15 ++---
 .../base/source/hybrid/HybridSourceSplit.java  | 69 --
 .../source/hybrid/HybridSourceSplitEnumerator.java | 48 +--
 .../source/hybrid/HybridSourceSplitSerializer.java | 39 +++-
 .../base/source/hybrid/SwitchedSources.java| 48 +++
 .../base/source/hybrid/HybridSourceReaderTest.java | 36 ---
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 36 ++-
 .../hybrid/HybridSourceSplitSerializerTest.java|  6 +-
 .../base/source/reader/mocks/MockBaseSource.java   |  3 +-
 12 files changed, 202 insertions(+), 171 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index e3d66de..24acb6a 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -32,9 +32,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Hybrid source that switches underlying sources based on configured source 
chain.
@@ -91,14 +89,11 @@ import java.util.Map;
 public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 
 private final List<SourceListEntry> sources;
-// sources are populated per subtask at switch time
-private final Map switchedSources;
 
 /** Protected for subclass, use {@link #builder(Source)} to construct source. */
 protected HybridSource(List<SourceListEntry> sources) {
 Preconditions.checkArgument(!sources.isEmpty());
 this.sources = sources;
-this.switchedSources = new HashMap<>(sources.size());
 }
 
 /** Builder for {@link HybridSource}. */
@@ -116,13 +111,13 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext)
 throws Exception {
-return new HybridSourceReader(readerContext, switchedSources);
+return new HybridSourceReader(readerContext);
 }
 
 @Override
 public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
 SplitEnumeratorContext<HybridSourceSplit> enumContext) {
-return new HybridSourceSplitEnumerator(enumContext, sources, 0, switchedSources, null);
+return new HybridSourceSplitEnumerator(enumContext, sources, 0, null);
 }
 
 @Override
@@ -131,22 +126,18 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
-return new HybridSourceSplitSerializer(switchedSources);
+return new HybridSourceSplitSerializer();
 }
 
 @Override
 public SimpleVersionedSerializer<HybridSourceEnumeratorState>
 getEnumeratorCheckpointSerializer() {
-return new HybridSourceEnumeratorStateSerializer(switchedSources);
+return new HybridSourceEnumeratorStateSerializer();
 }
 
 /**
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
index 2da99ee..95aadde 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
@@ -21,18 +21,25 @@ package org.apache.flink.connector.base.source.hybrid;
 /** The state of hybrid source enumerator. */
 public class HybridSourceEnumeratorState {
 private final int currentSourceIndex;
-private final Object wrappedState;
+private byte[] wrappedStateBytes;
+private final int wrappedStateSerializerVersion;
 
-HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) {
+HybridSourceEnumeratorState(
+int currentSourceIndex, byte[] wrappedStateByte

[flink] branch release-1.13 updated: [FLINK-24064][connector/common] HybridSource restore from savepoint

2021-09-06 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new c7ccd24  [FLINK-24064][connector/common] HybridSource restore from 
savepoint
c7ccd24 is described below

commit c7ccd24e8ee566689898f8ced76090bcce0f87fa
Author: Thomas Weise 
AuthorDate: Tue Aug 31 05:48:40 2021 -0700

[FLINK-24064][connector/common] HybridSource restore from savepoint
---
 .../connector/base/source/hybrid/HybridSource.java | 19 ++
 .../source/hybrid/HybridSourceEnumeratorState.java | 17 --
 .../HybridSourceEnumeratorStateSerializer.java | 37 ++--
 .../base/source/hybrid/HybridSourceReader.java | 15 ++---
 .../base/source/hybrid/HybridSourceSplit.java  | 69 --
 .../source/hybrid/HybridSourceSplitEnumerator.java | 48 +--
 .../source/hybrid/HybridSourceSplitSerializer.java | 39 +++-
 .../base/source/hybrid/SwitchedSources.java| 48 +++
 .../base/source/hybrid/HybridSourceReaderTest.java | 36 ---
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 36 ++-
 .../hybrid/HybridSourceSplitSerializerTest.java|  6 +-
 .../base/source/reader/mocks/MockBaseSource.java   |  3 +-
 12 files changed, 202 insertions(+), 171 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index e3d66de..24acb6a 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -32,9 +32,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Hybrid source that switches underlying sources based on configured source 
chain.
@@ -91,14 +89,11 @@ import java.util.Map;
 public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 
 private final List<SourceListEntry> sources;
-// sources are populated per subtask at switch time
-private final Map switchedSources;
 
 /** Protected for subclass, use {@link #builder(Source)} to construct source. */
 protected HybridSource(List<SourceListEntry> sources) {
 Preconditions.checkArgument(!sources.isEmpty());
 this.sources = sources;
-this.switchedSources = new HashMap<>(sources.size());
 }
 
 /** Builder for {@link HybridSource}. */
@@ -116,13 +111,13 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext)
 throws Exception {
-return new HybridSourceReader(readerContext, switchedSources);
+return new HybridSourceReader(readerContext);
 }
 
 @Override
 public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
 SplitEnumeratorContext<HybridSourceSplit> enumContext) {
-return new HybridSourceSplitEnumerator(enumContext, sources, 0, switchedSources, null);
+return new HybridSourceSplitEnumerator(enumContext, sources, 0, null);
 }
 
 @Override
@@ -131,22 +126,18 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
-return new HybridSourceSplitSerializer(switchedSources);
+return new HybridSourceSplitSerializer();
 }
 
 @Override
 public SimpleVersionedSerializer<HybridSourceEnumeratorState>
 getEnumeratorCheckpointSerializer() {
-return new HybridSourceEnumeratorStateSerializer(switchedSources);
+return new HybridSourceEnumeratorStateSerializer();
 }
 
 /**
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
index 2da99ee..95aadde 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
@@ -21,18 +21,25 @@ package org.apache.flink.connector.base.source.hybrid;
 /** The state of hybrid source enumerator. */
 public class HybridSourceEnumeratorState {
 private final int currentSourceIndex;
-private final Object wrappedState;
+private byte[] wrappedStateBytes;
+private final int wrappedStateSerializerVersion;
 
-HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) {
+HybridSourceEnumeratorState(
+int currentSourceIndex, byte[] wrappedStateByte

[flink] branch master updated: [FLINK-24064][connector/common] HybridSource restore from savepoint

2021-09-03 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2984d87  [FLINK-24064][connector/common] HybridSource restore from 
savepoint
2984d87 is described below

commit 2984d87ed7761a6ca345b8b79bcb6dd49db0442b
Author: Thomas Weise 
AuthorDate: Tue Aug 31 05:48:40 2021 -0700

[FLINK-24064][connector/common] HybridSource restore from savepoint
---
 .../connector/base/source/hybrid/HybridSource.java | 19 ++
 .../source/hybrid/HybridSourceEnumeratorState.java | 17 --
 .../HybridSourceEnumeratorStateSerializer.java | 37 ++--
 .../base/source/hybrid/HybridSourceReader.java | 15 ++---
 .../base/source/hybrid/HybridSourceSplit.java  | 69 --
 .../source/hybrid/HybridSourceSplitEnumerator.java | 48 +--
 .../source/hybrid/HybridSourceSplitSerializer.java | 39 +++-
 .../base/source/hybrid/SwitchedSources.java| 48 +++
 .../base/source/hybrid/HybridSourceReaderTest.java | 36 ---
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 36 ++-
 .../hybrid/HybridSourceSplitSerializerTest.java|  6 +-
 .../base/source/reader/mocks/MockBaseSource.java   |  3 +-
 12 files changed, 202 insertions(+), 171 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index e3d66de..24acb6a 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -32,9 +32,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Hybrid source that switches underlying sources based on configured source 
chain.
@@ -91,14 +89,11 @@ import java.util.Map;
 public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 
 private final List<SourceListEntry> sources;
-// sources are populated per subtask at switch time
-private final Map switchedSources;
 
 /** Protected for subclass, use {@link #builder(Source)} to construct source. */
 protected HybridSource(List<SourceListEntry> sources) {
 Preconditions.checkArgument(!sources.isEmpty());
 this.sources = sources;
-this.switchedSources = new HashMap<>(sources.size());
 }
 
 /** Builder for {@link HybridSource}. */
@@ -116,13 +111,13 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext)
 throws Exception {
-return new HybridSourceReader(readerContext, switchedSources);
+return new HybridSourceReader(readerContext);
 }
 
 @Override
 public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
 SplitEnumeratorContext<HybridSourceSplit> enumContext) {
-return new HybridSourceSplitEnumerator(enumContext, sources, 0, switchedSources, null);
+return new HybridSourceSplitEnumerator(enumContext, sources, 0, null);
 }
 
 @Override
@@ -131,22 +126,18 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
 public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
-return new HybridSourceSplitSerializer(switchedSources);
+return new HybridSourceSplitSerializer();
 }
 
 @Override
 public SimpleVersionedSerializer<HybridSourceEnumeratorState>
 getEnumeratorCheckpointSerializer() {
-return new HybridSourceEnumeratorStateSerializer(switchedSources);
+return new HybridSourceEnumeratorStateSerializer();
 }
 
 /**
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
index 2da99ee..95aadde 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
@@ -21,18 +21,25 @@ package org.apache.flink.connector.base.source.hybrid;
 /** The state of hybrid source enumerator. */
 public class HybridSourceEnumeratorState {
 private final int currentSourceIndex;
-private final Object wrappedState;
+private byte[] wrappedStateBytes;
+private final int wrappedStateSerializerVersion;
 
-HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) {
+HybridSourceEnumeratorState(
+int currentSourceIndex, byte[] wrappedStateBytes, int 
serializerV

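The substantive change repeated across these three branches is in HybridSourceEnumeratorState: the wrapped enumerator checkpoint goes from a live `Object` to raw bytes plus the wrapped serializer's version. The apparent motivation is savepoint restore, where the hybrid state must be deserialized before the underlying sources are materialized, so deserialization of the delegate state has to be deferred until the right source is in place. A self-contained sketch of that versioned-bytes framing, illustrative only and not Flink's actual serializer classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class VersionedBytes {

    /** Holder for a decoded frame. */
    static final class Frame {
        final int serializerVersion;
        final byte[] wrappedStateBytes;

        Frame(int serializerVersion, byte[] wrappedStateBytes) {
            this.serializerVersion = serializerVersion;
            this.wrappedStateBytes = wrappedStateBytes;
        }
    }

    /** Frame layout: [serializer version][length][opaque delegate state bytes]. */
    static byte[] write(int serializerVersion, byte[] wrappedStateBytes) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(serializerVersion);
        out.writeInt(wrappedStateBytes.length);
        out.write(wrappedStateBytes);
        out.flush();
        return bos.toByteArray();
    }

    /** Decodes the frame; the caller resolves the matching delegate serializer
     *  only once the active source is known. */
    static Frame read(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        int serializerVersion = in.readInt();
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new Frame(serializerVersion, bytes);
    }
}
```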
[flink] branch release-1.13 updated (dcfbf74 -> a8871ad)

2021-08-30 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


from dcfbf74  [FLINK-24033][table-planner] Propagate unique keys for 
fromChangelogStream
 add a8871ad  [FLINK-24010][connector/common] HybridSourceReader/Enumerator 
delegate checkpoint notifications

No new revisions were added by this update.

Summary of changes:
 .../base/source/hybrid/HybridSourceReader.java | 14 +
 .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++
 .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 21 +
 4 files changed, 79 insertions(+)


[flink] branch release-1.14 updated: [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications

2021-08-30 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
 new 8c293c7  [FLINK-24010][connector/common] HybridSourceReader/Enumerator 
delegate checkpoint notifications
8c293c7 is described below

commit 8c293c7e03bf9c5221ae50843a3af445276afa24
Author: Thomas Weise 
AuthorDate: Thu Aug 26 14:22:01 2021 -0700

[FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate 
checkpoint notifications
---
 .../base/source/hybrid/HybridSourceReader.java | 14 +
 .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++
 .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 21 +
 4 files changed, 79 insertions(+)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index cdd2e8b..28d4011 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -121,6 +121,20 @@ public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> {
 }
 
 @Override
+public void notifyCheckpointComplete(long checkpointId) throws Exception {
+if (currentReader != null) {
+currentReader.notifyCheckpointComplete(checkpointId);
+}
+}
+
+@Override
+public void notifyCheckpointAborted(long checkpointId) throws Exception {
+if (currentReader != null) {
+currentReader.notifyCheckpointAborted(checkpointId);
+}
+}
+
+@Override
 public CompletableFuture<Void> isAvailable() {
 return availabilityFuture;
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index 5a2c010..0f2b036 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -176,6 +176,16 @@ public class HybridSourceSplitEnumerator
 }
 
 @Override
+public void notifyCheckpointComplete(long checkpointId) throws Exception {
+currentEnumerator.notifyCheckpointComplete(checkpointId);
+}
+
+@Override
+public void notifyCheckpointAborted(long checkpointId) throws Exception {
+currentEnumerator.notifyCheckpointAborted(checkpointId);
+}
+
+@Override
 public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
 LOG.debug(
 "handleSourceEvent {} subtask={} pendingSplits={}",
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index 3bf77f3..7882333 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.mock.Whitebox;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -165,6 +166,39 @@ public class HybridSourceReaderTest {
 reader.close();
 }
 
+@Test
+public void testDefaultMethodDelegation() throws Exception {
+TestingReaderContext readerContext = new TestingReaderContext();
+TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
+MockBaseSource source =
+new MockBaseSource(1, 1, Boundedness.BOUNDED) {
+@Override
+public SourceReader createReader(
+SourceReaderContext readerContext) {
+return Mockito.spy(super.createReader(readerContext));
+}
+};
+
+Map switchedSources = new HashMap<>();
+
+HybridSourceReader reader =
+new HybridSourceReader<>(readerContext, switchedSourc

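The pattern added in this commit is plain forwarding with a null guard, since the hybrid reader has no delegate until the first underlying source is activated. The forwarding matters because checkpoint completion callbacks are how some sources commit side effects (Kafka offset commits, for example), and a wrapper that swallows them silently breaks those sources. A stripped-down sketch of the delegation, with the interface simplified from Flink's actual SourceReader:

```java
/** Simplified from Flink's SourceReader: only the checkpoint notification pair. */
interface CheckpointNotifications {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
    void notifyCheckpointAborted(long checkpointId) throws Exception;
}

final class ForwardingReader implements CheckpointNotifications {

    private CheckpointNotifications currentReader; // null until the first source switch

    void setCurrentReader(CheckpointNotifications reader) {
        this.currentReader = reader;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (currentReader != null) {
            currentReader.notifyCheckpointComplete(checkpointId);
        }
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (currentReader != null) {
            currentReader.notifyCheckpointAborted(checkpointId);
        }
    }
}
```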
[flink] branch master updated: [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications

2021-08-29 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new da944a8  [FLINK-24010][connector/common] HybridSourceReader/Enumerator 
delegate checkpoint notifications
da944a8 is described below

commit da944a8c90477d7b0210024028abd1011e250f14
Author: Thomas Weise 
AuthorDate: Thu Aug 26 14:22:01 2021 -0700

[FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate 
checkpoint notifications
---
 .../base/source/hybrid/HybridSourceReader.java | 14 +
 .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++
 .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 21 +
 4 files changed, 79 insertions(+)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index cdd2e8b..28d4011 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -121,6 +121,20 @@ public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit>
 }
 
 @Override
+public void notifyCheckpointComplete(long checkpointId) throws Exception {
+if (currentReader != null) {
+currentReader.notifyCheckpointComplete(checkpointId);
+}
+}
+
+@Override
+public void notifyCheckpointAborted(long checkpointId) throws Exception {
+if (currentReader != null) {
+currentReader.notifyCheckpointAborted(checkpointId);
+}
+}
+
+@Override
 public CompletableFuture<Void> isAvailable() {
 return availabilityFuture;
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index 5a2c010..0f2b036 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -176,6 +176,16 @@ public class HybridSourceSplitEnumerator
 }
 
 @Override
+public void notifyCheckpointComplete(long checkpointId) throws Exception {
+currentEnumerator.notifyCheckpointComplete(checkpointId);
+}
+
+@Override
+public void notifyCheckpointAborted(long checkpointId) throws Exception {
+currentEnumerator.notifyCheckpointAborted(checkpointId);
+}
+
+@Override
 public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
 LOG.debug(
 "handleSourceEvent {} subtask={} pendingSplits={}",
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index 3bf77f3..7882333 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.mock.Whitebox;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -165,6 +166,39 @@ public class HybridSourceReaderTest {
 reader.close();
 }
 
+@Test
+public void testDefaultMethodDelegation() throws Exception {
+TestingReaderContext readerContext = new TestingReaderContext();
+TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>();
+MockBaseSource source =
+new MockBaseSource(1, 1, Boundedness.BOUNDED) {
+@Override
+public SourceReader<Integer, MockSourceSplit> createReader(
+SourceReaderContext readerContext) {
+return Mockito.spy(super.createReader(readerContext));
+}
+};
+
+Map<Integer, Source> switchedSources = new HashMap<>();
+
+HybridSourceReader<Integer> reader =
+new HybridSourceReader<>(readerContext, switchedSourc
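
The change itself is plain delegation: each notification is forwarded to whichever reader is currently active, guarded against the window before the first source has been switched in. A standalone sketch of the same pattern against Flink's `CheckpointListener` contract; the wrapper class and `setCurrent` method are illustrative, not part of this commit:

```java
import org.apache.flink.api.common.state.CheckpointListener;

/** Forwards checkpoint notifications to whichever delegate is currently active. */
class DelegatingCheckpointListener implements CheckpointListener {

    private CheckpointListener current; // null until the first source becomes active

    void setCurrent(CheckpointListener delegate) {
        this.current = delegate;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // without this forwarding, readers that act on checkpoint completion
        // (e.g. committing consumed offsets) would silently stop doing so
        if (current != null) {
            current.notifyCheckpointComplete(checkpointId);
        }
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (current != null) {
            current.notifyCheckpointAborted(checkpointId);
        }
    }
}
```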

[flink] 02/03: [FLINK-22791][docs] Documentation for HybridSource

2021-08-25 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8a6d31c4e4e180421517f7e700c731e58be37d8
Author: Thomas Weise 
AuthorDate: Sun Aug 22 13:56:13 2021 -0700

[FLINK-22791][docs] Documentation for HybridSource
---
 .../docs/connectors/datastream/hybridsource.md | 100 +
 1 file changed, 100 insertions(+)

diff --git a/docs/content/docs/connectors/datastream/hybridsource.md 
b/docs/content/docs/connectors/datastream/hybridsource.md
new file mode 100644
index 000..02f5077
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -0,0 +1,100 @@
+---
+title: Hybrid Source
+weight: 8
+type: docs
+aliases:
+---
+
+
+# Hybrid Source
+
+`HybridSource` is a source that contains a list of concrete [sources]({{< ref 
"docs/dev/datastream/sources" >}}).
+It solves the problem of sequentially reading input from heterogeneous sources 
to produce a single input stream.
+
+For example, a bootstrap use case may need to read several days worth of 
bounded input from S3 before continuing with the latest unbounded input from 
Kafka.
+`HybridSource` switches from `FileSource` to `KafkaSource` when the bounded 
file input finishes without interrupting the application.
+
+Prior to `HybridSource`, it was necessary to create a topology with multiple 
sources and define a switching mechanism in user land, which leads to 
operational complexity and inefficiency.
+
+With `HybridSource` the multiple sources appear as a single source in the Flink job graph and from the `DataStream` API perspective.
+
+For more background see 
[FLIP-150](https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source)
+
+To use the connector, add the ```flink-connector-base``` dependency to your 
project:
+
+{{< artifact flink-connector-base >}}
+
+(Typically comes as transitive dependency with concrete sources.)
+
+## Start position for next source
+
+To arrange multiple sources in a `HybridSource`, all sources except the last one need to be bounded. Therefore, the sources typically need to be assigned a start and end position. The last source may be bounded, in which case the `HybridSource` is bounded, and unbounded otherwise.
+Details depend on the specific source and the external storage systems.
+Here we cover the most basic and then a more complex scenario, following the 
File/Kafka example. 
+
+#### Fixed start position at graph construction time
+
+Example: Read till pre-determined switch time from files and then continue 
reading from Kafka.
+Each source covers an upfront known range and therefore the contained sources 
can be created upfront as if they were used directly:
+
+```java
+long switchTimestamp = ...; // derive from file input paths
+FileSource<String> fileSource =
+  FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+KafkaSource<String> kafkaSource =
+  KafkaSource.<String>builder()
+  .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
+  .build();
+HybridSource<String> hybridSource =
+  HybridSource.builder(fileSource)
+  .addSource(kafkaSource)
+  .build();
+```
+
+#### Dynamic start position at switch time
+
+Example: The file source reads a very large backlog, potentially taking longer than the retention available for the next source.
+The switch needs to occur at "current time - X". This requires the start time for the next source to be set at switch time.
+Here we require transfer of the end position from the previous file enumerator for deferred construction of `KafkaSource`
+by implementing `SourceFactory`.
+
+Note that enumerators need to support getting the end timestamp. This may 
currently require a source customization.
+Adding support for dynamic end position to `FileSource` is tracked in 
[FLINK-23633](https://issues.apache.org/jira/browse/FLINK-23633).
+
+```java
+FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
+HybridSource<String> hybridSource =
+    HybridSource.<String, CustomFileSplitEnumerator>builder(fileSource)
+        .addSource(
+            switchContext -> {
+              CustomFileSplitEnumerator previousEnumerator =
+                  switchContext.getPreviousEnumerator();
+              // how to get timestamp depends on specific enumerator
+              long switchTimestamp = previousEnumerator.getEndTimestamp();
+              KafkaSource<String> kafkaSource =
+                  KafkaSource.<String>builder()
+                      .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
+                      .build();
+              return kafkaSource;
+            },
+            Boundedness.CONTINUOUS_UNBOUNDED)
+        .build();
+```
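
Not part of the commit above, but useful context: once built, a `HybridSource` is handed to the execution environment like any other Source API source. A minimal sketch, assuming a `hybridSource` constructed as in the documentation; the watermark strategy and source name are arbitrary:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HybridSource<String> hybridSource = ...; // built as in the examples above

// the hybrid source shows up as a single source in the job graph
DataStream<String> stream =
        env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source");
```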


[flink] 01/03: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-08-25 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c72aa4b97848625f23bac331b8e916c6a3e2acd3
Author: Thomas Weise 
AuthorDate: Fri Feb 26 19:32:32 2021 -0800

[FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
---
 .../connector/base/source/hybrid/HybridSource.java | 257 ++
 .../source/hybrid/HybridSourceEnumeratorState.java |  38 ++
 .../HybridSourceEnumeratorStateSerializer.java | 106 ++
 .../base/source/hybrid/HybridSourceReader.java | 254 ++
 .../base/source/hybrid/HybridSourceSplit.java  |  94 +
 .../source/hybrid/HybridSourceSplitEnumerator.java | 390 +
 .../source/hybrid/HybridSourceSplitSerializer.java |  98 ++
 .../source/hybrid/SourceReaderFinishedEvent.java   |  49 +++
 .../base/source/hybrid/SwitchSourceEvent.java  |  62 
 .../base/source/hybrid/HybridSourceITCase.java | 246 +
 .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 252 +
 .../hybrid/HybridSourceSplitSerializerTest.java|  52 +++
 .../base/source/hybrid/HybridSourceTest.java   | 115 ++
 14 files changed, 2194 insertions(+)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
new file mode 100644
index 000..e3d66de
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hybrid source that switches underlying sources based on configured source 
chain.
+ *
+ * A simple example with FileSource and KafkaSource with fixed Kafka start 
position:
+ *
+ * {@code
+ * FileSource<String> fileSource =
+ *   FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+ * KafkaSource<String> kafkaSource =
+ *   KafkaSource.<String>builder()
+ *   .setBootstrapServers("localhost:9092")
+ *   .setGroupId("MyGroup")
+ *   .setTopics(Arrays.asList("quickstart-events"))
+ *   .setDeserializer(
+ *   KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
+ *   .setStartingOffsets(OffsetsInitializer.earliest())
+ *   .build();
+ * HybridSource<String> hybridSource =
+ *   HybridSource.builder(fileSource)
+ *   .addSource(kafkaSource)
+ *   .build();
+ * }
+ *
+ * A more complex example with Kafka start position derived from previous 
source:
+ *
+ * {@code
+ * HybridSource<String> hybridSource =
+ * HybridSource.builder(fileSource)
+ * .addSource(
+ * switchContext -> {
+ *   StaticFileSplitEnumerator previousEnumerator =
+ *   switchContext.getPreviousEnumerator();
+ *   // how to get timestamp depends on specific enumerator
+ *   long timestamp = previousEnumerator.getEndTime

[flink] 03/03: [FLINK-22791][docs] HybridSource release availability for 1.13.x

2021-08-25 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f74fc331ac8305df0539fb1cbfa1d4722d8c1c56
Author: Thomas Weise 
AuthorDate: Tue Aug 24 17:06:18 2021 -0700

[FLINK-22791][docs] HybridSource release availability for 1.13.x
---
 docs/content/docs/connectors/datastream/hybridsource.md | 4 
 1 file changed, 4 insertions(+)

diff --git a/docs/content/docs/connectors/datastream/hybridsource.md 
b/docs/content/docs/connectors/datastream/hybridsource.md
index 02f5077..ff0c559 100644
--- a/docs/content/docs/connectors/datastream/hybridsource.md
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -25,6 +25,10 @@ under the License.
 
 # Hybrid Source
 
+{{< hint info >}}
+This feature is available starting from release 1.13.3
+{{< /hint >}}
+
 `HybridSource` is a source that contains a list of concrete [sources]({{< ref 
"docs/dev/datastream/sources" >}}).
 It solves the problem of sequentially reading input from heterogeneous sources 
to produce a single input stream.
 


[flink] branch release-1.13 updated (4b682e8 -> f74fc33)

2021-08-25 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 4b682e8  [FLINK-23920][table-common] Keep primary key when inferring 
schema with SchemaTranslator
 new c72aa4b  [FLINK-22670][FLIP-150][connector/common] Hybrid source 
baseline
 new f8a6d31  [FLINK-22791][docs] Documentation for HybridSource
 new f74fc33  [FLINK-22791][docs] HybridSource release availability for 
1.13.x

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/connectors/datastream/hybridsource.md | 104 ++
 .../connector/base/source/hybrid/HybridSource.java | 257 ++
 .../source/hybrid/HybridSourceEnumeratorState.java |  38 ++
 .../HybridSourceEnumeratorStateSerializer.java | 106 ++
 .../base/source/hybrid/HybridSourceReader.java | 254 ++
 .../base/source/hybrid/HybridSourceSplit.java  |  94 +
 .../source/hybrid/HybridSourceSplitEnumerator.java | 390 +
 .../source/hybrid/HybridSourceSplitSerializer.java |  98 ++
 .../source/hybrid/SourceReaderFinishedEvent.java   |  49 +++
 .../base/source/hybrid/SwitchSourceEvent.java  |  62 
 .../base/source/hybrid/HybridSourceITCase.java | 246 +
 .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 252 +
 .../hybrid/HybridSourceSplitSerializerTest.java|  52 +++
 .../base/source/hybrid/HybridSourceTest.java   | 115 ++
 15 files changed, 2298 insertions(+)
 create mode 100644 docs/content/docs/connectors/datastream/hybridsource.md
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java


[flink] branch master updated (543f6b7 -> fb7fbd8)

2021-08-24 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 543f6b7  [hotfix] Add missing import AkkaOptions to 
MiniClusterConfiguration
 add fb7fbd8  [FLINK-22791][docs] Documentation for HybridSource

No new revisions were added by this update.

Summary of changes:
 .../docs/connectors/datastream/hybridsource.md | 100 +
 1 file changed, 100 insertions(+)
 create mode 100644 docs/content/docs/connectors/datastream/hybridsource.md


[flink] branch master updated (233e5d4 -> 23b048b)

2021-08-04 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 233e5d4  [hotfix][streaming] Removed useless calculateThroughput in 
the test
 add 23b048b  [FLINK-22670][FLIP-150][connector/common] Hybrid source 
baseline

No new revisions were added by this update.

Summary of changes:
 .../connector/base/source/hybrid/HybridSource.java | 257 ++
 .../source/hybrid/HybridSourceEnumeratorState.java |  38 ++
 .../HybridSourceEnumeratorStateSerializer.java | 106 ++
 .../base/source/hybrid/HybridSourceReader.java | 254 ++
 .../base/source/hybrid/HybridSourceSplit.java  |  94 +
 .../source/hybrid/HybridSourceSplitEnumerator.java | 390 +
 .../source/hybrid/HybridSourceSplitSerializer.java |  98 ++
 .../source/hybrid/SourceReaderFinishedEvent.java   |  49 +++
 .../base/source/hybrid/SwitchSourceEvent.java  |  62 
 .../base/source/hybrid/HybridSourceITCase.java | 246 +
 .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++
 .../hybrid/HybridSourceSplitEnumeratorTest.java| 252 +
 .../hybrid/HybridSourceSplitSerializerTest.java|  52 +++
 .../base/source/hybrid/HybridSourceTest.java   | 115 ++
 14 files changed, 2194 insertions(+)
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java
 create mode 100644 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
 create mode 100644 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java


[flink] branch master updated: [FLINK-22778] Upgrade to JUnit 4.13

2021-05-26 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 15b5a06  [FLINK-22778] Upgrade to JUnit 4.13
15b5a06 is described below

commit 15b5a06efa3eccd52d359ebf9c28e69392b6805d
Author: Thomas Weise 
AuthorDate: Tue May 25 20:31:07 2021 -0700

[FLINK-22778] Upgrade to JUnit 4.13
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 72f7ab6..ea7919f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@ under the License.
1.10.0

1.2.0
2.3.1
-   <junit.version>4.12</junit.version>
+   <junit.version>4.13.2</junit.version>
2.21.0
2.0.4
1.3


[flink] branch release-1.12 updated (1de3d63 -> be33d47)

2021-05-06 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 1de3d63  [FLINK-17170][kinesis] Fix deadlock during 
stop-with-savepoint.
 add fc886c7  [FLINK-20114][connector/kafka] KafkaSourceReader should not 
commit offsets for partitions whose offsets have not been initialized.
 add 0786be4  [hotfix][connector/kafka] Reduce the offset commit logging 
verbosity from INFO to DEBUG.
 add 5909ecf  [FLINK-20114][connector/common] SourceOperatorStreamTask 
should update the numRecordsOutCount metric
 add 79ea3bf  [FLINK-20114][connector/kafka] KafkaSourceEnumerator should 
close the admin client early if periodic partition discovery is disabled.
 add 62fc4a1  [hotfix][connector/kafka] Remove the unused close.timeout.ms 
config.
 add c618de9  [FLINK-20114][connector/kafka] 
PartitionOffsetsRetrieverImpl.committedOffsets() should handle the case without 
committed offsets.
 add 1eeeaea  [FLINK-20114][connector/kafka] SourceOperatorStreamTask 
should check the committed offset first before using OffsetResetStrategy.
 add a0ba502  [FLINK-20114][connector/kafka] Auto offset commit should be 
disabled by default.
 add be33d47  [FLINK-20114][connector/kafka] Remove duplicated warning and 
remove redundant default value for partition.discovery.interval.ms

No new revisions were added by this update.

Summary of changes:
 .../connector/kafka/source/KafkaSourceBuilder.java |  9 +++-
 .../connector/kafka/source/KafkaSourceOptions.java |  8 +---
 .../source/enumerator/KafkaSourceEnumerator.java   | 17 +--
 .../initializer/SpecifiedOffsetsInitializer.java   |  6 ++
 .../kafka/source/reader/KafkaSourceReader.java | 24 ++
 .../initializer/OffsetsInitializerTest.java| 21 ++-
 .../kafka/source/reader/KafkaSourceReaderTest.java | 14 +
 .../io/StreamMultipleInputProcessorFactory.java|  6 +-
 .../runtime/tasks/SourceOperatorStreamTask.java| 18 ++--
 9 files changed, 92 insertions(+), 31 deletions(-)
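
A hedged sketch of the consumer-side behavior the FLINK-20114 commits settle on: the starting offset is the committed offset when one exists, and the `OffsetResetStrategy` only applies as a fallback. Broker address, topic, and group id are placeholders; the deserializer style follows the API of this era:

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                // committed offset first; EARLIEST only applies if none exists
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .build();
```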


[flink] branch release-1.12 updated (719bf5e -> dbdacac)

2021-03-24 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 719bf5e  [FLINK-21944][python] Perform null check before closing 
arrowSerializer
 add dbdacac  [FLINK-20533][datadog][release-1.12] Backport 
flink-metrics-datadog changes from master (#15328)

No new revisions were added by this update.

Summary of changes:
 docs/deployment/metric_reporters.md|   4 +
 docs/deployment/metric_reporters.zh.md |   4 +
 .../org/apache/flink/metrics/datadog/DCounter.java |   7 +-
 .../org/apache/flink/metrics/datadog/DGauge.java   |   6 +-
 .../apache/flink/metrics/datadog/DHistogram.java   |  81 +++
 .../org/apache/flink/metrics/datadog/DMeter.java   |   2 +-
 .../org/apache/flink/metrics/datadog/DMetric.java  |  48 ++--
 .../org/apache/flink/metrics/datadog/DSeries.java  |  23 +-
 .../flink/metrics/datadog/DatadogHttpReporter.java |  20 +-
 .../datadog/{DGauge.java => MetricMetaData.java}   |  48 ++--
 .../{DataCenter.java => StaticDMetric.java}|  18 +-
 .../metrics/datadog/DatadogHttpClientTest.java | 269 ++---
 12 files changed, 301 insertions(+), 229 deletions(-)
 create mode 100644 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DHistogram.java
 copy 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/{DGauge.java
 => MetricMetaData.java} (53%)
 copy 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/{DataCenter.java
 => StaticDMetric.java} (73%)


[flink] branch release-1.12 updated: [FLINK-21169][kafka] flink-connector-base dependency should be scope compile

2021-02-02 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
 new d25ed8d  [FLINK-21169][kafka] flink-connector-base dependency should 
be scope compile
d25ed8d is described below

commit d25ed8d9ef6cfd147f76e41dd205752a5b707c3f
Author: Thomas Weise 
AuthorDate: Wed Jan 27 18:52:56 2021 -0800

[FLINK-21169][kafka] flink-connector-base dependency should be scope compile
---
 flink-connectors/flink-connector-kafka/pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/pom.xml 
b/flink-connectors/flink-connector-kafka/pom.xml
index 52d2ae9..7369d6b 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -87,7 +87,6 @@ under the License.
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-base</artifactId>
			<version>${project.version}</version>
-			<scope>provided</scope>

 




[flink] branch master updated (dac3e72 -> a4f1174)

2021-01-27 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from dac3e72  [FLINK-21006] Fix hbase 1.4 tests on Hadoop 3.x
 add a4f1174  [FLINK-21059][kafka] KafkaSourceEnumerator does not honor 
consumer properties

No new revisions were added by this update.

Summary of changes:
 .../flink/connector/kafka/source/KafkaSource.java  |  2 +-
 .../source/enumerator/KafkaSourceEnumerator.java   | 11 +++--
 .../source/enumerator/KafkaEnumeratorTest.java | 56 --
 3 files changed, 61 insertions(+), 8 deletions(-)
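
The fix makes user-supplied consumer properties reach the enumerator's internal clients as well, not only the readers. A hedged sketch of where such properties enter the builder; the property key is a standard Kafka client setting and the values are placeholders:

```java
import java.util.Properties;

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
// after FLINK-21059 this also reaches the enumerator-side consumer/admin clients
props.setProperty("request.timeout.ms", "60000");

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("my-topic")                 // placeholder
                .setGroupId("my-group")                // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .setProperties(props)
                .build();
```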



[flink] branch master updated (09f1674 -> c7823ff)

2020-07-24 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 09f1674  [FLINK-18655][flink-runtime] Set 
failOnUnableToExtractRepoInfo to false for git-commit-id-plugin (#12941)
 add c7823ff  [FLINK-11547][flink-connector-kinesis] Fix 
JsonMappingException in DynamoDBStreamsSchema

No new revisions were added by this update.

Summary of changes:
 .../connectors/kinesis/serialization/DynamoDBStreamsSchema.java| 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)



[flink] branch release-1.10 updated (ff7b4f6 -> 99ea1aa)

2020-05-05 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ff7b4f6  [FLINK-17313] Fix type validation error when sink table ddl 
contains columns with precision of decimal/varchar (#11993)
 add 99ea1aa  [FLINK-17496][kinesis] Upgrade amazon-kinesis-producer to 
0.14.0

No new revisions were added by this update.

Summary of changes:
 flink-connectors/flink-connector-kinesis/pom.xml| 2 +-
 .../flink-connector-kinesis/src/main/resources/META-INF/NOTICE  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)



[flink] branch master updated (2849e39 -> a737cdc)

2020-05-05 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2849e39  [FLINK-17515][yarn] Move YARN staging functionality to a 
separate class
 add a737cdc  [FLINK-17496][kinesis] Upgrade amazon-kinesis-producer to 
0.14.0

No new revisions were added by this update.

Summary of changes:
 flink-connectors/flink-connector-kinesis/pom.xml| 2 +-
 .../flink-connector-kinesis/src/main/resources/META-INF/NOTICE  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)



[flink] branch master updated (b098ce5 -> de5f8fa)

2020-03-04 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b098ce5  [FLINK-15838] Dangling CountDownLatch.await(timeout)
 add de5f8fa  [FLINK-16393][kinesis] Skip record emitter thread creation 
w/o source sync

No new revisions were added by this update.

Summary of changes:
 .../kinesis/internals/KinesisDataFetcher.java  | 36 --
 1 file changed, 19 insertions(+), 17 deletions(-)



[flink] branch release-1.10 updated: [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy

2020-01-04 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new c4acec8  [FLINK-13027][fs-connector] SFS bulk-encoded writer supports 
customized checkpoint policy
c4acec8 is described below

commit c4acec8975a03a57713a1cbc8543cbed1d83612d
Author: Ying 
AuthorDate: Fri Dec 20 15:12:09 2019 -0800

[FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized 
checkpoint policy
---
 .../sink/filesystem/StreamingFileSink.java | 22 ++
 ...ingPolicy.java => CheckpointRollingPolicy.java} | 34 ++
 .../rollingpolicies/OnCheckpointRollingPolicy.java |  9 ++
 .../api/functions/sink/filesystem/TestUtils.java   |  3 ++
 4 files changed, 38 insertions(+), 30 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index cd4afc2..d8d9d6d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -73,7 +74,7 @@ import java.io.Serializable;
  * {@code "prefix-1-17.ext"} containing the data from {@code subtask 1} of the 
sink and is the {@code 17th} bucket
  * created by that subtask.
  * Part files roll based on the user-specified {@link RollingPolicy}. By 
default, a {@link DefaultRollingPolicy}
- * is used.
+ * is used for row-encoded sink output; a {@link OnCheckpointRollingPolicy} is 
used for bulk-encoded sink output.
  *
  * In some scenarios, the open buckets are required to change based on 
time. In these cases, the user
  * can specify a {@code bucketCheckInterval} (by default 1m) and the sink will 
check periodically and roll
@@ -268,7 +269,7 @@ public class StreamingFileSink<IN>
 
	public <ID> StreamingFileSink.RowFormatBuilder<IN, ID, ? extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final RollingPolicy<IN, ID> policy) {
		Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot be called after specifying a customized bucket factory");
-		return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
+		return new RowFormatBuilder(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
}
 
/** Creates the actual sink. */
@@ -311,24 +312,29 @@ public class StreamingFileSink<IN>
 
	private BucketAssigner<IN, BucketID> bucketAssigner;
 
+	private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
	private BucketFactory<IN, BucketID> bucketFactory;
 
	private OutputFileConfig outputFileConfig;
 
	protected BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
-		this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
+		this(basePath, writerFactory, assigner, OnCheckpointRollingPolicy.build(), DEFAULT_BUCKET_CHECK_INTERVAL,
+			new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
	}
 
	protected BulkFormatBuilder(
			Path basePath,
			BulkWriter.Factory<IN> writerFactory,
			BucketAssigner<IN, BucketID> assigner,
+			CheckpointRollingPolicy<IN, BucketID> policy,
			long bucketCheckInterval,
   

[flink] branch master updated (ec8ea8b -> df5eb21)

2020-01-03 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ec8ea8b  [FLINK-13596][ml] Improve fallback conversion and test for 
Table transformation.
 add df5eb21  [FLINK-13027][fs-connector] SFS bulk-encoded writer supports 
customized checkpoint policy

No new revisions were added by this update.

Summary of changes:
 .../sink/filesystem/StreamingFileSink.java | 22 ++
 ...ingPolicy.java => CheckpointRollingPolicy.java} | 34 ++
 .../rollingpolicies/OnCheckpointRollingPolicy.java |  9 ++
 .../api/functions/sink/filesystem/TestUtils.java   |  3 ++
 4 files changed, 38 insertions(+), 30 deletions(-)
 copy 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/{OnCheckpointRollingPolicy.java
 => CheckpointRollingPolicy.java} (60%)
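
The new `CheckpointRollingPolicy` base class (diffed in the release-1.10 message above) keeps the mandatory roll-on-checkpoint behavior for bulk formats while letting subclasses add their own triggers. A hedged sketch of such a subclass, assuming the 1.10-era `RollingPolicy`/`PartFileInfo` signatures; the size threshold and class name are illustrative:

```java
import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

public class SizeLimitedCheckpointRollingPolicy<IN, BucketID>
        extends CheckpointRollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;

    public SizeLimitedCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        // roll early when the in-progress part file grows beyond the threshold;
        // rolling on checkpoint stays enforced by the base class
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
        return false; // size and checkpoints are the only triggers in this sketch
    }
}
```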



[flink] branch release-1.10 updated: [FLINK-15301] [kinesis] Exception propagation from record emitter to source thread

2019-12-30 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new c4e7baa  [FLINK-15301] [kinesis] Exception propagation from record 
emitter to source thread
c4e7baa is described below

commit c4e7baaedab0be13c08b9fc0b3e6bef9d8668791
Author: Thomas Weise 
AuthorDate: Tue Dec 17 14:56:43 2019 -0800

[FLINK-15301] [kinesis] Exception propagation from record emitter to source 
thread
---
 .../kinesis/internals/KinesisDataFetcher.java  | 14 +++-
 .../connectors/kinesis/util/RecordEmitter.java | 13 ++-
 .../kinesis/FlinkKinesisConsumerTest.java  | 26 +-
 3 files changed, 46 insertions(+), 7 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index afd6138..52d6293 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -497,7 +497,19 @@ public class KinesisDataFetcher<T> {

Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
 
// run record emitter in separate thread since main 
thread is used for discovery
-   Thread thread = new Thread(this.recordEmitter);
+   Runnable recordEmitterRunnable = new Runnable() {
+   @Override
+   public void run() {
+   try {
+   recordEmitter.run();
+   } catch (Throwable error) {
+   // report the error that 
terminated the emitter loop to source thread
+   stopWithError(error);
+   }
+   }
+   };
+
+   Thread thread = new Thread(recordEmitterRunnable);
thread.setName("recordEmitter-" + 
runtimeContext.getTaskNameWithSubtasks());
thread.setDaemon(true);
thread.start();
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 95c3688..dfa3388 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -170,7 +170,14 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
@Override
public void run() {
LOG.info("Starting emitter with maxLookaheadMillis: {}", 
this.maxLookaheadMillis);
+   emitRecords();
+   }
+
+   public void stop() {
+   running = false;
+   }
 
+   protected void emitRecords() {
// emit available records, ordered by timestamp
AsyncRecordQueue min = heads.poll();
runLoop:
@@ -248,12 +255,8 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
}
}
 
-   public void stop() {
-   running = false;
-   }
-
/** Emit the record. This is specific to a connector, like the Kinesis 
data fetcher. */
-	public abstract void emit(T record, RecordQueue<T> source);
+	protected abstract void emit(T record, RecordQueue<T> source);
 
/** Summary of emit queues that can be used for logging. */
public String printInfo() {
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 1a910cb..d2d637c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -75,6 +75,7 @@ import org.powermock.api.mockito.P

[flink] branch master updated (8525c37 -> c2c38b1)

2019-12-20 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 8525c37  [FLINK-15308][runtime] Remove data compression for pipelined 
partition
 add c2c38b1  [FLINK-15301] [kinesis] Exception propagation from record 
emitter to source thread

No new revisions were added by this update.

Summary of changes:
 .../kinesis/internals/KinesisDataFetcher.java  | 14 +++-
 .../connectors/kinesis/util/RecordEmitter.java | 13 ++-
 .../kinesis/FlinkKinesisConsumerTest.java  | 26 +-
 3 files changed, 46 insertions(+), 7 deletions(-)
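
The pattern in this commit generalizes beyond Kinesis: a background loop must not die silently, so its wrapper forwards any terminal error back to the thread that owns the source lifecycle. A minimal standalone sketch; `errorHandler` stands in for `KinesisDataFetcher#stopWithError` and the class name is illustrative:

```java
import java.util.function.Consumer;

/** Runs a delegate and reports any error that kills its loop back to the owner. */
final class ErrorPropagatingRunnable implements Runnable {

    private final Runnable delegate;
    private final Consumer<Throwable> errorHandler;

    ErrorPropagatingRunnable(Runnable delegate, Consumer<Throwable> errorHandler) {
        this.delegate = delegate;
        this.errorHandler = errorHandler;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (Throwable t) {
            // surface the failure to the thread that owns the source lifecycle
            errorHandler.accept(t);
        }
    }
}
```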



[flink] branch release-1.9 updated: Update KPL version to 0.13.1

2019-10-24 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new e7650ff  Update KPL version to 0.13.1
e7650ff is described below

commit e7650ffa1eccb914a438f672b2311b826565b2b8
Author: Abhilasha Seth 
AuthorDate: Thu Oct 17 15:37:40 2019 +

Update KPL version to 0.13.1

This fixes
https://github.com/awslabs/amazon-kinesis-producer/issues/224. The issue
has been fixed in Flink 1.10.0 as part of FLINK-12847.
---
 flink-connectors/flink-connector-kinesis/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/pom.xml 
b/flink-connectors/flink-connector-kinesis/pom.xml
index fcb9818..7efe5d7 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -35,7 +35,7 @@ under the License.

	<aws.sdk.version>1.11.319</aws.sdk.version>
	<aws.kinesis-kcl.version>1.9.0</aws.kinesis-kcl.version>
-	<aws.kinesis-kpl.version>0.12.9</aws.kinesis-kpl.version>
+	<aws.kinesis-kpl.version>0.13.1</aws.kinesis-kpl.version>
 
	<aws.dynamodbstreams-kinesis-adapter.version>1.4.0</aws.dynamodbstreams-kinesis-adapter.version>

 



[flink] branch release-1.8 updated: Upgrade KPL version to 0.13.1

2019-10-24 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 148dedd  Upgrade KPL version to 0.13.1
148dedd is described below

commit 148dedd44f1f4fc537d8522d5f4cd04406031a9a
Author: Abhilasha Seth 
AuthorDate: Wed Oct 9 16:22:10 2019 -0700

Upgrade KPL version to 0.13.1

This commit mitigates the issue - 
https://github.com/awslabs/amazon-kinesis-producer/issues/224

This closes #14175
---
 flink-connectors/flink-connector-kinesis/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/pom.xml 
b/flink-connectors/flink-connector-kinesis/pom.xml
index b131e0d..f79d225 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -35,7 +35,7 @@ under the License.

	<aws.sdk.version>1.11.319</aws.sdk.version>
	<aws.kinesis-kcl.version>1.9.0</aws.kinesis-kcl.version>
-	<aws.kinesis-kpl.version>0.12.9</aws.kinesis-kpl.version>
+	<aws.kinesis-kpl.version>0.13.1</aws.kinesis-kpl.version>
 
	<aws.dynamodbstreams-kinesis-adapter.version>1.4.0</aws.dynamodbstreams-kinesis-adapter.version>

 



[flink-web] branch asf-site updated: Rebuild website

2019-10-09 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new d2ee4f0  Rebuild website
d2ee4f0 is described below

commit d2ee4f0cc63408b86b029019df30b260fb2ff967
Author: Thomas Weise 
AuthorDate: Wed Oct 9 11:50:41 2019 -0700

Rebuild website
---
 content/2019/05/03/pulsar-flink.html   | 3 +++
 content/2019/05/14/temporal-tables.html| 3 +++
 content/2019/05/19/state-ttl.html  | 3 +++
 content/2019/06/05/flink-network-stack.html| 3 +++
 content/2019/06/26/broadcast-state.html| 3 +++
 content/2019/07/23/flink-network-stack-2.html  | 3 +++
 content/blog/index.html| 3 +++
 content/blog/page2/index.html  | 3 +++
 content/blog/page3/index.html  | 3 +++
 content/blog/page4/index.html  | 3 +++
 content/blog/page5/index.html  | 3 +++
 content/blog/page6/index.html  | 3 +++
 content/blog/page7/index.html  | 3 +++
 content/blog/page8/index.html  | 3 +++
 content/blog/page9/index.html  | 3 +++
 content/blog/release_1.0.0-changelog_known_issues.html | 3 +++
 content/blog/release_1.1.0-changelog.html  | 3 +++
 content/blog/release_1.2.0-changelog.html  | 3 +++
 content/blog/release_1.3.0-changelog.html  | 3 +++
 content/community.html | 3 +++
 content/contributing/code-style-and-quality-common.html| 3 +++
 content/contributing/code-style-and-quality-components.html| 3 +++
 content/contributing/code-style-and-quality-formatting.html| 3 +++
 content/contributing/code-style-and-quality-java.html  | 3 +++
 content/contributing/code-style-and-quality-preamble.html  | 3 +++
 content/contributing/code-style-and-quality-pull-requests.html | 3 +++
 content/contributing/code-style-and-quality-scala.html | 3 +++
 content/contributing/contribute-code.html  | 3 +++
 content/contributing/contribute-documentation.html | 3 +++
 content/contributing/how-to-contribute.html| 3 +++
 content/contributing/improve-website.html  | 3 +++
 content/contributing/reviewing-prs.html| 3 +++
 content/documentation.html | 3 +++
 content/downloads.html | 3 +++
 content/ecosystem.html | 7 +++
 content/faq.html   | 3 +++
 content/feature/2019/09/13/state-processor-api.html| 3 +++
 content/features/2017/07/04/flink-rescalable-state.html| 3 +++
 content/features/2018/01/30/incremental-checkpointing.html | 3 +++
 .../features/2018/03/01/end-to-end-exactly-once-apache-flink.html  | 3 +++
 content/features/2019/03/11/prometheus-monitoring.html | 3 +++
 content/flink-applications.html| 3 +++
 content/flink-architecture.html| 3 +++
 content/flink-operations.html  | 3 +++
 content/gettinghelp.html   | 3 +++
 content/index.html | 3 +++
 content/material.html  | 3 +++
 content/news/2014/08/26/release-0.6.html   | 3 +++
 content/news/2014/09/26/release-0.6.1.html | 3 +++
 content/news/2014/10/03/upcoming_events.html   | 3 +++
 content/news/2014/11/04/release-0.7.0.html | 3 +++
 content/news/2014/11/18/hadoop-compatibility.html  | 3 +++
 content/news/2015/01/06/december-in-flink.html | 3 +++
 content/news/2015/01/21/release-0.8.html   | 3 +++
 content/news/2015/02/04/january-in-flink.html  | 3 +++
 content/news/2015/02/09/streaming-example.html | 3 +++
 content/news/2015/03/02/february-2015-in-flink.html| 3 +++
 .../news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html| 3 +++
 content/news/2015/04/07/march-in-flink.html| 3 +++
 content/news/2015/04/13/release-0.9.0-milestone1.html

[flink] branch master updated (28c5264 -> 3f5c212)

2019-10-04 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 28c5264  [FLINK-13065][datastream][docs] Clarify key selector example
 add 3f5c212  [FLINK-14283][kinesis][docs] Update Kinesis consumer docs for 
recent feature additions

No new revisions were added by this update.

Summary of changes:
 docs/dev/connectors/kinesis.md | 66 +-
 1 file changed, 59 insertions(+), 7 deletions(-)



[flink] branch release-1.9 updated: [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock

2019-09-18 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 8186ec3  [FLINK-14107][kinesis] Erroneous queue selection in record 
emitter may lead to deadlock
8186ec3 is described below

commit 8186ec30e513f92ec91d14add875a2f7dd245af8
Author: Thomas Weise 
AuthorDate: Mon Sep 16 13:33:42 2019 -0700

[FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead 
to deadlock
---
 .../connectors/kinesis/util/RecordEmitter.java |  5 ++
 .../connectors/kinesis/util/RecordEmitterTest.java | 66 --
 2 files changed, 65 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index da74b08..95c3688 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -236,6 +236,11 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
}
if (record == null) {
this.emptyQueues.put(min, true);
+   } else if (nextQueue != null && nextQueue.headTimestamp 
> min.headTimestamp) {
+   // if we stopped emitting due to reaching max 
timestamp,
+   // the next queue may not be the new min
+   heads.offer(nextQueue);
+   nextQueue = min;
} else {
heads.offer(min);
}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
index 1948237..84949cc 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kinesis.util;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
 
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,10 +34,10 @@ import java.util.concurrent.Executors;
 /** Test for {@link RecordEmitter}. */
 public class RecordEmitterTest {
 
-	static List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());
-
	private class TestRecordEmitter extends RecordEmitter<TimestampedValue> {
 
+		private List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());
+
		private TestRecordEmitter() {
			super(DEFAULT_QUEUE_CAPACITY);
		}
@@ -68,14 +70,66 @@ public class RecordEmitterTest {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(emitter);
 
-   long timeout = System.currentTimeMillis() + 10_000;
-   while (results.size() != 4 && System.currentTimeMillis() < 
timeout) {
-   Thread.sleep(100);
+   Deadline dl = Deadline.fromNow(Duration.ofSeconds(10));
+   while (emitter.results.size() != 4 && dl.hasTimeLeft()) {
+   Thread.sleep(10);
}
emitter.stop();
executor.shutdownNow();
 
-   Assert.assertThat(results, Matchers.contains(one, five, two, 
ten));
+   Assert.assertThat(emitter.results, Matchers.contains(one, five, 
two, ten));
}
 
+   @Test
+   public void testRetainMinAfterReachingLimit() throws Exception {
+
+   TestRecordEmitter emitter = new TestRecordEmitter();
+
+		final TimestampedValue<String> one = new TimestampedValue<>("1", 1);
+		final TimestampedValue<String> two = new TimestampedValue<>("2", 2);
+		final TimestampedValue<String> three = new TimestampedValue<>("3", 3);
+		final TimestampedValue<String> ten = new TimestampedValue<>("10", 10);
+   final Timestam

[flink] branch release-1.8 updated: [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock

2019-09-18 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 4a4dae0  [FLINK-14107][kinesis] Erroneous queue selection in record 
emitter may lead to deadlock
4a4dae0 is described below

commit 4a4dae0d81b830d5c4245d61ca95dbffaf2a1f2d
Author: Thomas Weise 
AuthorDate: Mon Sep 16 13:33:42 2019 -0700

[FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead 
to deadlock
---
 .../connectors/kinesis/util/RecordEmitter.java |  5 ++
 .../connectors/kinesis/util/RecordEmitterTest.java | 66 --
 2 files changed, 65 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 17344b1..7fa21fe 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -239,6 +239,11 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
}
if (record == null) {
this.emptyQueues.put(min, true);
+   } else if (nextQueue != null && nextQueue.headTimestamp 
> min.headTimestamp) {
+   // if we stopped emitting due to reaching max 
timestamp,
+   // the next queue may not be the new min
+   heads.offer(nextQueue);
+   nextQueue = min;
} else {
heads.offer(min);
}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
index 1948237..84949cc 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kinesis.util;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
 
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,10 +34,10 @@ import java.util.concurrent.Executors;
 /** Test for {@link RecordEmitter}. */
 public class RecordEmitterTest {
 
-	static List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());
-
	private class TestRecordEmitter extends RecordEmitter<TimestampedValue> {
 
+		private List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());
+
		private TestRecordEmitter() {
			super(DEFAULT_QUEUE_CAPACITY);
		}
@@ -68,14 +70,66 @@ public class RecordEmitterTest {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(emitter);
 
-   long timeout = System.currentTimeMillis() + 10_000;
-   while (results.size() != 4 && System.currentTimeMillis() < 
timeout) {
-   Thread.sleep(100);
+   Deadline dl = Deadline.fromNow(Duration.ofSeconds(10));
+   while (emitter.results.size() != 4 && dl.hasTimeLeft()) {
+   Thread.sleep(10);
}
emitter.stop();
executor.shutdownNow();
 
-   Assert.assertThat(results, Matchers.contains(one, five, two, 
ten));
+   Assert.assertThat(emitter.results, Matchers.contains(one, five, 
two, ten));
}
 
+	@Test
+	public void testRetainMinAfterReachingLimit() throws Exception {
+
+		TestRecordEmitter emitter = new TestRecordEmitter();
+
+		final TimestampedValue<String> one = new TimestampedValue<>("1", 1);
+		final TimestampedValue<String> two = new TimestampedValue<>("2", 2);
+		final TimestampedValue<String> three = new TimestampedValue<>("3", 3);
+		final TimestampedValue<String> ten = new TimestampedValue<>("10", 10);
+   final Timestam
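
(The archived message is truncated above.) For illustration, here is a minimal, self-contained sketch -- not Flink's RecordEmitter, all names are made up -- of the min-heap invariant the fix restores: after draining the queue with the smallest head timestamp, the pre-selected successor queue must be re-offered to the heap whenever it no longer holds the minimum, otherwise the emitter can pick a non-minimal queue and stall.

	import java.util.PriorityQueue;

	public class MinHeadSelectionSketch {

		// Stand-in for a per-shard record queue, ordered by the timestamp of its head element.
		static class TimestampedQueue implements Comparable<TimestampedQueue> {
			final long headTimestamp;

			TimestampedQueue(long headTimestamp) {
				this.headTimestamp = headTimestamp;
			}

			@Override
			public int compareTo(TimestampedQueue other) {
				return Long.compare(headTimestamp, other.headTimestamp);
			}
		}

		public static void main(String[] args) {
			PriorityQueue<TimestampedQueue> heads = new PriorityQueue<>();
			TimestampedQueue min = new TimestampedQueue(5);       // queue we just emitted from
			TimestampedQueue nextQueue = new TimestampedQueue(9); // successor picked earlier

			// Emission stopped because the max timestamp was reached, so the
			// previously selected successor may no longer be the true minimum.
			if (nextQueue.headTimestamp > min.headTimestamp) {
				heads.offer(nextQueue); // return the stale candidate to the heap
				nextQueue = min;        // the queue we emitted from is still the min
			} else {
				heads.offer(min);
			}
			System.out.println("next queue head timestamp: " + nextQueue.headTimestamp); // prints 5
		}
	}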

[flink] branch master updated (f65ee55 -> 4a4a147)

2019-09-18 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f65ee55  [FLINK-14017][python] Support to start up Python worker in 
process mode.
 add 4a4a147  [FLINK-14107][kinesis] Erroneous queue selection in record 
emitter may lead to deadlock

No new revisions were added by this update.

Summary of changes:
 .../connectors/kinesis/util/RecordEmitter.java |  5 ++
 .../connectors/kinesis/util/RecordEmitterTest.java | 66 --
 2 files changed, 65 insertions(+), 6 deletions(-)



[flink] branch release-1.9 updated: [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout

2019-08-27 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new c905a4d  [hotfix][kinesis] Update emit record javadoc and don't count 
max watermark as timeout
c905a4d is described below

commit c905a4d323c0ed4985cdb9e5764efe13bc6183d0
Author: Thomas Weise 
AuthorDate: Mon Aug 26 15:02:40 2019 -0700

[hotfix][kinesis] Update emit record javadoc and don't count max watermark 
as timeout
---
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java   | 5 +++--
 .../connectors/kinesis/util/JobManagerWatermarkTracker.java  | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index f38e6eb..80b724b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -715,7 +715,7 @@ public class KinesisDataFetcher<T> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Atomic operation to collect a record and update state to the sequence number of the record.
+	 * Prepare a record and hand it over to the {@link RecordEmitter}, which may collect it asynchronously.
 	 * This method is called by {@link ShardConsumer}s.
 	 *
 	 * @param record the record to collect
@@ -752,7 +752,8 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
-	 * Actual record emission called from the record emitter.
+	 * Atomic operation to collect a record and update state to the sequence number of the record.
+	 * This method is called from the record emitter.
 	 *
 	 * <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
 	 * the record, when a watermark assigner was configured.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index f150bb0..1581024 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -144,7 +144,9 @@ public class JobManagerWatermarkTracker extends WatermarkTracker {
 			WatermarkState ws = e.getValue();
 			if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
 				// ignore outdated entry
-				updateTimeoutCount++;
+				if (ws.watermark < Long.MAX_VALUE) {
+					updateTimeoutCount++;
+				}
 				continue;
 			}
 			globalWatermark = Math.min(ws.watermark, globalWatermark);
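
The rationale: a subtask whose watermark has reached Long.MAX_VALUE is finished and will never update again, so its stale entry must not be reported as a timeout. A hedged, self-contained sketch of that aggregation rule (WatermarkState below is a stand-in for the tracker's internal per-subtask entry, not Flink's class):

	import java.util.Collection;

	class WatermarkAggregationSketch {

		static class WatermarkState {
			long watermark;
			long lastUpdated;
		}

		static long aggregate(Collection<WatermarkState> states, long currentTime, long updateTimeoutMillis) {
			long globalWatermark = Long.MAX_VALUE;
			int updateTimeoutCount = 0;
			for (WatermarkState ws : states) {
				if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
					// outdated entry: skip it, but only count it as a timeout when the
					// subtask has not already emitted its final Long.MAX_VALUE watermark
					if (ws.watermark < Long.MAX_VALUE) {
						updateTimeoutCount++;
					}
					continue;
				}
				globalWatermark = Math.min(ws.watermark, globalWatermark);
			}
			System.out.println("timed-out subtasks: " + updateTimeoutCount);
			return globalWatermark;
		}
	}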



[flink] branch master updated (0437ad2 -> b6bd8f6)

2019-08-27 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0437ad2  [FLINK-13841][hive] Extend Hive version support to all 1.2 
and 2.3 versions
 add b6bd8f6  [hotfix][kinesis] Update emit record javadoc and don't count 
max watermark as timeout

No new revisions were added by this update.

Summary of changes:
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java   | 5 +++--
 .../connectors/kinesis/util/JobManagerWatermarkTracker.java  | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)



[flink] branch release-1.9 updated: [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock

2019-07-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 7ea55e9  [FLINK-12595][kinesis] Interrupt thread at right time to 
avoid deadlock
7ea55e9 is described below

commit 7ea55e967bc450b3b744edcaea23834646e439cd
Author: Shannon Carey 
AuthorDate: Sat Jul 20 14:15:50 2019 -0500

[FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock

- Inside testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown,
consumerThread.interrupt() was getting absorbed inside
KinesisDataFetcher's while(running) loop, therefore
TestableKinesisDataFetcherForShardConsumerException's awaitTermination()
wasn't getting interrupted by it. This led to deadlock, with
KinesisDataFetcher waiting on the test code to send the interrupt, and
the test code waiting for KinesisDataFetcher to throw the expected
exception.
- Now, the test code waits until KinesisDataFetcher is inside
awaitTermination() before producing the interrupt, so it can be sure
that the interrupt it produces will be received/handled inside
awaitTermination().
---
 .../kinesis/internals/KinesisDataFetcherTest.java |  6 +-
 ...stableKinesisDataFetcherForShardConsumerException.java | 15 ++-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 5255e61..2815193 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -846,7 +846,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 		DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
 			TestUtils.getStandardProperties(), fetcher, 1, 0);
 
-		CheckedThread consumerThread = new CheckedThread() {
+		CheckedThread consumerThread = new CheckedThread("FlinkKinesisConsumer") {
 			@Override
 			public void go() throws Exception {
 				consumer.run(new TestSourceContext<>());
@@ -858,6 +858,10 @@ public class KinesisDataFetcherTest extends TestLogger {
 		// ShardConsumer exception (from deserializer) will result in fetcher being shut down.
 		fetcher.waitUntilShutdown(20, TimeUnit.SECONDS);
 
+		// Ensure that KinesisDataFetcher has exited its while(running) loop and is inside its awaitTermination()
+		// method before we interrupt its thread, so that our interrupt doesn't get absorbed by any other mechanism.
+		fetcher.waitUntilAwaitTermination(20, TimeUnit.SECONDS);
+
 		// Interrupt the thread so that KinesisDataFetcher#awaitTermination() will throw InterruptedException.
 		consumerThread.interrupt();
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
index c08b7af..6ae4391 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -32,6 +33,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -39,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * {@link #awaitTermination(
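
(The archived message is truncated above.) The synchronization pattern the fix introduces can be illustrated with a small, self-contained sketch: the worker signals a latch right before entering its final interruptible wait, and the test blocks on that latch before calling interrupt(), so the interrupt cannot be swallowed by an earlier loop. CountDownLatch stands in here for Flink's OneShotLatch; everything else is illustrative, not Flink code.

	import java.util.concurrent.CountDownLatch;

	public class InterruptAtRightTimeSketch {

		public static void main(String[] args) throws Exception {
			CountDownLatch insideAwait = new CountDownLatch(1);

			Thread worker = new Thread(() -> {
				// ... a while (running) loop would run here; an interrupt arriving
				// in it would be absorbed and never reach the wait below ...
				insideAwait.countDown(); // signal: about to enter the final wait
				try {
					Thread.currentThread().join(); // stands in for awaitTermination()
				} catch (InterruptedException e) {
					System.out.println("interrupted inside the final wait, as intended");
				}
			});
			worker.start();

			insideAwait.await(); // do not interrupt until the worker is in the final wait
			worker.interrupt();
			worker.join();
		}
	}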

[flink] branch master updated (5da6e4c -> 091266d)

2019-07-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5da6e4c  [FLINK-13206]replace 'use database' with 'use' in sql client 
parser
 add 091266d  [FLINK-12595][kinesis] Interrupt thread at right time to 
avoid deadlock

No new revisions were added by this update.

Summary of changes:
 .../kinesis/internals/KinesisDataFetcherTest.java |  6 +-
 ...stableKinesisDataFetcherForShardConsumerException.java | 15 ++-
 2 files changed, 19 insertions(+), 2 deletions(-)



[flink] branch release-1.8 updated: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-31 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 184e8e0  [FLINK-10921] [kinesis] Shard watermark synchronization in 
Kinesis consumer
184e8e0 is described below

commit 184e8e00fd20519a037cf0bf4e3dba7132d75fa1
Author: Thomas Weise 
AuthorDate: Wed May 22 21:42:15 2019 -0700

[FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
---
 .../connectors/kinesis/FlinkKinesisConsumer.java   |  18 +-
 .../kinesis/config/ConsumerConfigConstants.java|  11 +
 .../internals/DynamoDBStreamsDataFetcher.java  |   1 +
 .../kinesis/internals/KinesisDataFetcher.java  | 292 +++--
 .../kinesis/util/JobManagerWatermarkTracker.java   | 179 +
 .../connectors/kinesis/util/RecordEmitter.java | 269 +++
 .../connectors/kinesis/util/WatermarkTracker.java  | 114 
 .../kinesis/FlinkKinesisConsumerMigrationTest.java |   2 +-
 .../kinesis/FlinkKinesisConsumerTest.java  | 185 +
 .../kinesis/internals/ShardConsumerTest.java   |   9 +-
 .../testutils/TestableKinesisDataFetcher.java  |   1 +
 .../util/JobManagerWatermarkTrackerTest.java   | 101 +++
 .../connectors/kinesis/util/RecordEmitterTest.java |  81 ++
 .../kinesis/util/WatermarkTrackerTest.java | 108 
 14 files changed, 1342 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3c5e3c7..5b24ded 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
@@ -126,6 +127,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
 	private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
 	private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+	private WatermarkTracker watermarkTracker;
 
 	// ------------------------------------------------------------------------
 	//  Runtime state
 	// ------------------------------------------------------------------------
@@ -254,6 +256,20 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
 		ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
 	}
 
+	public WatermarkTracker getWatermarkTracker() {
+		return this.watermarkTracker;
+	}
+
+	/**
+	 * Set the global watermark tracker. When set, it will be used by the fetcher
+	 * to align the shard consumers by event time.
+	 * @param watermarkTracker
+	 */
+	public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
+		this.watermarkTracker = watermarkTracker;
+		ClosureCleaner.clean(this.watermarkTracker, true);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Source life cycle
 	// ------------------------------------------------------------------------
@@ -448,7 +464,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
 			Properties configProps,
 			KinesisDeserializationSchema<T> deserializationSchema) {
 
-		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
+		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker);
 	}
 
 	@VisibleForTesting
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 41ac6b8..2f5be97 100644
--- a/flink-connectors/flink-conne
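
(The archived diff is truncated above.) A hedged usage sketch for the new hook; the stream name, region, and aggregate name are placeholders, and a periodic watermark assigner must also be configured on the consumer for the alignment to take effect:

	import org.apache.flink.api.common.serialization.SimpleStringSchema;
	import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
	import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
	import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;

	import java.util.Properties;

	public class WatermarkTrackerUsageSketch {

		public static void main(String[] args) {
			Properties config = new Properties();
			config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

			FlinkKinesisConsumer<String> consumer =
				new FlinkKinesisConsumer<>("myStream", new SimpleStringSchema(), config); // placeholder stream
			// align shard consumers by event time across subtasks
			consumer.setWatermarkTracker(new JobManagerWatermarkTracker("kinesisWatermarks"));
		}
	}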

[flink] branch master updated: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-29 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 793a784  [FLINK-10921] [kinesis] Shard watermark synchronization in 
Kinesis consumer
793a784 is described below

commit 793a78407aa22530448efbf18b714952eac40aba
Author: Thomas Weise 
AuthorDate: Wed May 22 21:42:15 2019 -0700

[FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
---
 .../connectors/kinesis/FlinkKinesisConsumer.java   |  18 +-
 .../kinesis/config/ConsumerConfigConstants.java|  11 +
 .../internals/DynamoDBStreamsDataFetcher.java  |   1 +
 .../kinesis/internals/KinesisDataFetcher.java  | 292 +++--
 .../kinesis/util/JobManagerWatermarkTracker.java   | 179 +
 .../connectors/kinesis/util/RecordEmitter.java | 269 +++
 .../connectors/kinesis/util/WatermarkTracker.java  | 114 
 .../kinesis/FlinkKinesisConsumerMigrationTest.java |   2 +-
 .../kinesis/FlinkKinesisConsumerTest.java  | 185 +
 .../kinesis/internals/ShardConsumerTest.java   |   9 +-
 .../testutils/TestableKinesisDataFetcher.java  |   1 +
 .../util/JobManagerWatermarkTrackerTest.java   | 101 +++
 .../connectors/kinesis/util/RecordEmitterTest.java |  81 ++
 .../kinesis/util/WatermarkTrackerTest.java | 108 
 14 files changed, 1342 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3c5e3c7..5b24ded 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
@@ -126,6 +127,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
 	private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
 	private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+	private WatermarkTracker watermarkTracker;
 
 	// ------------------------------------------------------------------------
 	//  Runtime state
 	// ------------------------------------------------------------------------
@@ -254,6 +256,20 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
 		ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
 	}
 
+	public WatermarkTracker getWatermarkTracker() {
+		return this.watermarkTracker;
+	}
+
+	/**
+	 * Set the global watermark tracker. When set, it will be used by the fetcher
+	 * to align the shard consumers by event time.
+	 * @param watermarkTracker
+	 */
+	public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
+		this.watermarkTracker = watermarkTracker;
+		ClosureCleaner.clean(this.watermarkTracker, true);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Source life cycle
 	// ------------------------------------------------------------------------
@@ -448,7 +464,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> implements
 			Properties configProps,
 			KinesisDeserializationSchema<T> deserializationSchema) {
 
-		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
+		return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker);
 	}
 
 	@VisibleForTesting
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 41ac6b8..2f5be97 100644
--- a/flink-connectors/flink-connector-kine

[flink] branch release-1.8 updated: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)

2019-05-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 7b862dc  [FLINK-12539] [fs-connector] Make StreamingFileSink 
customizable (#8469)
7b862dc is described below

commit 7b862dcd91a2506f99dd6e1feacab57bc78fa4e6
Author: Kailash Dayanand 
AuthorDate: Wed May 22 10:40:27 2019 -0700

[FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
---
 .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index dc0b1c6..f5b1bf9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -127,7 +127,7 @@ public class StreamingFileSink<IN>
 	/**
 	 * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
 	 */
-	private StreamingFileSink(
+	protected StreamingFileSink(
 			final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder,
 			final long bucketCheckInterval) {
 
@@ -170,7 +170,7 @@ public class StreamingFileSink<IN>
 	/**
 	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
 	 */
-	private abstract static class BucketsBuilder<IN, BucketID> implements Serializable {
+	protected abstract static class BucketsBuilder<IN, BucketID> implements Serializable {
 
 		private static final long serialVersionUID = 1L;
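
What the relaxed visibility enables, as a hedged sketch -- MyStreamingFileSink is illustrative and not part of Flink's API; it merely shows that a subclass can now reach the constructor and the builder type:

	import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

	public class MyStreamingFileSink<IN> extends StreamingFileSink<IN> {

		// legal once the super constructor and BucketsBuilder are protected
		protected MyStreamingFileSink(
				StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder,
				long bucketCheckInterval) {
			super(bucketsBuilder, bucketCheckInterval);
		}
	}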
 



[flink] branch master updated (f652bb5 -> 7748729)

2019-05-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f652bb5  [FLINK-12447][build] Set minimum maven version to 3.1.1
 add 7748729  [FLINK-12539] [fs-connector] Make StreamingFileSink 
customizable (#8469)

No new revisions were added by this update.

Summary of changes:
 .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[flink] branch release-1.8 updated (9795457 -> cf065a0)

2019-04-01 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9795457  [FLINK-12064] [core, State Backends] RocksDBKeyedStateBackend 
snapshots uses incorrect key serializer if reconfigure happens during restore
 new edf082a  [FLINK-11501] [kafka] Add ratelimiting to Kafka consumer 
(#7679)
 new e42fa0e  [FLINK-11826] Ignore flaky testRateLimitedConsumer
 new cf065a0  [FLINK-11826][tests] Harden 
Kafka09ITCase#testRateLimitedConsumer

The 16089 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kafka/FlinkKafkaConsumer010.java|  10 +-
 .../connectors/kafka/internal/Kafka010Fetcher.java |   6 +-
 .../kafka/internal/Kafka010FetcherTest.java|   6 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java |  26 +++-
 .../connectors/kafka/internal/Kafka09Fetcher.java  |   7 +-
 .../kafka/internal/KafkaConsumerThread.java|  67 +-
 .../streaming/connectors/kafka/Kafka09ITCase.java  | 142 +
 .../kafka/internal/Kafka09FetcherTest.java |   6 +-
 .../kafka/internal/KafkaConsumerThreadTest.java| 139 +++-
 flink-core/pom.xml |   6 +
 .../io/ratelimiting/FlinkConnectorRateLimiter.java |  57 -
 .../GuavaFlinkConnectorRateLimiter.java|  79 
 12 files changed, 507 insertions(+), 44 deletions(-)
 copy flink-java/src/main/java/org/apache/flink/api/java/summarize/ObjectColumnSummary.java => flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/FlinkConnectorRateLimiter.java (53%)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaFlinkConnectorRateLimiter.java
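
A hedged usage sketch for the rate limiter added here; the topic, group id, and bootstrap servers are placeholders, and the rate is bytes per second as in the accompanying test:

	import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
	import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
	import org.apache.flink.api.common.serialization.SimpleStringSchema;
	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

	import java.util.Properties;

	public class RateLimitedConsumerSketch {

		public static void main(String[] args) {
			Properties kafkaProps = new Properties();
			kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
			kafkaProps.setProperty("group.id", "rate-limited-demo");       // placeholder

			// Guava RateLimiter-backed implementation; setRate() is the desired
			// consumer throughput in bytes per second
			FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
			rateLimiter.setRate(1_000_000L);

			FlinkKafkaConsumer09<String> consumer =
				new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), kafkaProps); // placeholder topic
			consumer.setRateLimiter(rateLimiter);
		}
	}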



[flink] branch release-1.8 updated: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 11af452  [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
11af452 is described below

commit 11af4523801164539e186d836462f5884b561941
Author: Thomas Weise 
AuthorDate: Thu Feb 28 16:11:50 2019 -0800

[FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
---
 .../flink-streaming-kinesis-test/pom.xml   |  92 +++
 .../streaming/kinesis/test/KinesisExample.java |  91 +++
 .../streaming/kinesis/test/KinesisExampleTest.java | 127 
 .../kinesis/test/KinesisPubsubClient.java  | 128 +
 flink-end-to-end-tests/pom.xml |  21 
 flink-end-to-end-tests/run-pre-commit-tests.sh |   1 +
 flink-end-to-end-tests/test-scripts/common.sh  |   2 +-
 .../test-scripts/test_streaming_kinesis.sh |  63 ++
 8 files changed, 524 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
new file mode 100644
index 0000000..2774964
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Apache License, Version 2.0 header (collapsed in the archive) -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-streaming-kinesis-test_${scala.binary.version}</artifactId>
+	<name>flink-streaming-kinesis-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-kafka-test-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${junit.version}</version>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>fat-jar-kinesis-example</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.kinesis.test.KinesisExample</mainClass>
+								</transformer>
+							</transformers>
+							<finalName>KinesisExample</finalName>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
new file mode 100644
index 0000000..4957c35
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtai

[flink] branch master updated: [FLINK-9007] [kinesis] [e2e] Build Kinesis test under include-kinesis profile

2019-03-18 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4250819  [FLINK-9007] [kinesis] [e2e] Build Kinesis test under 
include-kinesis profile
4250819 is described below

commit 425081951162a2a5ea027cc8ee3137c688f3aada
Author: Thomas Weise 
AuthorDate: Sun Mar 17 11:23:24 2019 -0700

[FLINK-9007] [kinesis] [e2e] Build Kinesis test under include-kinesis 
profile
---
 flink-end-to-end-tests/pom.xml | 22 +-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 9f77483..c6674f4 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -66,9 +66,29 @@ under the License.
 		<module>flink-streaming-kafka-test</module>
 		<module>flink-streaming-kafka011-test</module>
 		<module>flink-streaming-kafka010-test</module>
-		<module>flink-streaming-kinesis-test</module>
 	</modules>
 
+	<profiles>
+		<profile>
+			<id>include-kinesis</id>
+			<activation>
+				<property>
+					<name>include-kinesis</name>
+				</property>
+			</activation>
+			<modules>
+				<module>flink-streaming-kinesis-test</module>
+			</modules>
+		</profile>
+	</profiles>
+
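
With this profile, the Kinesis end-to-end test module is skipped by default and built only when the activating property is supplied, e.g. mvn clean verify -Dinclude-kinesis (a hedged example invocation; the goal is interchangeable, the -Dinclude-kinesis property is what activates the profile).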






[flink] branch master updated: [hotfix] [FLINK-9007] [kinesis] [e2e] Disable record aggregation

2019-03-16 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 92c7808  [hotfix] [FLINK-9007] [kinesis] [e2e] Disable record 
aggregation
92c7808 is described below

commit 92c7808e9a2d02b17d4cfddf2afe675260fe4a59
Author: Thomas Weise 
AuthorDate: Sat Mar 16 13:14:15 2019 -0700

[hotfix] [FLINK-9007] [kinesis] [e2e] Disable record aggregation
---
 .../java/org/apache/flink/streaming/kinesis/test/KinesisExample.java| 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
index 3c52f1f..4957c35 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
@@ -62,6 +62,8 @@ public class KinesisExample {
 		Properties producerProperties = new Properties(parameterTool.getProperties());
 		// producer needs region even when URL is specified
 		producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		// test driver does not deaggregate
+		producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false));
 
 		// KPL does not recognize endpoint URL..
 		String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);



[flink] branch master updated: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-15 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 19570c6  [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
19570c6 is described below

commit 19570c6c2938628a7e44c6f2bf3ce7ec04959ae6
Author: Thomas Weise 
AuthorDate: Thu Feb 28 16:11:50 2019 -0800

[FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
---
 .../flink-streaming-kinesis-test/pom.xml   |  92 +++
 .../streaming/kinesis/test/KinesisExample.java |  89 ++
 .../streaming/kinesis/test/KinesisExampleTest.java | 127 
 .../kinesis/test/KinesisPubsubClient.java  | 128 +
 flink-end-to-end-tests/pom.xml |   1 +
 flink-end-to-end-tests/run-pre-commit-tests.sh |   1 +
 flink-end-to-end-tests/test-scripts/common.sh  |   2 +-
 .../test-scripts/test_streaming_kinesis.sh |  63 ++
 8 files changed, 502 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
new file mode 100644
index 0000000..d16f25f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Apache License, Version 2.0 header (collapsed in the archive) -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-streaming-kinesis-test_${scala.binary.version}</artifactId>
+	<name>flink-streaming-kinesis-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-kafka-test-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${junit.version}</version>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>fat-jar-kinesis-example</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.kinesis.test.KinesisExample</mainClass>
+								</transformer>
+							</transformers>
+							<finalName>KinesisExample</finalName>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
new file mode 100644
index 0000000..3c52f1f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtai

[flink] branch master updated: [FLINK-11826] Ignore flaky testRateLimitedConsumer

2019-03-05 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 56afa2e  [FLINK-11826] Ignore flaky testRateLimitedConsumer
56afa2e is described below

commit 56afa2e32a841922417e4ed02282d96c430875c2
Author: Thomas Weise 
AuthorDate: Tue Mar 5 12:37:09 2019 -0800

[FLINK-11826] Ignore flaky testRateLimitedConsumer
---
 .../java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index cd73872..5a4e775 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -166,6 +167,7 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	 * a desired rate of 3 bytes / second. Based on the execution time, the test asserts that this rate was not surpassed.
 * If no rate limiter is set on the consumer, the test should fail.
 */
+   @Ignore
@Test(timeout = 6)
public void testRateLimitedConsumer() throws Exception {
final String testTopic = "testRateLimitedConsumer";


