(flink) branch master updated (39779829b88 -> 7c4dec63eb4)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 39779829b88 [FLINK-34954][core] Kryo Input bug fix add 7c4dec63eb4 [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource (#23777) No new revisions were added by this update. Summary of changes: .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 118 +++ flink-connectors/flink-connector-datagen/pom.xml | 16 +++ ...tion.java => IndexLookupGeneratorFunction.java} | 145 --- .../datagen/source/DataGeneratorSource.java| 2 +- ...ittingSourceReaderWithCheckpointsInBetween.java | 158 + .../datagen/source/TestDataGenerators.java | 83 +++ flink-connectors/flink-connector-hive/pom.xml | 8 ++ .../flink/connectors/hive/HiveTableSinkITCase.java | 49 --- .../connectors/hive/HiveTableSourceITCase.java | 40 +++--- .../source/lib/util/IteratorSourceReaderBase.java | 10 +- flink-formats/flink-avro/pom.xml | 8 ++ .../formats/avro/AvroStreamingFileSinkITCase.java | 25 +++- flink-formats/flink-compress/pom.xml | 8 ++ .../formats/compress/CompressionFactoryITCase.java | 10 +- flink-formats/flink-csv/pom.xml| 8 ++ flink-formats/flink-hadoop-bulk/pom.xml| 8 ++ .../bulk/HadoopPathBasedPartFileWriterITCase.java | 15 +- flink-formats/flink-json/pom.xml | 8 ++ flink-formats/flink-orc/pom.xml| 8 ++ .../flink/orc/writer/OrcBulkWriterITCase.java | 10 +- flink-formats/flink-parquet/pom.xml| 8 ++ .../avro/AvroParquetStreamingFileSinkITCase.java | 21 ++- .../ParquetProtoStreamingFileSinkITCase.java | 10 +- flink-formats/flink-sequence-file/pom.xml | 8 ++ .../SequenceStreamingFileSinkITCase.java | 12 +- flink-table/flink-table-planner/pom.xml| 10 +- .../runtime/stream/sql/CompactionITCaseBase.java | 20 +-- flink-tests/pom.xml| 2 + .../flink/test/streaming/runtime/SinkITCase.java | 75 +- 29 files changed, 652 insertions(+), 251 deletions(-) copy flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/{FromElementsGeneratorFunction.java => IndexLookupGeneratorFunction.java} (61%) create mode 100644 flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DoubleEmittingSourceReaderWithCheckpointsInBetween.java create mode 100644 flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/TestDataGenerators.java
(flink) branch master updated: [hotfix][docs] config key for parquet int64 option (#23909)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 6334923af1e [hotfix][docs] config key for parquet int64 option (#23909) 6334923af1e is described below commit 6334923af1e31f5e3d08425ecca27e0ee021dae8 Author: Thomas Weise AuthorDate: Tue Dec 12 10:36:16 2023 -0500 [hotfix][docs] config key for parquet int64 option (#23909) --- docs/content/docs/connectors/table/formats/parquet.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md index 0e53f4b919e..2b543c0eaab 100644 --- a/docs/content/docs/connectors/table/formats/parquet.md +++ b/docs/content/docs/connectors/table/formats/parquet.md @@ -110,7 +110,7 @@ Data Type Mapping Currently, Parquet format type mapping is compatible with Apache Hive, but by default not with Apache Spark: - Timestamp: mapping timestamp type to int96 whatever the precision is. -- Spark compatibility requires int64 via config option `timestamp.time.unit` (see above). +- Spark compatibility requires int64 via config option `write.int64.timestamp` (see above). - Decimal: mapping decimal type to fixed length byte array according to the precision. The following table lists the type mapping from Flink type to Parquet type.
(flink) branch master updated: [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 548e4b5188b [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900) 548e4b5188b is described below commit 548e4b5188bb3f092206182d779a909756408660 Author: Thomas Weise AuthorDate: Mon Dec 11 08:05:53 2023 -0500 [FLINK-25565][Formats][Parquet] timestamp int64 option tidy up (#23900) --- docs/content/docs/connectors/table/formats/parquet.md | 5 +++-- .../flink/formats/parquet/vector/reader/TimestampColumnReader.java | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md index 75c524f238f..0e53f4b919e 100644 --- a/docs/content/docs/connectors/table/formats/parquet.md +++ b/docs/content/docs/connectors/table/formats/parquet.md @@ -107,9 +107,10 @@ For example, you can configure `parquet.compression=GZIP` to enable gzip compres Data Type Mapping -Currently, Parquet format type mapping is compatible with Apache Hive, but different with Apache Spark: +Currently, Parquet format type mapping is compatible with Apache Hive, but by default not with Apache Spark: - Timestamp: mapping timestamp type to int96 whatever the precision is. +- Spark compatibility requires int64 via config option `timestamp.time.unit` (see above). - Decimal: mapping decimal type to fixed length byte array according to the precision. The following table lists the type mapping from Flink type to Parquet type. @@ -185,7 +186,7 @@ The following table lists the type mapping from Flink type to Parquet type. TIMESTAMP - INT96 + INT96 (or INT64) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java index aa544f4e91c..7a36edc573b 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java @@ -159,8 +159,8 @@ public class TimestampColumnReader extends AbstractColumnReader
(flink) branch release-1.18 updated: [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 4379f2c39fb [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887) 4379f2c39fb is described below commit 4379f2c39fbdaeb78f874f6dd461d20b8a8961cd Author: Thomas Weise AuthorDate: Fri Dec 8 13:49:04 2023 -0500 [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887) Co-authored-by: Bo Cui --- .../docs/connectors/table/formats/parquet.md | 14 ++ .../docs/connectors/table/formats/parquet.md | 14 ++ .../formats/parquet/ParquetFileFormatFactory.java | 15 ++ .../parquet/ParquetVectorizedInputFormat.java | 13 +- .../formats/parquet/row/ParquetRowDataBuilder.java | 13 +- .../formats/parquet/row/ParquetRowDataWriter.java | 67 - .../parquet/utils/ParquetSchemaConverter.java | 50 +++- .../formats/parquet/vector/ParquetDictionary.java | 20 +- .../parquet/vector/ParquetSplitReaderUtil.java | 3 +- .../vector/reader/AbstractColumnReader.java| 2 +- .../vector/reader/TimestampColumnReader.java | 101 +++- .../formats/parquet/ParquetTimestampITCase.java| 280 + .../parquet/row/ParquetRowDataWriterTest.java | 14 ++ .../vector/ParquetInt64TimestampReaderTest.java| 69 + .../runtime/stream/FsStreamingSinkITCaseBase.scala | 65 +++-- 15 files changed, 683 insertions(+), 57 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/parquet.md b/docs/content.zh/docs/connectors/table/formats/parquet.md index 92536d00f98..295b11ef41b 100644 --- a/docs/content.zh/docs/connectors/table/formats/parquet.md +++ b/docs/content.zh/docs/connectors/table/formats/parquet.md @@ -84,6 +84,20 @@ Format 参数 Boolean 使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。 + + timestamp.time.unit + optional + micros + String + 根据TimeUnit在Timestamp和int64之间进行转换,可选值nanos/micros/millis。 + + + write.int64.timestamp + optional + false + Boolean + 以int64替代int96存储parquet Timestamp。 注意:Timestamp将于时区无关(从不转换为不同时区)。 + diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md index 0b7ba9c42f5..75c524f238f 100644 --- a/docs/content/docs/connectors/table/formats/parquet.md +++ b/docs/content/docs/connectors/table/formats/parquet.md @@ -84,6 +84,20 @@ Format Options Boolean Use UTC timezone or local timezone to the conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone. + + timestamp.time.unit + optional + micros + String + Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis. + + + write.int64.timestamp + optional + false + Boolean + Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone). + diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java index 14f257899c1..8be727c7947 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java @@ -68,6 +68,21 @@ public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWr + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" + " use UTC timezone"); +public static final ConfigOption TIMESTAMP_TIME_UNIT = +key("timestamp.time.unit") +.stringType() +.defaultValue("micros") +.withDescription( +"Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis"); + +public static final ConfigOption WRITE_INT64_TIMESTAMP = +key("write.int64.timestamp") +.booleanType() +.defaultValue(false) +.withDescription( +"Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. " ++ "Note: Timestamp will be time zone agnostic (NEVER converted to
(flink) branch master updated (43fec308b32 -> 978bff3bb90)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 43fec308b32 [FLINK-33599] Run restore tests with RocksDB state backend (#23883) add 978bff3bb90 [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) No new revisions were added by this update. Summary of changes: .../docs/connectors/table/formats/parquet.md | 14 ++ .../docs/connectors/table/formats/parquet.md | 14 ++ .../formats/parquet/ParquetFileFormatFactory.java | 15 ++ .../parquet/ParquetVectorizedInputFormat.java | 13 +- .../formats/parquet/row/ParquetRowDataBuilder.java | 13 +- .../formats/parquet/row/ParquetRowDataWriter.java | 67 - .../parquet/utils/ParquetSchemaConverter.java | 50 +++- .../formats/parquet/vector/ParquetDictionary.java | 20 +- .../parquet/vector/ParquetSplitReaderUtil.java | 3 +- .../vector/reader/AbstractColumnReader.java| 2 +- .../vector/reader/TimestampColumnReader.java | 101 +++- .../formats/parquet/ParquetTimestampITCase.java| 280 + .../parquet/row/ParquetRowDataWriterTest.java | 14 ++ .../vector/ParquetInt64TimestampReaderTest.java| 69 + .../runtime/stream/FsStreamingSinkITCaseBase.scala | 65 +++-- 15 files changed, 683 insertions(+), 57 deletions(-) create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java
(flink) branch master updated (7eaa5db3024 -> acd3e7ab66c)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 7eaa5db3024 [FLINK-33581][python] Deprecate getter/setter methods related to state backend in python flink. add acd3e7ab66c [FLINK-27529] [connector/common] Fix Integer Comparison For HybridSource sourceIndex (#23703) No new revisions were added by this update. Summary of changes: .../source/hybrid/HybridSourceSplitEnumerator.java | 5 +- .../hybrid/HybridSourceSplitEnumeratorTest.java| 56 ++ 2 files changed, 59 insertions(+), 2 deletions(-)
[flink] branch master updated: [FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c5516533a57 [FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452) c5516533a57 is described below commit c5516533a5705297fe3cfd33860e59da30750f69 Author: Elkhan Dadash AuthorDate: Tue Sep 26 22:30:12 2023 -0700 [FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452) --- .../org/apache/flink/client/program/rest/RestClusterClient.java | 2 +- .../org/apache/flink/client/program/rest/UrlPrefixDecorator.java | 9 + .../apache/flink/client/program/rest/RestClusterClientTest.java | 6 ++ .../flink/runtime/rest/messages/CustomHeadersDecorator.java | 9 + 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index b05dc331853..cc5a6d2d833 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -1021,7 +1021,7 @@ public class RestClusterClient implements ClusterClient { restClient.sendRequest( webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), -messageHeaders, +headers, messageParameters, request, filesToUpload); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java index bd6398176c5..52f1b8cf56e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java @@ -107,4 +107,13 @@ public class UrlPrefixDecorator< public Collection> getSupportedAPIVersions() { return decorated.getSupportedAPIVersions(); } + +@Override +public Collection> getResponseTypeParameters() { +return decorated.getResponseTypeParameters(); +} + +public MessageHeaders getDecorated() { +return decorated; +} } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 4f7e2f0b08d..e1db5c6d1b0 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter; +import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; @@ -909,6 +910,11 @@ class RestClusterClientTest { final AtomicBoolean firstSubmitRequestFailed = new AtomicBoolean(false); failHttpRequest = (messageHeaders, messageParameters, requestBody) -> { +messageHeaders = +((UrlPrefixDecorator) +((CustomHeadersDecorator) messageHeaders) +.getDecorated()) +.getDecorated(); if (messageHeaders instanceof JobExecutionResultHeaders) { return !firstExecutionResultPollFailed.getAndSet(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java index 979c849166c..05fa82
[flink] branch master updated: PyFlink remote execution should support URLs with paths and https scheme
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 5682472e402 PyFlink remote execution should support URLs with paths and https scheme 5682472e402 is described below commit 5682472e4029c25d4a2651a609c999029fa3281b Author: Elkhan Dadashov AuthorDate: Tue Sep 12 21:32:21 2023 -0700 PyFlink remote execution should support URLs with paths and https scheme --- .../generated/common_host_port_section.html| 6 ++ .../shortcodes/generated/rest_configuration.html | 6 ++ .../org/apache/flink/client/cli/DefaultCLI.java| 12 .../client/program/rest/RestClusterClient.java | 39 - .../apache/flink/client/cli/DefaultCLITest.java| 32 ++- .../client/program/rest/RestClusterClientTest.java | 66 +- .../apache/flink/configuration/RestOptions.java| 9 +++ .../main/java/org/apache/flink/util/NetUtils.java | 9 ++- .../java/org/apache/flink/util/NetUtilsTest.java | 8 +++ 9 files changed, 170 insertions(+), 17 deletions(-) diff --git a/docs/layouts/shortcodes/generated/common_host_port_section.html b/docs/layouts/shortcodes/generated/common_host_port_section.html index c2753d125e9..624278f7fb4 100644 --- a/docs/layouts/shortcodes/generated/common_host_port_section.html +++ b/docs/layouts/shortcodes/generated/common_host_port_section.html @@ -44,6 +44,12 @@ String The port that the server binds itself. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Rest servers are running on the same machine. + +rest.path +(none) +String +The path that should be used by clients to interact to the server which is accessible via URL. + rest.port 8081 diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html b/docs/layouts/shortcodes/generated/rest_configuration.html index 467b4275acd..af006be2037 100644 --- a/docs/layouts/shortcodes/generated/rest_configuration.html +++ b/docs/layouts/shortcodes/generated/rest_configuration.html @@ -104,6 +104,12 @@ Long The maximum time in ms for a connection to stay idle before failing. + +rest.path +(none) +String +The path that should be used by clients to interact to the server which is accessible via URL. + rest.port 8081 diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index 273e007a8ae..d7f2fb25ce4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -21,6 +21,8 @@ package org.apache.flink.client.cli; import org.apache.flink.client.deployment.executors.RemoteExecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.util.FlinkException; import org.apache.flink.util.NetUtils; @@ -29,6 +31,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import java.net.InetSocketAddress; +import java.net.URL; import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig; @@ -60,6 +63,11 @@ public class DefaultCLI extends AbstractCustomCommandLine { String addressWithPort = commandLine.getOptionValue(addressOption.getOpt()); InetSocketAddress jobManagerAddress = NetUtils.parseHostPortAddress(addressWithPort); setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress); + +URL url = NetUtils.getCorrectHostnamePort(addressWithPort); +resultingConfiguration.setString(RestOptions.PATH, url.getPath()); +resultingConfiguration.setBoolean( +SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url)); } resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME); @@ -68,6 +76,10 @@ public class DefaultCLI extends AbstractCustomCommandLine { return resultingConfiguration; } +private static boolean isHttpsProtocol(URL url) { +return url.getProtocol() != null && (url.getProtocol().equalsIgnoreCase("https")); +} + @Override public String getId() { return ID; diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterCli
[flink] branch master updated: [FLINK-32885 ] Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e7eeea033a6 [FLINK-32885 ] Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution e7eeea033a6 is described below commit e7eeea033a68e1ff6bf82132b5a59eb0a5a2d0ed Author: Elkhan Dadashov AuthorDate: Fri Sep 8 14:43:03 2023 -0700 [FLINK-32885 ] Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution --- .../java/org/apache/flink/client/ClientUtils.java | 23 ++ .../client/program/rest}/UrlPrefixDecorator.java | 13 --- .../flink/table/client/gateway/ExecutorImpl.java | 27 -- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index e6a39a5f1a0..b828534bea5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -33,6 +33,7 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.rest.HttpHeader; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkUserCodeClassLoaders; import org.apache.flink.util.SerializedThrowable; @@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory; import java.net.URL; import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; @@ -185,4 +188,24 @@ public enum ClientUtils { TimeUnit.MILLISECONDS); return scheduledExecutor; } + +public static Collection readHeadersFromEnvironmentVariable(String envVarName) { +List headers = new ArrayList<>(); +String rawHeaders = System.getenv(envVarName); + +if (rawHeaders != null) { +String[] lines = rawHeaders.split("\n"); +for (String line : lines) { +String[] keyValue = line.split(":", 2); +if (keyValue.length == 2) { +headers.add(new HttpHeader(keyValue[0], keyValue[1])); +} else { +LOG.info( +"Skipped a malformed header {} from FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in format header_name:header_value.", +line); +} +} +} +return headers; +} } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java similarity index 91% rename from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java rename to flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java index 9e905029fc4..bd6398176c5 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/UrlPrefixDecorator.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.gateway.rest.header.util; +package org.apache.flink.client.program.rest; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.RestClient; @@ -24,10 +24,12 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; -import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import java.util.Collection; + import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkNotNull; /** @@ -41,7 +43,7 @@ import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditio */ public class UrlPrefixDecorator< R extends RequestBody, P extends ResponseBody, M extends MessageParameters> -implements SqlGatewayMessageHe
[flink] branch master updated (079cd60510d -> 2b14c3ae834)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 079cd60510d [FLINK-32730][table-planner] Fix the bug of Scan reuse after projection pushdown without considering the dpp pattern add 2b14c3ae834 [docs] Add documentation for SQL Client URL support No new revisions were added by this update. Summary of changes: docs/content.zh/docs/dev/table/sqlClient.md | 20 docs/content/docs/dev/table/sqlClient.md| 19 +++ 2 files changed, 39 insertions(+)
[flink] branch master updated (d88ef0e355b -> 6e61b932a3e)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from d88ef0e355b [FLINK-32502][runtime] Remove the AbstractLeaderElectionService (#22959) add 6e61b932a3e [FLINK-32035][sql-client] Support HTTPS endpoints in SQL Client gateway mode (#22810) No new revisions were added by this update. Summary of changes: .../flink-end-to-end-tests-restclient}/pom.xml | 31 ++--- .../flink/runtime/rest/RestClientITCase.java | 142 + .../src/test/resources/log4j2-test.properties | 0 flink-end-to-end-tests/pom.xml | 1 + .../org/apache/flink/runtime/net/SSLUtils.java | 60 ++--- .../org/apache/flink/runtime/rest/RestClient.java | 37 +- .../flink/table/client/gateway/ExecutorImpl.java | 7 +- 7 files changed, 227 insertions(+), 51 deletions(-) copy {flink-state-backends/flink-statebackend-heap-spillable => flink-end-to-end-tests/flink-end-to-end-tests-restclient}/pom.xml (73%) create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java copy {flink-architecture-tests/flink-architecture-tests-production => flink-end-to-end-tests/flink-end-to-end-tests-restclient}/src/test/resources/log4j2-test.properties (100%)
[flink] branch master updated: [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1354d2fae3f [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816) 1354d2fae3f is described below commit 1354d2fae3fde2a448ce1fac5dee7859973a93e1 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Thu Jul 6 21:34:36 2023 +0200 [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816) --- .../flink/configuration/ConfigConstants.java | 6 ++ .../org/apache/flink/runtime/rest/HttpHeader.java | 83 ++ .../org/apache/flink/runtime/rest/RestClient.java | 19 ++-- .../rest/messages/CustomHeadersDecorator.java | 120 + .../runtime/rest/messages/MessageHeaders.java | 14 +++ .../flink/table/client/gateway/ExecutorImpl.java | 57 -- .../table/client/gateway/ExecutorImplITCase.java | 19 7 files changed, 302 insertions(+), 16 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 3c7a89c9e00..a4968db4550 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1756,6 +1756,12 @@ public final class ConfigConstants { /** The user lib directory name. */ public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib"; +/** + * The environment variable name which contains a list of newline-separated HTTP headers for + * Flink's REST client. + */ +public static final String FLINK_REST_CLIENT_HEADERS = "FLINK_REST_CLIENT_HEADERS"; + // Encoding -- public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java new file mode 100644 index 000..06ee95bd451 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import java.util.Objects; + +/** Represents an HTTP header with a name and a value. */ +public class HttpHeader { + +/** The name of the HTTP header. */ +private final String name; + +/** The value of the HTTP header. */ +private final String value; + +/** + * Constructs an {@code HttpHeader} object with the specified name and value. + * + * @param name the name of the HTTP header + * @param value the value of the HTTP header + */ +public HttpHeader(String name, String value) { +this.name = name; +this.value = value; +} + +/** + * Returns the name of this HTTP header. + * + * @return the name of this HTTP header + */ +public String getName() { +return name; +} + +/** + * Returns the value of this HTTP header. + * + * @return the value of this HTTP header + */ +public String getValue() { +return value; +} + +@Override +public String toString() { +return "HttpHeader{" + "name='" + name + '\'' + ", value='" + value + '\'' + '}'; +} + +@Override +public boolean equals(Object other) { +if (this == other) { +return true; +} +if (other == null || getClass() != other.getClass()) { +return false; +} +HttpHeader that = (HttpHeader) other; +return Objects.equals(getName(), that.getName()) +&& Objects.equals(getValue(), that.getValue()); +} + +@Override +public int hashCode() { +return Objects.hash(getName(), getValue()
[flink] branch master updated: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode (#22556)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4bd51ce122d [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode (#22556) 4bd51ce122d is described below commit 4bd51ce122d03a13cfd6fdf69325630679cd5053 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Tue May 16 18:52:06 2023 +0200 [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode (#22556) --- .../main/java/org/apache/flink/util/NetUtils.java | 12 +++ .../java/org/apache/flink/util/NetUtilsTest.java | 11 +++ .../org/apache/flink/runtime/rest/RestClient.java | 14 ++- .../apache/flink/table/client/cli/CliOptions.java | 7 +- .../flink/table/client/cli/CliOptionsParser.java | 31 ++- .../flink/table/client/gateway/Executor.java | 5 + .../flink/table/client/gateway/ExecutorImpl.java | 49 +++--- .../apache/flink/table/client/SqlClientTest.java | 17 +++- .../rest/header/util/UrlPrefixDecorator.java | 103 + 9 files changed, 231 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java index fc3be6cf9b4..bab4d9b3208 100644 --- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java @@ -120,6 +120,18 @@ public class NetUtils { } } +/** + * Converts an InetSocketAddress to a URL. This method assigns the "http://; schema to the URL + * by default. + * + * @param socketAddress the InetSocketAddress to be converted + * @return a URL object representing the provided socket address with "http://; schema + */ +public static URL socketToUrl(InetSocketAddress socketAddress) { +String hostPort = socketAddress.getHostString() + ":" + socketAddress.getPort(); +return validateHostPortString(hostPort); +} + /** * Calls {@link ServerSocket#accept()} on the provided server socket, suppressing any thrown * {@link SocketTimeoutException}s. This is a workaround for the underlying JDK-8237858 bug in diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java index a835e2bad7c..6168da4474e 100644 --- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java @@ -18,12 +18,14 @@ package org.apache.flink.util; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.MalformedURLException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; @@ -32,6 +34,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import static org.apache.flink.util.NetUtils.socketToUrl; import static org.hamcrest.core.IsCollectionContaining.hasItems; import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertEquals; @@ -343,4 +346,12 @@ public class NetUtilsTest extends TestLogger { NetUtils.unresolvedHostAndPortToNormalizedString(host, port)); } } + +@Test +public void testSocketToUrl() throws MalformedURLException { +InetSocketAddress socketAddress = new InetSocketAddress("foo.com", 8080); +URL expectedResult = new URL("http://foo.com:8080;); + + Assertions.assertThat(socketToUrl(socketAddress)).isEqualTo(expectedResult); +} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 2ab8fdac579..03a5c7b0742 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -123,6 +123,8 @@ public class RestClient implements AutoCloseableAsync { private final AtomicBoolean isRunning = new AtomicBoolean(true); +public static final String VERSION_PLACEHOLDER = "{{VERSION}}"; + @VisibleForTesting List outboundChannelHandlerFactories; public RestClient(Configuration configuration, Executor executor) @@ -353,7 +355,7 @@ public class RestClient implements AutoCloseableAsync { } String versionedHandlerURL = -"/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL(); +constructVersionedHandlerUrl(messageHeaders, apiVersion.getURLVersionPrefix
[flink] branch master updated: [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 61834da8298 [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844) 61834da8298 is described below commit 61834da82984323484718e551c15e31ce023e026 Author: Seth Saperstein <99828679+sethsaperstein-l...@users.noreply.github.com> AuthorDate: Mon Nov 28 15:35:01 2022 -0800 [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844) --- .../connectors/kinesis/internals/KinesisDataFetcher.java | 14 -- .../kinesis/util/JobManagerWatermarkTracker.java | 5 + .../kinesis/util/JobManagerWatermarkTrackerTest.java | 1 + 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 4fcc80a250e..d6c7b296da8 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -1174,6 +1174,14 @@ public class KinesisDataFetcher { } } +LOG.debug( +"WatermarkEmitter subtask: {}, last watermark: {}, potential watermark: {}" ++ ", potential next watermark: {}", +indexOfThisConsumerSubtask, +lastWatermark, +potentialWatermark, +potentialNextWatermark); + // advance watermark if possible (watermarks can only be ascending) if (potentialWatermark == Long.MAX_VALUE) { if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) { @@ -1265,11 +1273,11 @@ public class KinesisDataFetcher { public void onProcessingTime(long timestamp) { if (nextWatermark != Long.MIN_VALUE) { long globalWatermark = lastGlobalWatermark; -// TODO: refresh watermark while idle if (!(isIdle && nextWatermark == propagatedLocalWatermark)) { globalWatermark = watermarkTracker.updateWatermark(nextWatermark); propagatedLocalWatermark = nextWatermark; } else { +globalWatermark = watermarkTracker.updateWatermark(Long.MIN_VALUE); LOG.info( "WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask); @@ -1279,12 +1287,14 @@ public class KinesisDataFetcher { lastLogged = System.currentTimeMillis(); LOG.info( "WatermarkSyncCallback subtask: {} local watermark: {}" -+ ", global watermark: {}, delta: {} timeouts: {}, emitter: {}", ++ ", global watermark: {}, delta: {} timeouts: {}, idle: {}" ++ ", emitter: {}", indexOfThisConsumerSubtask, nextWatermark, globalWatermark, nextWatermark - globalWatermark, watermarkTracker.getUpdateTimeoutCount(), +isIdle, recordEmitter.printInfo()); // Following is for debugging non-reproducible issue with stalled watermark diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java index b4c78438dca..51d055e2c96 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java @@ -129,6 +129,11 @@ public class JobManagerWatermarkTracker extends WatermarkTracker { } catch (Exception e) { throw new RuntimeException(e); } +// no op to get global watermark without updating it +if (value.watermark == Long.MIN_VALUE) { +addCount--; +return accumulator; +
[flink-kubernetes-operator] branch main updated: [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 7db0eb1e [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15 7db0eb1e is described below commit 7db0eb1e02a1430128f436bdecea9748ce81fe14 Author: Thomas Weise AuthorDate: Sun Nov 13 15:56:41 2022 -0500 [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15 --- .../operator/service/AbstractFlinkService.java| 1 + .../operator/service/NativeFlinkService.java | 7 ++- .../operator/service/NativeFlinkServiceTest.java | 19 +-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 38b32d04..2208cf80 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -326,6 +326,7 @@ public abstract class AbstractFlinkService implements FlinkService { exception); } if (deleteClusterAfterSavepoint) { +LOG.info("Cleaning up deployment after stop-with-savepoint"); deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true); } break; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index f40322ff..1402c8d3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -27,6 +27,7 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; @@ -80,7 +81,11 @@ public class NativeFlinkService extends AbstractFlinkService { public void cancelJob( FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception { -cancelJob(deployment, upgradeMode, configuration, false); +// prior to Flink 1.15, ensure removal of orphaned config maps +// https://issues.apache.org/jira/browse/FLINK-30004 +boolean deleteClusterAfterSavepoint = + !deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14); +cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint); } @Override diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index c79663ef..fb204c1a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -56,6 +56,8 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.ArrayList; import java.util.Arrays; @@ -114,8 +116,9 @@ public class NativeFlinkServiceTest { assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); } -@Test -public void testCancelJobWithSavepointUpgradeMode() throws Exception { +@ParameterizedTest +@EnumSource(FlinkVersion.class) +public void testCancelJobWithSavepointUpgradeMode(FlinkVersion flinkVersion) throws Exception { final TestingClusterClient testingClusterClient = new TestingClusterClient
[flink] branch master updated: [FLINK-27479] add logic to manage the availability future with respect to the underlying reader future
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9dad2029461 [FLINK-27479] add logic to manage the availability future with respect to the underlying reader future 9dad2029461 is described below commit 9dad2029461e6f48a29f25c50d642239bc2e5721 Author: Mason Chen AuthorDate: Wed Aug 31 18:04:02 2022 -0700 [FLINK-27479] add logic to manage the availability future with respect to the underlying reader future formatting --- .../base/source/hybrid/HybridSourceReader.java | 27 +--- .../base/source/hybrid/HybridSourceReaderTest.java | 166 + 2 files changed, 173 insertions(+), 20 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index 855f85ddd3f..2f113078fe3 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -96,12 +96,8 @@ public class HybridSourceReader implements SourceReader new SourceReaderFinishedEvent(currentSourceIndex)); if (!isFinalSource) { // More splits may arrive for a subsequent reader. -// InputStatus.NOTHING_AVAILABLE suspends poll, requires completion of the -// availability future after receiving more splits to resume. -if (availabilityFuture.isDone()) { -// reset to avoid continued polling -availabilityFuture = new CompletableFuture(); -} +// InputStatus.NOTHING_AVAILABLE suspends poll, complete the +// availability future in the switchover to the next reader return InputStatus.NOTHING_AVAILABLE; } } @@ -133,6 +129,10 @@ public class HybridSourceReader implements SourceReader @Override public CompletableFuture isAvailable() { +if (availabilityFuture.isDone()) { +availabilityFuture = currentReader.isAvailable(); +} + return availabilityFuture; } @@ -185,10 +185,6 @@ public class HybridSourceReader implements SourceReader switchedSources.put(sse.sourceIndex(), sse.source()); setCurrentReader(sse.sourceIndex()); isFinalSource = sse.isFinalSource(); -if (!availabilityFuture.isDone()) { -// continue polling -availabilityFuture.complete(null); -} } else { currentReader.handleSourceEvents(sourceEvent); } @@ -231,16 +227,7 @@ public class HybridSourceReader implements SourceReader reader.start(); currentSourceIndex = index; currentReader = reader; -currentReader -.isAvailable() -.whenComplete( -(result, ex) -> { -if (ex == null) { -availabilityFuture.complete(result); -} else { -availabilityFuture.completeExceptionally(ex); -} -}); +availabilityFuture.complete(null); LOG.debug( "Reader started: subtask={} sourceIndex={} {}", readerContext.getIndexOfSubtask(), diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index b9c2f566115..02231378a36 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -24,7 +24,14 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderOptions; import org.apache.flink.connector.base.source.reader.mocks.MockBaseSo
[flink-kubernetes-operator] branch main updated: [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 6f0914bb [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16) 6f0914bb is described below commit 6f0914bbd296c9daf2664afe0a77d1df4f2e157e Author: Thomas Weise AuthorDate: Sun Sep 25 13:24:19 2022 -0400 [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16) --- .../deployment/ApplicationReconciler.java | 33 ++ .../kubernetes/operator/TestingFlinkService.java | 4 +++ .../deployment/ApplicationReconcilerTest.java | 29 --- 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index a4384edd..c0f53519 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -17,13 +17,16 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; @@ -172,6 +175,9 @@ public class ApplicationReconciler flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true); flinkService.waitForClusterShutdown(deployConfig); } + +setJobIdIfNecessary(spec, status, deployConfig); + eventRecorder.triggerEvent( relatedResource, EventRecorder.Type.Normal, @@ -186,6 +192,33 @@ public class ApplicationReconciler relatedResource.getMetadata(), spec, deployConfig, kubernetesClient); } +private void setJobIdIfNecessary( +FlinkDeploymentSpec spec, FlinkDeploymentStatus status, Configuration deployConfig) { +// https://issues.apache.org/jira/browse/FLINK-19358 +// https://issues.apache.org/jira/browse/FLINK-29109 +if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) { +return; +} + +if (deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null) { +// user managed, don't touch +return; +} + +// generate jobId initially or rotate on every deployment when mode is stateless +if (status.getJobStatus().getJobId() == null +|| spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) { +String jobId = JobID.generate().toHexString(); +// record before first deployment to ensure we use it on any retry +status.getJobStatus().setJobId(jobId); +LOG.info("Assigning JobId override to {}", jobId); +} + +String jobId = status.getJobStatus().getJobId(); +LOG.debug("Setting {} to {}", PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId); +deployConfig.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId); +} + @Override protected void cancelJob( FlinkDeployment deployment, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index f97e5fa7..27b453d0 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.
[flink-kubernetes-operator] branch main updated: [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 84b08d00 [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener 84b08d00 is described below commit 84b08d00f67552dbd4bd97d49ab6d39f1cd87f7d Author: Matyas Orhidi AuthorDate: Mon Sep 12 11:03:50 2022 +0200 [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener --- .../controller/FlinkDeploymentController.java | 9 +- .../controller/FlinkSessionJobController.java | 9 +- .../operator/observer/JobStatusObserver.java | 2 +- .../reconciler/deployment/SessionReconciler.java | 2 +- .../kubernetes/operator/utils/EventRecorder.java | 3 +- .../kubernetes/operator/utils/StatusRecorder.java | 4 +++ .../controller/FlinkDeploymentControllerTest.java | 16 +-- .../listener/FlinkResourceListenerTest.java| 32 -- 8 files changed, 50 insertions(+), 27 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index a0b0a161..0026ac54 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -84,7 +84,14 @@ public class FlinkDeploymentController @Override public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { -LOG.info("Deleting FlinkDeployment"); +String msg = "Cleaning up " + FlinkDeployment.class.getSimpleName(); +LOG.info(msg); +eventRecorder.triggerEvent( +flinkApp, +EventRecorder.Type.Normal, +EventRecorder.Reason.Cleanup, +EventRecorder.Component.Operator, +msg); statusRecorder.updateStatusFromCache(flinkApp); try { observerFactory.getOrCreate(flinkApp).observe(flinkApp, context); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index d037571e..61dc9512 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -107,7 +107,14 @@ public class FlinkSessionJobController @Override public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) { -LOG.info("Deleting FlinkSessionJob"); +String msg = "Cleaning up " + FlinkSessionJob.class.getSimpleName(); +LOG.info(msg); +eventRecorder.triggerEvent( +sessionJob, +EventRecorder.Type.Normal, +EventRecorder.Reason.Cleanup, +EventRecorder.Component.Operator, +msg); statusRecorder.removeCachedStatus(sessionJob); return reconciler.cleanup(sessionJob, context); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index 2c6b4bc6..51cf72ad 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -147,7 +147,7 @@ public abstract class JobStatusObserver { eventRecorder.triggerEvent( resource, EventRecorder.Type.Normal, -EventRecorder.Reason.StatusChanged, +EventRecorder.Reason.JobStatusChanged, EventRecorder.Component.Job, message); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 1131a5e9..43a62e38 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/k
[flink-kubernetes-operator] branch main updated: [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 6d7c8cee [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present 6d7c8cee is described below commit 6d7c8ceebb9ee0b83eb7036c57003177b7f2fd82 Author: Thomas Weise AuthorDate: Tue Aug 30 21:17:33 2022 -0400 [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present --- .../deployment/ApplicationReconciler.java | 41 +++--- .../deployment/ApplicationReconcilerTest.java | 38 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 42aabaad..43520df3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -98,12 +99,19 @@ public class ApplicationReconciler .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED) && FlinkUtils.isKubernetesHAActivated(deployConfig) && FlinkUtils.isKubernetesHAActivated(observeConfig) -&& flinkService.isHaMetadataAvailable(deployConfig) && !flinkVersionChanged( ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) { -LOG.info( -"Job is not running but HA metadata is available for last state restore, ready for upgrade"); -return Optional.of(UpgradeMode.LAST_STATE); + +if (!flinkService.isHaMetadataAvailable(deployConfig)) { +if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) { +// initial deployment failure, reset to allow for spec change to proceed +return resetOnMissingStableSpec(deployment, deployConfig); +} +} else { +LOG.info( +"Job is not running but HA metadata is available for last state restore, ready for upgrade"); +return Optional.of(UpgradeMode.LAST_STATE); +} } if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING @@ -120,6 +128,31 @@ public class ApplicationReconciler return Optional.empty(); } +private Optional resetOnMissingStableSpec( +FlinkDeployment deployment, Configuration deployConfig) { +// initial deployment failure, reset to allow for spec change to proceed +flinkService.deleteClusterDeployment( +deployment.getMetadata(), deployment.getStatus(), false); +flinkService.waitForClusterShutdown(deployConfig); +if (!flinkService.isHaMetadataAvailable(deployConfig)) { +LOG.info( +"Job never entered stable state. Clearing previous spec to reset for initial deploy"); +// TODO: lastSpecWithMeta.f1.isFirstDeployment() is false +// ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment); + deployment.getStatus().getReconciliationStatus().setLastReconciledSpec(null); +// UPGRADING triggers immediate reconciliation +deployment +.getStatus() +.getReconciliationStatus() +.setState(ReconciliationState.UPGRADING); +return Optional.empty(); +} else { +// proceed with upgrade if deployment succeeded between check and delete +LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade"); +return Optional.of(UpgradeMode.LAST_STATE); +} +} + @Override p
[flink] branch master updated: [FLINK-28977] NullPointerException in HybridSourceSplitEnumerator.close (#20587)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ee4d27411b3 [FLINK-28977] NullPointerException in HybridSourceSplitEnumerator.close (#20587) ee4d27411b3 is described below commit ee4d27411b3bf1cfcc4171aab777eb8bf08f1550 Author: Michael <89284672+mbene...@users.noreply.github.com> AuthorDate: Tue Aug 23 05:28:13 2022 -0700 [FLINK-28977] NullPointerException in HybridSourceSplitEnumerator.close (#20587) --- .../connector/base/source/hybrid/HybridSourceSplitEnumerator.java| 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java index 61baabeb941..c09653ec71b 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -246,7 +246,10 @@ public class HybridSourceSplitEnumerator @Override public void close() throws IOException { -currentEnumerator.close(); +// close may be called before currentEnumerator was initialized +if (currentEnumerator != null) { +currentEnumerator.close(); +} } private void switchEnumerator() {
[flink] branch release-1.15 updated: [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new e5570e3e33a [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint e5570e3e33a is described below commit e5570e3e33ac33fd1b31d38c86ac6a291e7bc47e Author: Qishang Zhong AuthorDate: Sat Aug 13 11:21:58 2022 +0800 [FLINK-28817][connector/common] NullPointerException in HybridSource when restoring from checkpoint --- .../source/hybrid/HybridSourceSplitEnumerator.java| 6 +- .../connector/base/source/hybrid/SwitchedSources.java | 8 +++- .../hybrid/HybridSourceSplitEnumeratorTest.java | 19 +++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java index d27de221af8..61baabeb941 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -218,7 +218,11 @@ public class HybridSourceSplitEnumerator } if (subtaskSourceIndex < currentSourceIndex) { -subtaskSourceIndex++; +// find initial or next index for the reader +subtaskSourceIndex = +subtaskSourceIndex == -1 +? switchedSources.getFirstSourceIndex() +: ++subtaskSourceIndex; sendSwitchSourceEvent(subtaskId, subtaskSourceIndex); return; } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java index 7911612d258..68128d1faed 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java @@ -25,10 +25,12 @@ import org.apache.flink.util.Preconditions; import java.util.HashMap; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; /** Sources that participated in switching with cached serializers. */ class SwitchedSources { -private final Map sources = new HashMap<>(); +private final SortedMap sources = new TreeMap<>(); private final Map> cachedSerializers = new HashMap<>(); @@ -45,4 +47,8 @@ class SwitchedSources { public void put(int sourceIndex, Source source) { sources.put(sourceIndex, Preconditions.checkNotNull(source)); } + +public int getFirstSourceIndex() { +return sources.firstKey(); +} } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java index 7bcf69c5e72..8b5cb096586 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java @@ -182,6 +182,25 @@ public class HybridSourceSplitEnumeratorTest { Matchers.iterableWithSize(1)); } +@Test +public void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits() throws Exception { +setupEnumeratorAndTriggerSourceSwitch(); +HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0); +MockSplitEnumerator underlyingEnumerator = getCurrentEnumerator(enumerator); +Assert.assertThat( +(List) Whitebox.getInternalState(underlyingEnumerator, "splits"), +Matchers.iterableWithSize(0)); +enumerator = +(HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState); +enumerator.start(); +// subtask starts at -1 since it has no splits after restore +enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1)); +underlyingEnumerator = getCurrentEnumerator(enumerator); +Assert.assertThat( +
[flink] branch master updated (1455e51c7cd -> 6e80d90b161)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 1455e51c7cd [FLINK-26771][hive] Fix incomparable exception between boolean type and numeric type in Hive dialect add 6e80d90b161 [FLINK-28817] NullPointerException in HybridSource when restoring from checkpoint (#20530) No new revisions were added by this update. Summary of changes: .../source/hybrid/HybridSourceSplitEnumerator.java | 6 +- .../base/source/hybrid/SwitchedSources.java | 8 +++- .../hybrid/HybridSourceSplitEnumeratorTest.java | 21 + 3 files changed, 33 insertions(+), 2 deletions(-)
[flink] branch release-1.15 updated: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new 20768b2f53f [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645) 20768b2f53f is described below commit 20768b2f53fb8c126d984bc85be2d34fdaa487c1 Author: Haizhou Zhao AuthorDate: Tue May 10 16:18:38 2022 -0700 [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645) Co-authored-by: Haizhou Zhao --- .../avro/typeutils/GenericRecordAvroTypeInfo.java | 10 - .../avro/typeutils/SerializableAvroSchema.java | 11 +++-- .../typeutils/AvroGenericRecordTypeInfoTest.java | 50 ++ .../typeutils/AvroSerializerGenericRecordTest.java | 7 +-- ...a => AvroSerializerLargeGenericRecordTest.java} | 11 ++--- .../flink/formats/avro/utils/AvroTestUtils.java| 30 + 6 files changed, 102 insertions(+), 17 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java index 387014077d2..28332db5a46 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java @@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -104,10 +105,15 @@ public class GenericRecordAvroTypeInfo extends TypeInformation { } private void writeObject(ObjectOutputStream oos) throws IOException { -oos.writeUTF(schema.toString()); +byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8); +oos.writeInt(schemaStrInBytes.length); +oos.write(schemaStrInBytes); } private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { -this.schema = new Schema.Parser().parse(ois.readUTF()); +int len = ois.readInt(); +byte[] content = new byte[len]; +ois.readFully(content); +this.schema = new Schema.Parser().parse(new String(content, StandardCharsets.UTF_8)); } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java index 32714951a61..12458752428 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.nio.charset.StandardCharsets; /** A wrapper for Avro {@link Schema}, that is Java serializable. */ @Internal @@ -52,14 +53,18 @@ final class SerializableAvroSchema implements Serializable { oos.writeBoolean(false); } else { oos.writeBoolean(true); -oos.writeUTF(schema.toString(false)); +byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8); +oos.writeInt(schemaStrInBytes.length); +oos.write(schemaStrInBytes); } } private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { if (ois.readBoolean()) { -String schema = ois.readUTF(); -this.schema = new Parser().parse(schema); +int len = ois.readInt(); +byte[] content = new byte[len]; +ois.readFully(content); +this.schema = new Parser().parse(new String(content, StandardCharsets.UTF_8)); } else { this.schema = null; } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java new file mode 100644 index 000..dd2b6c98907 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE f
[flink] branch release-1.15 updated: [FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new 50d280daaa8 [FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema 50d280daaa8 is described below commit 50d280daaa8a814e2a102dce622e84640875150a Author: Thomas Weise AuthorDate: Mon May 2 21:50:18 2022 -0700 [FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema --- .../org/apache/flink/formats/avro/AvroRowDeserializationSchema.java | 5 + 1 file changed, 5 insertions(+) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java index cbbd956d067..8f72d02499d 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java @@ -349,6 +349,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< long seconds = micros / MICROS_PER_SECOND - offsetMillis / 1000; int nanos = ((int) (micros % MICROS_PER_SECOND)) * 1000 - offsetMillis % 1000 * 1000; +if (nanos < 0) { +// can't set negative nanos on timestamp +seconds--; +nanos = (int) (MICROS_PER_SECOND * 1000 + nanos); +} Timestamp timestamp = new Timestamp(seconds * 1000L); timestamp.setNanos(nanos); return timestamp;
[flink] branch release-1.14 updated: [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.14 by this push: new d3be0d0154a [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705) d3be0d0154a is described below commit d3be0d0154aad2a870448c49ce764a7f49d8bb7e Author: Haizhou Zhao AuthorDate: Wed May 11 17:41:32 2022 -0700 [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19705) Co-authored-by: Haizhou Zhao --- .../avro/typeutils/GenericRecordAvroTypeInfo.java | 10 - .../avro/typeutils/SerializableAvroSchema.java | 11 +++-- .../typeutils/AvroGenericRecordTypeInfoTest.java | 50 ++ .../typeutils/AvroSerializerGenericRecordTest.java | 7 +-- ...a => AvroSerializerLargeGenericRecordTest.java} | 11 ++--- .../flink/formats/avro/utils/AvroTestUtils.java| 30 + 6 files changed, 102 insertions(+), 17 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java index 387014077d2..28332db5a46 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java @@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -104,10 +105,15 @@ public class GenericRecordAvroTypeInfo extends TypeInformation { } private void writeObject(ObjectOutputStream oos) throws IOException { -oos.writeUTF(schema.toString()); +byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8); +oos.writeInt(schemaStrInBytes.length); +oos.write(schemaStrInBytes); } private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { -this.schema = new Schema.Parser().parse(ois.readUTF()); +int len = ois.readInt(); +byte[] content = new byte[len]; +ois.readFully(content); +this.schema = new Schema.Parser().parse(new String(content, StandardCharsets.UTF_8)); } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java index 32714951a61..12458752428 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.nio.charset.StandardCharsets; /** A wrapper for Avro {@link Schema}, that is Java serializable. */ @Internal @@ -52,14 +53,18 @@ final class SerializableAvroSchema implements Serializable { oos.writeBoolean(false); } else { oos.writeBoolean(true); -oos.writeUTF(schema.toString(false)); +byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8); +oos.writeInt(schemaStrInBytes.length); +oos.write(schemaStrInBytes); } } private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { if (ois.readBoolean()) { -String schema = ois.readUTF(); -this.schema = new Parser().parse(schema); +int len = ois.readInt(); +byte[] content = new byte[len]; +ois.readFully(content); +this.schema = new Parser().parse(new String(content, StandardCharsets.UTF_8)); } else { this.schema = null; } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java new file mode 100644 index 000..b91053f60c8 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericRecordTypeInfoTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE f
[flink] branch master updated (1dcad2272e9 -> 610164d57dd)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 1dcad2272e9 [hotfix][docs-zh] Fix two inaccessible links. (#19678) add 610164d57dd [FLINK-27255] [flink-avro] flink-avro does not support ser/de of large avro schema (#19645) No new revisions were added by this update. Summary of changes: .../avro/typeutils/GenericRecordAvroTypeInfo.java | 10 ++-- .../avro/typeutils/SerializableAvroSchema.java | 11 +--- ...est.java => AvroGenericRecordTypeInfoTest.java} | 21 --- .../typeutils/AvroSerializerGenericRecordTest.java | 7 ++--- ...a => AvroSerializerLargeGenericRecordTest.java} | 11 +++- .../flink/formats/avro/utils/AvroTestUtils.java| 30 ++ 6 files changed, 64 insertions(+), 26 deletions(-) copy flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/{AvroTypeInfoTest.java => AvroGenericRecordTypeInfoTest.java} (63%) copy flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/{AvroSerializerGenericRecordTest.java => AvroSerializerLargeGenericRecordTest.java} (76%)
[flink] branch master updated (a112465efe0 -> 4c8f323d9ec)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from a112465efe0 [FLINK-27442][Formats][Avro Confluent] Add Confluent repo to module flink-sql-avro-confluent-registry add 4c8f323d9ec [FLINK-27465] Handle conversion of negative long to timestamp in AvroRowDeserializationSchema No new revisions were added by this update. Summary of changes: .../org/apache/flink/formats/avro/AvroRowDeserializationSchema.java | 5 + 1 file changed, 5 insertions(+)
[flink] branch master updated (2e632af6a75 -> c65899f7790)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 2e632af6a75 [FLINK-27369][table-planner] Fix the type mismatch error in RemoveUnreachableCoalesceArgumentsRule add c65899f7790 [FLINK-27381][connector/common] Use Arrays.hashcode for HybridSourceSplit wrappedSplitBytes No new revisions were added by this update. Summary of changes: .../apache/flink/connector/base/source/hybrid/HybridSourceSplit.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink-web] branch asf-site updated: [FLINK-26781] Link Flink Kubernetes operator doc site to Flink Website
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 9b9c20f [FLINK-26781] Link Flink Kubernetes operator doc site to Flink Website 9b9c20f is described below commit 9b9c20fadfb8a5508be3889e1b1ce54210768bc0 Author: czy006 <944989...@qq.com> AuthorDate: Tue Mar 22 21:31:30 2022 +0800 [FLINK-26781] Link Flink Kubernetes operator doc site to Flink Website --- _config.yml | 11 +++ _includes/navbar.html | 3 +++ 2 files changed, 14 insertions(+) diff --git a/_config.yml b/_config.yml index 7ba6b8b..ca0c310 100644 --- a/_config.yml +++ b/_config.yml @@ -29,6 +29,13 @@ FLINK_ML_STABLE_SHORT: "2.0" FLINK_ML_GITHUB_URL: https://github.com/apache/flink-ml FLINK_ML_GITHUB_REPO_NAME: flink-ml +#TODO If kubernetes operator stable version complete,replace it to stable version +FLINK_KUBERNETES_OPERATOR_VERSION_STABLE: 1.0 +FLINK_KUBERNETES_OPERATOR_STABLE_SHORT: "1.0" + +FLINK_KUBERNETES_OPERATOR_URL: https://github.com/apache/flink-kubernetes-operator +FLINK_KUBERNETES_OPERATOR_GITHUB_REPO_NAME: flink-kubernetes-operator + # Example for scala dependent component: # - #name: "Prometheus MetricReporter" @@ -616,6 +623,10 @@ docs-statefun-snapshot: "https://nightlies.apache.org/flink/flink-statefun-docs- docs-ml-stable: "https://nightlies.apache.org/flink/flink-ml-docs-release-2.0; docs-ml-snapshot: "https://nightlies.apache.org/flink/flink-ml-docs-master; +#TODO If kubernetes operator stable version complete,replace it to stable url +docs-kubernetes-operator-stable: "https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main; +docs-kubernetes-operator-snapshot: "https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main; + # Used by the gh_link plugin jira: "https://issues.apache.org/jira/browse/FLINK; stackoverflow: "https://stackoverflow.com/search?q=flink; diff --git a/_includes/navbar.html b/_includes/navbar.html index f00f19e..e991d69 100755 --- a/_includes/navbar.html +++ b/_includes/navbar.html @@ -76,6 +76,7 @@ With Flink With Flink Stateful Functions With Flink ML +With Flink Kubernetes Operator {{ site.data.i18n[page.language].training_course }} @@ -90,6 +91,8 @@ Flink Stateful Functions Master (Latest Snapshot) Flink ML {{site.FLINK_ML_STABLE_SHORT}} (Latest stable release) Flink ML Master (Latest Snapshot) +Flink Kubernetes Operator {{site.FLINK_KUBERNETES_OPERATOR_STABLE_SHORT}} (Latest stable release) +Flink Kubernetes Operator Master (Latest Snapshot)
[flink-kubernetes-operator] branch main updated: [FLINK-26473] Check for deployment errors when listJobs fails
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 4b7ffaa [FLINK-26473] Check for deployment errors when listJobs fails 4b7ffaa is described below commit 4b7ffaa47082a5baaa4150d6315a9307087e64c6 Author: Thomas Weise AuthorDate: Sun Mar 13 21:41:11 2022 -0700 [FLINK-26473] Check for deployment errors when listJobs fails --- .../exception/DeploymentFailedException.java | 31 +++-- .../kubernetes/operator/observer/BaseObserver.java | 37 -- .../kubernetes/operator/observer/JobObserver.java | 15 +++- .../kubernetes/operator/service/FlinkService.java | 12 .../kubernetes/operator/utils/FlinkUtils.java | 16 +++-- .../flink/kubernetes/operator/TestUtils.java | 37 +++--- .../kubernetes/operator/TestingFlinkService.java | 12 .../controller/FlinkDeploymentControllerTest.java | 79 ++ 8 files changed, 210 insertions(+), 29 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java index b0ea7f3..572f065 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.exception; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import io.fabric8.kubernetes.api.model.ContainerStateWaiting; import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.EventBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; @@ -26,15 +27,33 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; /** Exception to signal terminal deployment failure. */ public class DeploymentFailedException extends RuntimeException { public static final String COMPONENT_JOBMANAGER = "JobManagerDeployment"; +public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff"; + private static final long serialVersionUID = -1070179896083579221L; public final String component; -public final DeploymentCondition deployCondition; +public final String type; +public final String reason; +public final String lastTransitionTime; +public final String lastUpdateTime; public DeploymentFailedException(String component, DeploymentCondition deployCondition) { super(deployCondition.getMessage()); this.component = component; -this.deployCondition = deployCondition; +this.type = deployCondition.getType(); +this.reason = deployCondition.getReason(); +this.lastTransitionTime = deployCondition.getLastTransitionTime(); +this.lastUpdateTime = deployCondition.getLastUpdateTime(); +} + +public DeploymentFailedException( +String component, String type, ContainerStateWaiting stateWaiting) { +super(stateWaiting.getMessage()); +this.component = component; +this.type = type; +this.reason = stateWaiting.getReason(); +this.lastTransitionTime = null; +this.lastUpdateTime = null; } public static Event asEvent(DeploymentFailedException dfe, FlinkDeployment flinkApp) { @@ -47,10 +66,10 @@ public class DeploymentFailedException extends RuntimeException { .withNamespace(flinkApp.getMetadata().getNamespace()) .withUid(flinkApp.getMetadata().getUid()) .endInvolvedObject() -.withType(dfe.deployCondition.getType()) -.withReason(dfe.deployCondition.getReason()) - .withFirstTimestamp(dfe.deployCondition.getLastTransitionTime()) - .withLastTimestamp(dfe.deployCondition.getLastUpdateTime()) +.withType(dfe.type) +.withReason(dfe.reason) +.withFirstTimestamp(dfe.lastTransitionTime) +.withLastTimestamp(dfe.lastUpdateTime) .withMessage(dfe.getMessage()) .withNewMetadata() .withGenerateName(flinkApp.getMetadata().getName()) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java index 180cb6f..a694f6d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apa
[flink-kubernetes-operator] branch main updated: [FLINK-26432] Cleanly separate validator, observer and reconciler modules
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 4bac805 [FLINK-26432] Cleanly separate validator, observer and reconciler modules 4bac805 is described below commit 4bac8059ec5d3c68da19beec22f29c434562516e Author: Gyula Fora AuthorDate: Mon Feb 28 06:17:11 2022 +0100 [FLINK-26432] Cleanly separate validator, observer and reconciler modules closes #26 --- .../flink/kubernetes/operator/FlinkOperator.java | 3 + .../controller/FlinkDeploymentController.java | 20 ++- .../operator/crd/status/FlinkDeploymentStatus.java | 6 +- .../JobManagerDeploymentStatus.java| 4 +- .../operator/observer/JobStatusObserver.java | 115 --- .../kubernetes/operator/observer/Observer.java | 161 + .../operator/reconciler/BaseReconciler.java| 74 +- .../operator/reconciler/JobReconciler.java | 27 +--- .../operator/reconciler/SessionReconciler.java | 7 - .../flink/kubernetes/operator/TestUtils.java | 17 +++ .../kubernetes/operator/TestingFlinkService.java | 7 +- .../controller/FlinkDeploymentControllerTest.java | 47 +++--- .../operator/observer/JobStatusObserverTest.java | 77 -- .../kubernetes/operator/observer/ObserverTest.java | 135 + .../operator/reconciler/JobReconcilerTest.java | 4 +- .../crds/flinkdeployments.flink.apache.org-v1.yml | 7 + 16 files changed, 394 insertions(+), 317 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 8fa72b9..fd9d60c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig; import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; +import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -55,6 +56,7 @@ public class FlinkOperator { FlinkService flinkService = new FlinkService(client); +Observer observer = new Observer(flinkService); JobReconciler jobReconciler = new JobReconciler(client, flinkService); SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService); @@ -66,6 +68,7 @@ public class FlinkOperator { client, namespace, validator, +observer, jobReconciler, sessionReconciler); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 5dff8f4..b9d5cd4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; +import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; @@ -64,6 +65,7 @@ public class FlinkDeploymentController private final String operatorNamespace; private final FlinkDeploymentValidator validator; +private final Observer observer; private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; private final DefaultConfig defaultConfig; @@ -73,12 +75,14 @@ public class FlinkDeploymentController KubernetesClient kubernetesClient, String operatorNamespace
[flink-kubernetes-operator] 02/02: [FLINK-26261] Refactor to simplify reconciliation logic
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 684b4597764daefe53fb1399298324bec5bc738e Author: Thomas Weise AuthorDate: Sat Feb 26 14:55:43 2022 -0800 [FLINK-26261] Refactor to simplify reconciliation logic --- .../flink/kubernetes/operator/FlinkOperator.java | 3 - .../controller/FlinkDeploymentController.java | 95 --- .../operator/observer/JobStatusObserver.java | 4 - .../operator/reconciler/BaseReconciler.java| 103 + .../operator/reconciler/JobReconciler.java | 36 ++- .../operator/reconciler/SessionReconciler.java | 25 - .../kubernetes/operator/service/FlinkService.java | 2 +- .../kubernetes/operator/TestingFlinkService.java | 2 +- .../controller/FlinkDeploymentControllerTest.java | 38 ++-- .../operator/reconciler/JobReconcilerTest.java | 45 +++-- 10 files changed, 218 insertions(+), 135 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 7e8edf5..8fa72b9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -21,7 +21,6 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig; import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -56,7 +55,6 @@ public class FlinkOperator { FlinkService flinkService = new FlinkService(client); -JobStatusObserver observer = new JobStatusObserver(flinkService); JobReconciler jobReconciler = new JobReconciler(client, flinkService); SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService); @@ -68,7 +66,6 @@ public class FlinkOperator { client, namespace, validator, -observer, jobReconciler, sessionReconciler); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index a6c6072..2e79926 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,16 +23,15 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; +import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator; +import org.apache.flink.kubernetes.utils.Constants; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; -import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -50,10 +49,8 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; /** Controller that runs the main reconcile loop for Flink deployments. */ @ControllerConfiguration(generationAwareEventProcessing = false) @@ -63,33
[flink-kubernetes-operator] 01/02: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit becf19c82616e8f3a148c2ce27141b742f019ab3 Author: Thomas Weise AuthorDate: Thu Feb 24 14:26:30 2022 -0800 [FLINK-26261] Wait for JobManager deployment ready before accessing REST API --- .../controller/FlinkDeploymentController.java | 76 +++--- .../operator/observer/JobStatusObserver.java | 7 ++ .../kubernetes/operator/service/FlinkService.java | 25 +++ .../kubernetes/operator/TestingFlinkService.java | 5 ++ .../controller/FlinkDeploymentControllerTest.java | 58 + 5 files changed, 151 insertions(+), 20 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index f97d5fd..a6c6072 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -30,7 +30,11 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; @@ -41,10 +45,12 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -58,6 +64,7 @@ public class FlinkDeploymentController private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class); public static final int REFRESH_SECONDS = 60; +public static final int PORT_READY_DELAY_SECONDS = 10; private final KubernetesClient kubernetesClient; @@ -68,6 +75,7 @@ public class FlinkDeploymentController private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; private final DefaultConfig defaultConfig; +private final HashSet jobManagerDeployments = new HashSet<>(); public FlinkDeploymentController( DefaultConfig defaultConfig, @@ -96,6 +104,7 @@ public class FlinkDeploymentController operatorNamespace, kubernetesClient, true); +jobManagerDeployments.remove(flinkApp.getMetadata().getSelfLink()); return DeleteControl.defaultDelete(); } @@ -113,8 +122,11 @@ public class FlinkDeploymentController Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); try { -boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig); -if (successfulObserve) { +// only check job status when the JM deployment is ready +boolean shouldReconcile = + !jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink()) +|| observer.observeFlinkJobStatus(flinkApp, effectiveConfig); +if (shouldReconcile) { reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig); updateForReconciliationSuccess(flinkApp); } @@ -125,6 +137,49 @@ public class FlinkDeploymentController } catch (Exception e) { throw new ReconciliationException(e); } + +return checkJobManagerDeployment(flinkApp, context, effectiveConfig); +} + +private UpdateControl checkJobManagerDeployment( +FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { +if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSe
[flink-kubernetes-operator] branch main updated (e2ac688 -> 684b459)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git. from e2ac688 [hotfix] copy base config in config builder new becf19c [FLINK-26261] Wait for JobManager deployment ready before accessing REST API new 684b459 [FLINK-26261] Refactor to simplify reconciliation logic The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/kubernetes/operator/FlinkOperator.java | 3 - .../controller/FlinkDeploymentController.java | 57 ++-- .../operator/observer/JobStatusObserver.java | 3 + .../operator/reconciler/BaseReconciler.java| 103 + .../operator/reconciler/JobReconciler.java | 36 ++- .../operator/reconciler/SessionReconciler.java | 25 - .../kubernetes/operator/service/FlinkService.java | 25 + .../kubernetes/operator/TestingFlinkService.java | 5 + .../controller/FlinkDeploymentControllerTest.java | 42 + .../operator/reconciler/JobReconcilerTest.java | 45 +++-- 10 files changed, 279 insertions(+), 65 deletions(-) create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
[flink-kubernetes-operator] branch main updated: [hotfix] Fix observer sort logic for job status merge
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 3e6a00f [hotfix] Fix observer sort logic for job status merge 3e6a00f is described below commit 3e6a00f0eb528ad85b91a99424258925ea9304d5 Author: zeus1ammon <45544051+zeus1am...@users.noreply.github.com> AuthorDate: Wed Feb 23 11:16:55 2022 -0500 [hotfix] Fix observer sort logic for job status merge --- .../apache/flink/kubernetes/operator/observer/JobStatusObserver.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index c0dabcb..0740dea 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -89,7 +89,7 @@ public class JobStatusObserver { JobStatus newStatus = oldStatus; Collections.sort( clusterJobStatuses, -(j1, j2) -> -1 * Long.compare(j1.getStartTime(), j1.getStartTime())); +(j1, j2) -> -1 * Long.compare(j1.getStartTime(), j2.getStartTime())); JobStatusMessage newJob = clusterJobStatuses.get(0); if (newStatus == null) {
[flink-kubernetes-operator] branch main updated: [FLINK-26260] Support watching specific namespaces only
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new d8f19bc [FLINK-26260] Support watching specific namespaces only d8f19bc is described below commit d8f19bcea6d493d4cb5a30448c9f2865e6fef56b Author: Gyula Fora AuthorDate: Mon Feb 21 11:09:02 2022 +0100 [FLINK-26260] Support watching specific namespaces only --- README.md | 6 +++ .../flink/kubernetes/operator/FlinkOperator.java | 18 .../operator/controller/FlinkControllerConfig.java | 51 ++ helm/flink-operator/templates/flink-operator.yaml | 2 + helm/flink-operator/templates/webhook.yaml | 10 - helm/flink-operator/values.yaml| 3 ++ 6 files changed, 81 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 26dd8fc..aecad31 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ The operator is managed helm chart. To install run: helm install flink-operator . ``` +### Validating webhook + In order to use the webhook for FlinkDeployment validation, you must install the cert-manager on the Kubernetes cluster: ``` kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml @@ -17,6 +19,10 @@ kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.7 The webhook can be disabled during helm install by passing the `--set webhook.create=false` parameter or editing the `values.yaml` directly. +### Watching only specific namespaces + +The operator supports watching a specific list of namespaces for FlinkDeployment resources. You can enable it by setting the `--set watchNamespaces={flink-test}` parameter. + ## User Guide ### Create a new Flink deployment The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR is applied. diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 8cfdef1..9484bea 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator; +import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; @@ -27,7 +28,6 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.javaoperatorsdk.operator.Operator; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory; /** Main Class for Flink native k8s operator. */ public class FlinkOperator { private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class); -private static final String ENV_FLINK_OPERATOR_CONF_DIR = "FLINK_OPERATOR_CONF_DIR"; + +public static final String ENV_FLINK_OPERATOR_CONF_DIR = "FLINK_OPERATOR_CONF_DIR"; public static void main(String... args) { @@ -48,11 +49,9 @@ public class FlinkOperator { if (namespace == null) { namespace = "default"; } -Operator operator = -new Operator( -client, -new ConfigurationServiceOverrider(DefaultConfigurationService.instance()) -.build()); + +DefaultConfigurationService configurationService = DefaultConfigurationService.instance(); +Operator operator = new Operator(client, configurationService); FlinkService flinkService = new FlinkService(client); @@ -64,7 +63,10 @@ public class FlinkOperator { new FlinkDeploymentController( client, namespace, observer, jobReconciler, sessionReconciler); -operator.register(controller); +FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller); +controllerConfig.setConfigurationService(configurationService); + +operator.register(controller, controllerConfig); operator.installShutdownHook(); operator.start(); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/
[flink-kubernetes-operator] branch main updated (b5d71f1 -> 0737641)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git. discard b5d71f1 Seed commit to be removed new 0737641 Seed commit This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (b5d71f1) \ N -- N -- N refs/heads/main (0737641) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes:
[flink-kubernetes-operator] 01/01: Seed commit
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 07376419b83770c6f946d419263b376c7797c673 Author: Thomas Weise AuthorDate: Thu Feb 10 19:12:35 2022 -0800 Seed commit --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md new file mode 100644 index 000..4c9483b --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# flink-kubernetes-operator +Apache Flink Kubernetes Operator
[flink-kubernetes-operator] branch main created (now b5d71f1)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git. at b5d71f1 Seed commit to be removed This branch includes the following new commits: new b5d71f1 Seed commit to be removed The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink-kubernetes-operator] 01/01: Seed commit to be removed
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit b5d71f10c88bebc82519656f2be63631535737d8 Author: Thomas Weise AuthorDate: Thu Feb 10 19:12:35 2022 -0800 Seed commit to be removed --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md new file mode 100644 index 000..4c9483b --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# flink-kubernetes-operator +Apache Flink Kubernetes Operator
svn commit: r52332 - /release/flink/flink-1.14.2/
Author: thw Date: Mon Jan 31 04:50:36 2022 New Revision: 52332 Log: remove flink-1.14.2 Removed: release/flink/flink-1.14.2/
[flink-web] branch asf-site updated: Rebuild website
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new e1657ab Rebuild website e1657ab is described below commit e1657ab3ded5fcc17d0d4b69974839c95e15d6e8 Author: Thomas Weise AuthorDate: Wed Jan 19 17:32:53 2022 -0800 Rebuild website --- content/blog/feed.xml | 469 +++ content/blog/index.html | 38 +- content/blog/page10/index.html | 38 +- content/blog/page11/index.html | 40 +- content/blog/page12/index.html | 38 +- content/blog/page13/index.html | 36 +- content/blog/page14/index.html | 37 +- content/blog/page15/index.html | 38 +- content/blog/page16/index.html | 44 ++- content/blog/page17/index.html | 45 ++- content/blog/page18/index.html | 25 ++ content/blog/page2/index.html | 40 +- content/blog/page3/index.html | 40 +- content/blog/page4/index.html | 38 +- content/blog/page5/index.html | 36 +- content/blog/page6/index.html | 38 +- content/blog/page7/index.html | 40 +- content/blog/page8/index.html | 38 +- content/blog/page9/index.html | 36 +- content/downloads.html | 31 +- content/index.html | 8 +- content/news/2022/01/17/release-1.14.3.html | 566 content/q/gradle-quickstart.sh | 2 +- content/q/quickstart-scala.sh | 2 +- content/q/quickstart.sh | 2 +- content/q/sbt-quickstart.sh | 2 +- content/zh/downloads.html | 35 +- content/zh/index.html | 8 +- 28 files changed, 1377 insertions(+), 433 deletions(-) diff --git a/content/blog/feed.xml b/content/blog/feed.xml index 024dca6..24fd2a2 100644 --- a/content/blog/feed.xml +++ b/content/blog/feed.xml @@ -7,6 +7,324 @@ https://flink.apache.org/blog/feed.xml; rel="self" type="application/rss+xml" /> +Apache Flink 1.14.3 Release Announcement +pThe Apache Flink community released the second bugfix version of the Apache Flink 1.14 series. +The first bugfix release was 1.14.2, being an emergency release due to an Apache Log4j Zero Day (CVE-2021-44228). Flink 1.14.1 was abandoned. +That means that this Flink release is the first bugfix release of the Flink 1.14 series which contains bugfixes not related to the mentioned CVE./p + +pThis release includes 164 fixes and minor improvements for Flink 1.14.0. The list below includes bugfixes and improvements. For a complete list of all changes see: +a href=https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351075amp;projectId=12315522JIRA/a./p; + +pWe highly recommend all users to upgrade to Flink 1.14.3./p + +pUpdated Maven dependencies:/p + +div class=highlightprecode class=language-xmlspan class=ntlt;dependencygt;/span + span class=ntlt;groupIdgt;/spanorg.apache.flinkspan class=ntlt;/groupIdgt;/span + span class=ntlt;artifactIdgt;/spanflink-javaspan class=ntlt;/artifactIdgt;/span + span class=ntlt;versiongt;/span1.14.3span class=ntlt;/versiongt;/span +span class=ntlt;/dependencygt;/span +span class=ntlt;dependencygt;/span + span class=ntlt;groupIdgt;/spanorg.apache.flinkspan class=ntlt;/groupIdgt;/span + span class=ntlt;artifactIdgt;/spanflink-streaming-java_2.11span class=ntlt;/artifactIdgt;/span + span class=ntlt;versiongt;/span1.14.3span class=ntlt;/versiongt;/span +span class=ntlt;/dependencygt;/span +span class=ntlt;dependencygt;/span + span class=ntlt;groupIdgt;/spanorg.apache.flinkspan class=ntlt;/groupIdgt;/span + span class=ntlt;artifactIdgt;/spanflink-clients_2.11span class=ntlt;/artifactIdgt;/span + span class=ntlt;versiongt;/span1.14.3span class=ntlt;/versiongt;/span +span class=ntlt;/dependencygt;/span/code/pre/div + +pYou can find the binaries on the updated a href=/downloads.htmlDownloads page/a./p + +div class=highlightprecodeRelease Notes - Flink - Version 1.14.3 +/code/pre/div + +h2Sub-task +/h2 +ul +li[a href=https://issues.apache.org/jira/browse/FLINK-24355FLINK-24355/a;] - Expose the flag for enabling checkpoints after tasks finish in the Web UI +/li +/ul + +h2Bug +/h2 +ul +li[a href=https://issues.apache.org/jira/browse/FLINK-15987FLINK-15987/a;] - SELECT 1.0e0 / 0.0e0 throws NumberFormatException +/li +li[a href=https://issues.apache.org/jira/browse/FLINK-17914FLINK-17914/a;] - HistoryServer deletes cached archives if archive listing fails +/li +li[a href=https://issues.apache.org/jira/browse/FLINK-19142FLINK-19142/a;] - Local
[flink-web] branch asf-site updated: fixup: Add Flink release 1.14.3
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 309ccfe fixup: Add Flink release 1.14.3 309ccfe is described below commit 309ccfed5a6ee832cfd62a0a5e7750f54137c2b9 Author: Thomas Weise AuthorDate: Wed Jan 19 17:29:51 2022 -0800 fixup: Add Flink release 1.14.3 --- _posts/2022-01-17-release-1.14.3.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/_posts/2022-01-17-release-1.14.3.md b/_posts/2022-01-17-release-1.14.3.md index 09bcb76..89bfa5e 100644 --- a/_posts/2022-01-17-release-1.14.3.md +++ b/_posts/2022-01-17-release-1.14.3.md @@ -6,11 +6,12 @@ categories: news authors: - tweise: name: "Thomas Weise" + twitter: "thweise" - MartijnVisser: name: "Martijn Visser" twitter: "martijnvisser82" -The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series. +excerpt: The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series. --- @@ -328,4 +329,4 @@ You can find the binaries on the updated [Downloads page]({{ site.baseurl }}/dow [FLINK-25472] - Update to Log4j 2.17.1 - \ No newline at end of file +
[flink] branch release-1.14 updated: Update japicmp configuration for 1.14.3
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.14 by this push: new bd1cd34 Update japicmp configuration for 1.14.3 bd1cd34 is described below commit bd1cd343a97e9bb9cff53ffe3baae37fde1034c5 Author: Thomas Weise AuthorDate: Wed Jan 19 16:31:04 2022 -0800 Update japicmp configuration for 1.14.3 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9b556cb..6bbfd62 100644 --- a/pom.xml +++ b/pom.xml @@ -155,7 +155,7 @@ under the License. For Hadoop 2.7, the minor Hadoop version supported for flink-shaded-hadoop-2-uber is 2.7.5 --> 2.7.5 - 1.14.0 + 1.14.3 tools/japicmp-output 2.4.2
[flink-web] branch asf-site updated: Add Flink release 1.14.3
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new c9d07f1 Add Flink release 1.14.3 c9d07f1 is described below commit c9d07f112643f7670f65550818e0785e07c52024 Author: martijnvisser AuthorDate: Wed Jan 12 19:44:49 2022 +0100 Add Flink release 1.14.3 --- _config.yml | 28 +-- _posts/2022-01-17-release-1.14.3.md | 331 q/gradle-quickstart.sh | 2 +- q/quickstart-scala.sh | 2 +- q/quickstart.sh | 2 +- q/sbt-quickstart.sh | 2 +- 6 files changed, 351 insertions(+), 16 deletions(-) diff --git a/_config.yml b/_config.yml index 96224b6..41464de 100644 --- a/_config.yml +++ b/_config.yml @@ -9,7 +9,7 @@ url: https://flink.apache.org DOCS_BASE_URL: https://nightlies.apache.org/flink/ -FLINK_VERSION_STABLE: 1.14.2 +FLINK_VERSION_STABLE: 1.14.3 FLINK_VERSION_STABLE_SHORT: "1.14" FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK @@ -64,23 +64,23 @@ flink_releases: - version_short: "1.14" binary_release: - name: "Apache Flink 1.14.2" + name: "Apache Flink 1.14.3" scala_211: id: "1142-download_211" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz; -asc_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.asc; -sha512_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.sha512; +url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz; +asc_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.asc; +sha512_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.sha512; scala_212: id: "1142-download_212" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz; -asc_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz.asc; -sha512_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz.sha512; +url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz; +asc_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz.asc; +sha512_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz.sha512; source_release: - name: "Apache Flink 1.14.2" + name: "Apache Flink 1.14.3" id: "1142-download-source" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.2/flink-1.14.2-src.tgz; - asc_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-src.tgz.asc; - sha512_url: "https://downloads.apache.org/flink/flink-1.14.2/flink-1.14.2-src.tgz.sha512; + url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-src.tgz; + asc_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-src.tgz.asc; + sha512_url: "https://downloads.apache.org/flink/flink-1.14.3/flink-1.14.3-src.tgz.sha512; release_notes_url: "https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14; - version_short: "1.13" @@ -205,6 +205,10 @@ release_archive: flink: - version_short: "1.14" +version_long: 1.14.3 +release_date: 2022-01-17 + - +version_short: "1.14" version_long: 1.14.2 release_date: 2021-12-16 - diff --git a/_posts/2022-01-17-release-1.14.3.md b/_posts/2022-01-17-release-1.14.3.md new file mode 100644 index 000..09bcb76 --- /dev/null +++ b/_posts/2022-01-17-release-1.14.3.md @@ -0,0 +1,331 @@ +--- +layout: post +title: "Apache Flink 1.14.3 Release Announcement" +date: 2022-01-17T08:00:00.000Z +categories: news +authors: +- tweise: + name: "Thomas Weise" +- MartijnVisser: + name: "Martijn Visser" + twitter: "martijnvisser82" + +The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series. + +--- + +The Apache Flink community released the second bugfix version of the Apache Flink 1.14 series. +The first bugfix release was 1.14.2, being an emergency release due to an Apache Log4j Zero Day (CVE-2021-44228). Flink 1.14.1 was abandoned. +That means that this Flink release is the first bugfix release of the Flink 1.14 series which
[flink-docker] branch master updated: Add 1.14.3 release
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/master by this push: new 312e9af Add 1.14.3 release 312e9af is described below commit 312e9af1313817d78e18c9d5b6dbdff3c4e408d5 Author: Thomas Weise AuthorDate: Mon Jan 17 13:26:17 2022 -0800 Add 1.14.3 release --- 1.14/scala_2.11-java11-debian/Dockerfile | 6 +++--- 1.14/scala_2.11-java11-debian/release.metadata | 2 +- 1.14/scala_2.11-java8-debian/Dockerfile| 6 +++--- 1.14/scala_2.11-java8-debian/release.metadata | 2 +- 1.14/scala_2.12-java11-debian/Dockerfile | 6 +++--- 1.14/scala_2.12-java11-debian/release.metadata | 2 +- 1.14/scala_2.12-java8-debian/Dockerfile| 6 +++--- 1.14/scala_2.12-java8-debian/release.metadata | 2 +- 8 files changed, 16 insertions(+), 16 deletions(-) diff --git a/1.14/scala_2.11-java11-debian/Dockerfile b/1.14/scala_2.11-java11-debian/Dockerfile index 9c57ecc..b5228a2 100644 --- a/1.14/scala_2.11-java11-debian/Dockerfile +++ b/1.14/scala_2.11-java11-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz \ - FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.asc \ -GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \ +ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz \ + FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.asc \ +GPG_KEY=10409A66C7C2F297C8581C2A12DEE3E4D920A98C \ CHECK_GPG=true # Prepare environment diff --git a/1.14/scala_2.11-java11-debian/release.metadata b/1.14/scala_2.11-java11-debian/release.metadata index cda57d1..84a6d6d 100644 --- a/1.14/scala_2.11-java11-debian/release.metadata +++ b/1.14/scala_2.11-java11-debian/release.metadata @@ -1,2 +1,2 @@ -Tags: 1.14.2-scala_2.11-java11, 1.14-scala_2.11-java11, scala_2.11-java11 +Tags: 1.14.3-scala_2.11-java11, 1.14-scala_2.11-java11, scala_2.11-java11 Architectures: amd64 diff --git a/1.14/scala_2.11-java8-debian/Dockerfile b/1.14/scala_2.11-java8-debian/Dockerfile index 76af456..6aff4df 100644 --- a/1.14/scala_2.11-java8-debian/Dockerfile +++ b/1.14/scala_2.11-java8-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz \ - FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.11.tgz.asc \ -GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \ +ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz \ + FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz.asc \ +GPG_KEY=10409A66C7C2F297C8581C2A12DEE3E4D920A98C \ CHECK_GPG=true # Prepare environment diff --git a/1.14/scala_2.11-java8-debian/release.metadata b/1.14/scala_2.11-java8-debian/release.metadata index c20e599..f405f57 100644 --- a/1.14/scala_2.11-java8-debian/release.metadata +++ b/1.14/scala_2.11-java8-debian/release.metadata @@ -1,2 +1,2 @@ -Tags: 1.14.2-scala_2.11-java8, 1.14-scala_2.11-java8, scala_2.11-java8, 1.14.2-scala_2.11, 1.14-scala_2.11, scala_2.11 +Tags: 1.14.3-scala_2.11-java8, 1.14-scala_2.11-java8, scala_2.11-java8, 1.14.3-scala_2.11, 1.14-scala_2.11, scala_2.11 Architectures: amd64 diff --git a/1.14/scala_2.12-java11-debian/Dockerfile b/1.14/scala_2.12-java11-debian/Dockerfile index 0b22c21..1bb36d3 100644 --- a/1.14/scala_2.12-java11-debian/Dockerfile +++ b/1.14/scala_2.12-java11-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz \ - FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.2/flink-1.14.2-bin-scala_2.12.tgz.asc \ -GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \ +ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz \ + FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz.asc \ +GPG_KEY=10409A66C7C2F297C8581C2A12DEE3E4D920A98C \ CHECK_GPG=true # Prepare environment diff --git a/1.14/scala_2.12-java11-debian/release.metadata b/1.14/scala_2.12-java11-debian/release.metadata index 6bed731..23f2f57 100644 --- a/1.14/scala_2.12-java11-debian/release.metadata +++ b/1.14/scala_2.12-java11-debian/release.metadata @@ -1,2 +1,2 @@ -Tags: 1.14.2-scala_2.12-java11, 1.14
[flink-docker] branch dev-1.14 updated (dd51e81 -> a676953)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch dev-1.14 in repository https://gitbox.apache.org/repos/asf/flink-docker.git. from dd51e81 Run tests for 1.14.2 add 9362050 Add GPG key for 1.14.3 release add a676953 update run_travis_tests.sh No new revisions were added by this update. Summary of changes: add-version.sh | 2 ++ testing/run_travis_tests.sh | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-)
[flink] annotated tag release-1.14.3 updated (98997ea -> b36cd46)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to annotated tag release-1.14.3 in repository https://gitbox.apache.org/repos/asf/flink.git. *** WARNING: tag release-1.14.3 was modified! *** from 98997ea (commit) to b36cd46 (tag) tagging 98997ea37ba08eae0f9aa6dd34823238097d8e0d (commit) replaces pre-apache-rename by Thomas Weise on Mon Jan 17 10:07:20 2022 -0800 - Log - Release Flink 1.14.3 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHlsFgACgkQEt7j5Nkg qYwY7A/+JH10viigIExKRdbel9V54UXLcFQdplaRYtt84hgq0yAcHfOOQ/ucMOAV ltyzwwDrbyrC0TKUM8o5U+WaL3T5j5YWyLEd6Md6T3a9vCN2a4XdVlDCZiEIXwJR oi4ACAXpiqFJKoGwE+gH/bp5bFYnq9v8s3sMM6SD9Wi1sYUPeFym64zf0eSAEuh3 vaUJkotKKE9/xuwo3YsuFM8pdQnOYjOGDEkDOuOiOnl4/uf1FLmBJH2gvMVsmryz AGxiGAz4A9XSEEpLBFZRVsZv1AyK3p+mis3ewTA/Wn5HSYBzDBhjWeQHOdbDGkcb rZCXu44FGRkGfGiOCSvaNR3HxrehuyJliJw+QV+vYVgMgXvXpmlCDDcZIxf8z0V+ akgzQPKZVrSEwV+J7RzAecr8jqvOJCiNgSuorkuSmZ1li0iAK+k3yJdDOmhMjuOq hsrYY2+sW+o035p+kN8aoazSJPzFF+Yg/DHBgAxy7Nw/lfyJgHP7+b/jXvFPqsRK rO83hUbtJXmtuIlhrm+UWEryQCBVrvndCOP9Oxgu1aVr1n6ZnRx3Ge4ytX+GCJGb 3JzERfNB0iPloQn6ASJaNFGa5c+EUhXlwFLOea+f8jMuJ4lnIqiM2r4D84jGIsRm CD1QmHqLxZ+EAnYyfSp8Zzr8cuN6H5556h7E7ZKTIYUoFeTLoS0= =x8Q4 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r52111 - /dev/flink/flink-1.14.3-rc1/ /release/flink/flink-1.14.3/
Author: thw Date: Mon Jan 17 18:05:00 2022 New Revision: 52111 Log: Release Flink 1.14.3 Added: release/flink/flink-1.14.3/ - copied from r52110, dev/flink/flink-1.14.3-rc1/ Removed: dev/flink/flink-1.14.3-rc1/
[flink] annotated tag release-1.14.3-rc1 updated (98997ea -> d32b95f)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to annotated tag release-1.14.3-rc1 in repository https://gitbox.apache.org/repos/asf/flink.git. *** WARNING: tag release-1.14.3-rc1 was modified! *** from 98997ea (commit) to d32b95f (tag) tagging 98997ea37ba08eae0f9aa6dd34823238097d8e0d (commit) replaces pre-apache-rename by Thomas Weise on Mon Jan 10 17:05:11 2022 -0800 - Log - release-1.14.3-rc1 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHc18cACgkQEt7j5Nkg qYx4MBAAr3nARCydSDTLjb/HV2aJ+b9rVcwvtfu2nT5+6IE7XW2JPFhe5CGufwL+ kpEvkDi14mIhpl9LNo2jRil8ejUyUYB5uRVXd0lEw9ZS4zfzCN4/lN62nlRBY3WE Qs1vLT+BH/jIGsl8unGLzJOV+ZRxSW7MkT+sOrWjMY32KBVfKneP3DouGex06msP x1czJDoNNziYe3QNHyu0foNdTanvSW8FEKfKKQkqOAPhCswfphr4Tnj3grRMRDiv U8y6TW5cik4ytRnyrWWLrO8Ou9lYFxn2NVDCPLdd03+guEvamCa0iPSZzTlmNdON +YxyvAwoV3Jjb2lcjLNNd4F9qy/c3b9KWYZBbjVRyH/oIySTmgycZfu30EPM3Vht ttpkUlPL6X0R27aA4qSZp5qtjhIIj+XNI2NPXXmyGMYP330bw8Ble0XSbECBuLoH hOllMdamdwPzTyvqGR7cbg9Sw2L0j+OXCDDgUOp7ebn9CpSssSr0zxK4OwxeSl9r BCRnl3BQ14N+cHto3Q0cNXYFMfGajYsENqgXQSCNzYFFBfWL56D5coBqaF7jdr+9 hZNNlMXfni/YCG3GRdRZwEwTiW7B7SivvOgcSkzscrMn2NKlDS5KviM5WltyVDDS gaeXVz7DVFXUd8GILP+4OoWDT/vxmRt+SyWNx2Lkff8Y1W2IrOc= =XzQk -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[flink] annotated tag release-1.14.3-rc1 updated (98997ea -> 217984e)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to annotated tag release-1.14.3-rc1 in repository https://gitbox.apache.org/repos/asf/flink.git. *** WARNING: tag release-1.14.3-rc1 was modified! *** from 98997ea (commit) to 217984e (tag) tagging 98997ea37ba08eae0f9aa6dd34823238097d8e0d (commit) replaces pre-apache-rename by Thomas Weise on Sat Jan 8 14:24:11 2022 -0800 - Log - release-1.14.3-rc1 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHaDwsACgkQEt7j5Nkg qYyejA//TW228QDDvgINrwKnjR3XWDu6CBgMbHEzT0a4pcpCq8OZE48Wa/XIyouh 8Q04Z2T4Ov1/vJqe5SSaqfrSpB6FD/bHQYE0pyxiTICj2AEevRKMOgTA2dccEO8g zBiWjwI+yZFRVFb7spXuggLfkDagdWC0da2SUn7Uthf7nXUrGyk0WKvsaexgK1OM cmF0y+9ov4yNPRqYBATGlhfjeBs/qHfZixmkoBxbjqo3B1hXzrKWd48YDk9bTA/l Sd9RHD0maXmacicUcNmJ0X+Z43eGwbAhiHDj3VmzkiGoGiBrhStv3hHTFsVByKAb ln1Fq1kpm03+K5HDViPIsI2h68hYyFDsIzJTIJgrkQQwRNhi19UxjANuRQsAXUii yJyrdOQSobrKX8aNyo7AMJII8lxp9O2w2UxccwAYy6oVtua5tL/rNKERiiOOBy6X Hejr/Tb/N16fIXL3BCDYq1JYV7ivPMsjhEAoqw4IR369En587R1K1NTRzc2s5573 vOIn830Q2gekt8S68KaHY5C1tox8kvtTvoGwufYEOHbVJmddti35eFiz0t4pF3DH mOeLhOghOgDrRhZPiznXZAlpZHezzcoNYMWquyXJtICxgZSjnlKlw3EW/1yp9458 BY51/wzFZvRp3nWmSIhpPV7a9AkGwYZ+579ymUFMxwFbtRV4024= =E6Mw -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r52020 - in /dev/flink/flink-1.14.3-rc1: ./ python/
Author: thw Date: Tue Jan 11 00:56:27 2022 New Revision: 52020 Log: Add flink-1.14.3-rc1 Added: dev/flink/flink-1.14.3-rc1/ dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz (with props) dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512 dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.12.tgz (with props) dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.12.tgz.asc dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.12.tgz.sha512 dev/flink/flink-1.14.3-rc1/flink-1.14.3-src.tgz (with props) dev/flink/flink-1.14.3-rc1/flink-1.14.3-src.tgz.asc dev/flink/flink-1.14.3-rc1/flink-1.14.3-src.tgz.sha512 dev/flink/flink-1.14.3-rc1/python/ dev/flink/flink-1.14.3-rc1/python/apache-flink-1.14.3.tar.gz (with props) dev/flink/flink-1.14.3-rc1/python/apache-flink-1.14.3.tar.gz.asc dev/flink/flink-1.14.3-rc1/python/apache-flink-1.14.3.tar.gz.sha512 dev/flink/flink-1.14.3-rc1/python/apache-flink-libraries-1.14.3.tar.gz (with props) dev/flink/flink-1.14.3-rc1/python/apache-flink-libraries-1.14.3.tar.gz.asc dev/flink/flink-1.14.3-rc1/python/apache-flink-libraries-1.14.3.tar.gz.sha512 dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-macosx_10_9_x86_64.whl (with props) dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-macosx_10_9_x86_64.whl.asc dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-macosx_10_9_x86_64.whl.sha512 dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-manylinux1_x86_64.whl (with props) dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-manylinux1_x86_64.whl.asc dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp36-cp36m-manylinux1_x86_64.whl.sha512 dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-macosx_10_9_x86_64.whl (with props) dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-macosx_10_9_x86_64.whl.asc dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-macosx_10_9_x86_64.whl.sha512 dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-manylinux1_x86_64.whl (with props) dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-manylinux1_x86_64.whl.asc dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp37-cp37m-manylinux1_x86_64.whl.sha512 dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-macosx_10_9_x86_64.whl (with props) dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-macosx_10_9_x86_64.whl.asc dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-macosx_10_9_x86_64.whl.sha512 dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-manylinux1_x86_64.whl (with props) dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-manylinux1_x86_64.whl.asc dev/flink/flink-1.14.3-rc1/python/apache_flink-1.14.3-cp38-cp38-manylinux1_x86_64.whl.sha512 Added: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz == Binary file - no diff available. Propchange: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz -- svn:mime-type = application/octet-stream Added: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc == --- dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc (added) +++ dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.asc Tue Jan 11 00:56:27 2022 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCAAdFiCaZsfC8pfIWBwqEt7j5NkgqYwFAmHcyIAACgkQEt7j5Nkg +qYzbbQ/8CTxLjr9D3AXGGKeqCeBnIIxR7s17483QDqIOa7QLIew7MYpvqrBYK66B +yHisA7XxL7K7a6zOGV5sO06pLpT9EjYjmxw41UkWAIwMP7GpyFMiu4LhN1LDcCTV +xK7mSxgZ86NjUvoOIexM/pSkoVqSAiMHOa/Q5OzSWPPRo/byXyjUQ5eFb6R937LR +t/Yj3V7wHPLQ30vpu2a+reHRNlmAiH2NKH4oRWVtBx0vIFkmqx2qWqdWevdrUlia +aLrNfPqbw7hRhQEDofbjF9EoViTfvIjC/XuI8Xo1gA+A0YklaMy0HIsh31iNj6Mg +VBxP0/Sh5wlwLsV6Zvj0eZ4bjLvmmv0NWRxZB0Oti4BpZQPz4iZ0T6j7YdmMBBNz +KLE65C3E3jQBz0HnUKPIrqcTWVt3ad0YIk8+6v/t1eqxZculIJbHlH9nLlMc+hdW +UgDGGlGY3pi644k08oi8hsEI5thckQOIRUjGx+3IvajU9lZL7+lqtFvPU8W7Ictg +wcLm1cssKJRPvuIBN15pweFygsRvENpi2iQeo9YSg/ZySfymu66C2wm4oeUtxjoS +69RvjDao6VgvS9Ld7w/9fACOTHFZTViq8IsEDkYLub+LRaanFjuc7XOQpxDNeJwB +l6T9MPb029io5tBV0IDDYElG2+J7muqSHAAngYuV9uFj72MdeQc= +=QqHN +-END PGP SIGNATURE- Added: dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512 == --- dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512 (added) +++ dev/flink/flink-1.14.3-rc1/flink-1.14.3-bin-scala_2.11.tgz.sha512 Tue Jan 11 00:56:27 2022 @@ -0,0 +1
[flink] 01/01: Commit for release 1.14.3
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.14.3-rc1 in repository https://gitbox.apache.org/repos/asf/flink.git commit 98997ea37ba08eae0f9aa6dd34823238097d8e0d Author: Thomas Weise AuthorDate: Sat Jan 8 14:23:54 2022 -0800 Commit for release 1.14.3 --- docs/config.toml | 2 +- flink-annotations/pom.xml | 2 +- flink-clients/pom.xml | 2 +- flink-connectors/flink-connector-base/pom.xml | 2 +- flink-connectors/flink-connector-cassandra/pom.xml| 2 +- flink-connectors/flink-connector-elasticsearch-base/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch5/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch6/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-connector-files/pom.xml| 2 +- flink-connectors/flink-connector-gcp-pubsub/pom.xml | 2 +- flink-connectors/flink-connector-hbase-1.4/pom.xml| 2 +- flink-connectors/flink-connector-hbase-2.2/pom.xml| 2 +- flink-connectors/flink-connector-hbase-base/pom.xml | 2 +- flink-connectors/flink-connector-hive/pom.xml | 2 +- flink-connectors/flink-connector-jdbc/pom.xml | 2 +- flink-connectors/flink-connector-kafka/pom.xml| 2 +- flink-connectors/flink-connector-kinesis/pom.xml | 2 +- flink-connectors/flink-connector-nifi/pom.xml | 2 +- flink-connectors/flink-connector-pulsar/pom.xml | 2 +- flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +- flink-connectors/flink-connector-twitter/pom.xml | 2 +- flink-connectors/flink-file-sink-common/pom.xml | 2 +- flink-connectors/flink-hadoop-compatibility/pom.xml | 2 +- flink-connectors/flink-hcatalog/pom.xml | 2 +- flink-connectors/flink-sql-connector-elasticsearch6/pom.xml | 2 +- flink-connectors/flink-sql-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-sql-connector-hbase-1.4/pom.xml| 2 +- flink-connectors/flink-sql-connector-hbase-2.2/pom.xml| 2 +- flink-connectors/flink-sql-connector-hive-1.2.2/pom.xml | 2 +- flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml | 2 +- flink-connectors/flink-sql-connector-hive-2.3.6/pom.xml | 2 +- flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml | 2 +- flink-connectors/flink-sql-connector-kafka/pom.xml| 2 +- flink-connectors/flink-sql-connector-kinesis/pom.xml | 2 +- flink-connectors/pom.xml | 2 +- flink-container/pom.xml | 2 +- flink-contrib/flink-connector-wikiedits/pom.xml | 2 +- flink-contrib/pom.xml | 2 +- flink-core/pom.xml| 2 +- flink-dist/pom.xml| 2 +- flink-docs/pom.xml| 2 +- flink-dstl/flink-dstl-dfs/pom.xml | 2 +- flink-dstl/pom.xml| 2 +- flink-end-to-end-tests/flink-batch-sql-test/pom.xml | 2 +- flink-end-to-end-tests/flink-cli-test/pom.xml | 2 +- flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +- .../flink-connector-gcp-pubsub-emulator-tests/pom.xml | 2 +- flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +- .../flink-dataset-fine-grained-recovery-test/pom.xml | 2 +- flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +- flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml | 2 +- flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml | 2 +- flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml | 2 +- flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml | 2 +- flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml| 2 +- flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml | 2 +- flink-end-to-end-tests/flink-end-to-end-tests-hbase/pom.xml | 2 +- flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml | 2 +- flink-end-to-end-tests/flink-file-sink-test/pom.xml
[flink] branch release-1.14.3-rc1 created (now 98997ea)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch release-1.14.3-rc1 in repository https://gitbox.apache.org/repos/asf/flink.git. at 98997ea Commit for release 1.14.3 This branch includes the following new commits: new 98997ea Commit for release 1.14.3 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink] branch release-1.14 updated: [FLINK-24064][connector/common] HybridSource restore from savepoint
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.14 by this push: new cb71c37 [FLINK-24064][connector/common] HybridSource restore from savepoint cb71c37 is described below commit cb71c3721efa129972c8d9e6ace347b691ee80e1 Author: Thomas Weise AuthorDate: Tue Aug 31 05:48:40 2021 -0700 [FLINK-24064][connector/common] HybridSource restore from savepoint --- .../connector/base/source/hybrid/HybridSource.java | 19 ++ .../source/hybrid/HybridSourceEnumeratorState.java | 17 -- .../HybridSourceEnumeratorStateSerializer.java | 37 ++-- .../base/source/hybrid/HybridSourceReader.java | 15 ++--- .../base/source/hybrid/HybridSourceSplit.java | 69 -- .../source/hybrid/HybridSourceSplitEnumerator.java | 48 +-- .../source/hybrid/HybridSourceSplitSerializer.java | 39 +++- .../base/source/hybrid/SwitchedSources.java| 48 +++ .../base/source/hybrid/HybridSourceReaderTest.java | 36 --- .../hybrid/HybridSourceSplitEnumeratorTest.java| 36 ++- .../hybrid/HybridSourceSplitSerializerTest.java| 6 +- .../base/source/reader/mocks/MockBaseSource.java | 3 +- 12 files changed, 202 insertions(+), 171 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java index e3d66de..24acb6a 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java @@ -32,9 +32,7 @@ import org.apache.flink.util.Preconditions; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * Hybrid source that switches underlying sources based on configured source chain. @@ -91,14 +89,11 @@ import java.util.Map; public class HybridSource implements Source { private final List sources; -// sources are populated per subtask at switch time -private final Map switchedSources; /** Protected for subclass, use {@link #builder(Source)} to construct source. */ protected HybridSource(List sources) { Preconditions.checkArgument(!sources.isEmpty()); this.sources = sources; -this.switchedSources = new HashMap<>(sources.size()); } /** Builder for {@link HybridSource}. */ @@ -116,13 +111,13 @@ public class HybridSource implements Source createReader(SourceReaderContext readerContext) throws Exception { -return new HybridSourceReader(readerContext, switchedSources); +return new HybridSourceReader(readerContext); } @Override public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { -return new HybridSourceSplitEnumerator(enumContext, sources, 0, switchedSources, null); +return new HybridSourceSplitEnumerator(enumContext, sources, 0, null); } @Override @@ -131,22 +126,18 @@ public class HybridSource implements Source getSplitSerializer() { -return new HybridSourceSplitSerializer(switchedSources); +return new HybridSourceSplitSerializer(); } @Override public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { -return new HybridSourceEnumeratorStateSerializer(switchedSources); +return new HybridSourceEnumeratorStateSerializer(); } /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java index 2da99ee..95aadde 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java @@ -21,18 +21,25 @@ package org.apache.flink.connector.base.source.hybrid; /** The state of hybrid source enumerator. */ public class HybridSourceEnumeratorState { private final int currentSourceIndex; -private final Object wrappedState; +private byte[] wrappedStateBytes; +private final int wrappedStateSerializerVersion; -HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) { +HybridSourceEnumeratorState( +int currentSourceIndex, byte[] wrappedStateByte
[flink] branch release-1.13 updated: [FLINK-24064][connector/common] HybridSource restore from savepoint
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.13 by this push: new c7ccd24 [FLINK-24064][connector/common] HybridSource restore from savepoint c7ccd24 is described below commit c7ccd24e8ee566689898f8ced76090bcce0f87fa Author: Thomas Weise AuthorDate: Tue Aug 31 05:48:40 2021 -0700 [FLINK-24064][connector/common] HybridSource restore from savepoint --- .../connector/base/source/hybrid/HybridSource.java | 19 ++ .../source/hybrid/HybridSourceEnumeratorState.java | 17 -- .../HybridSourceEnumeratorStateSerializer.java | 37 ++-- .../base/source/hybrid/HybridSourceReader.java | 15 ++--- .../base/source/hybrid/HybridSourceSplit.java | 69 -- .../source/hybrid/HybridSourceSplitEnumerator.java | 48 +-- .../source/hybrid/HybridSourceSplitSerializer.java | 39 +++- .../base/source/hybrid/SwitchedSources.java| 48 +++ .../base/source/hybrid/HybridSourceReaderTest.java | 36 --- .../hybrid/HybridSourceSplitEnumeratorTest.java| 36 ++- .../hybrid/HybridSourceSplitSerializerTest.java| 6 +- .../base/source/reader/mocks/MockBaseSource.java | 3 +- 12 files changed, 202 insertions(+), 171 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java index e3d66de..24acb6a 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java @@ -32,9 +32,7 @@ import org.apache.flink.util.Preconditions; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * Hybrid source that switches underlying sources based on configured source chain. @@ -91,14 +89,11 @@ import java.util.Map; public class HybridSource implements Source { private final List sources; -// sources are populated per subtask at switch time -private final Map switchedSources; /** Protected for subclass, use {@link #builder(Source)} to construct source. */ protected HybridSource(List sources) { Preconditions.checkArgument(!sources.isEmpty()); this.sources = sources; -this.switchedSources = new HashMap<>(sources.size()); } /** Builder for {@link HybridSource}. */ @@ -116,13 +111,13 @@ public class HybridSource implements Source createReader(SourceReaderContext readerContext) throws Exception { -return new HybridSourceReader(readerContext, switchedSources); +return new HybridSourceReader(readerContext); } @Override public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { -return new HybridSourceSplitEnumerator(enumContext, sources, 0, switchedSources, null); +return new HybridSourceSplitEnumerator(enumContext, sources, 0, null); } @Override @@ -131,22 +126,18 @@ public class HybridSource implements Source getSplitSerializer() { -return new HybridSourceSplitSerializer(switchedSources); +return new HybridSourceSplitSerializer(); } @Override public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { -return new HybridSourceEnumeratorStateSerializer(switchedSources); +return new HybridSourceEnumeratorStateSerializer(); } /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java index 2da99ee..95aadde 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java @@ -21,18 +21,25 @@ package org.apache.flink.connector.base.source.hybrid; /** The state of hybrid source enumerator. */ public class HybridSourceEnumeratorState { private final int currentSourceIndex; -private final Object wrappedState; +private byte[] wrappedStateBytes; +private final int wrappedStateSerializerVersion; -HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) { +HybridSourceEnumeratorState( +int currentSourceIndex, byte[] wrappedStateByte
[flink] branch master updated: [FLINK-24064][connector/common] HybridSource restore from savepoint
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2984d87 [FLINK-24064][connector/common] HybridSource restore from savepoint 2984d87 is described below commit 2984d87ed7761a6ca345b8b79bcb6dd49db0442b Author: Thomas Weise AuthorDate: Tue Aug 31 05:48:40 2021 -0700 [FLINK-24064][connector/common] HybridSource restore from savepoint --- .../connector/base/source/hybrid/HybridSource.java | 19 ++ .../source/hybrid/HybridSourceEnumeratorState.java | 17 -- .../HybridSourceEnumeratorStateSerializer.java | 37 ++-- .../base/source/hybrid/HybridSourceReader.java | 15 ++--- .../base/source/hybrid/HybridSourceSplit.java | 69 -- .../source/hybrid/HybridSourceSplitEnumerator.java | 48 +-- .../source/hybrid/HybridSourceSplitSerializer.java | 39 +++- .../base/source/hybrid/SwitchedSources.java| 48 +++ .../base/source/hybrid/HybridSourceReaderTest.java | 36 --- .../hybrid/HybridSourceSplitEnumeratorTest.java| 36 ++- .../hybrid/HybridSourceSplitSerializerTest.java| 6 +- .../base/source/reader/mocks/MockBaseSource.java | 3 +- 12 files changed, 202 insertions(+), 171 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java index e3d66de..24acb6a 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java @@ -32,9 +32,7 @@ import org.apache.flink.util.Preconditions; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * Hybrid source that switches underlying sources based on configured source chain. @@ -91,14 +89,11 @@ import java.util.Map; public class HybridSource implements Source { private final List sources; -// sources are populated per subtask at switch time -private final Map switchedSources; /** Protected for subclass, use {@link #builder(Source)} to construct source. */ protected HybridSource(List sources) { Preconditions.checkArgument(!sources.isEmpty()); this.sources = sources; -this.switchedSources = new HashMap<>(sources.size()); } /** Builder for {@link HybridSource}. */ @@ -116,13 +111,13 @@ public class HybridSource implements Source createReader(SourceReaderContext readerContext) throws Exception { -return new HybridSourceReader(readerContext, switchedSources); +return new HybridSourceReader(readerContext); } @Override public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { -return new HybridSourceSplitEnumerator(enumContext, sources, 0, switchedSources, null); +return new HybridSourceSplitEnumerator(enumContext, sources, 0, null); } @Override @@ -131,22 +126,18 @@ public class HybridSource implements Source getSplitSerializer() { -return new HybridSourceSplitSerializer(switchedSources); +return new HybridSourceSplitSerializer(); } @Override public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { -return new HybridSourceEnumeratorStateSerializer(switchedSources); +return new HybridSourceEnumeratorStateSerializer(); } /** diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java index 2da99ee..95aadde 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java @@ -21,18 +21,25 @@ package org.apache.flink.connector.base.source.hybrid; /** The state of hybrid source enumerator. */ public class HybridSourceEnumeratorState { private final int currentSourceIndex; -private final Object wrappedState; +private byte[] wrappedStateBytes; +private final int wrappedStateSerializerVersion; -HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) { +HybridSourceEnumeratorState( +int currentSourceIndex, byte[] wrappedStateBytes, int serializerV
[flink] branch release-1.13 updated (dcfbf74 -> a8871ad)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git. from dcfbf74 [FLINK-24033][table-planner] Propagate unique keys for fromChangelogStream add a8871ad [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications No new revisions were added by this update. Summary of changes: .../base/source/hybrid/HybridSourceReader.java | 14 + .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++ .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++ .../hybrid/HybridSourceSplitEnumeratorTest.java| 21 + 4 files changed, 79 insertions(+)
[flink] branch release-1.14 updated: [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.14 by this push: new 8c293c7 [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications 8c293c7 is described below commit 8c293c7e03bf9c5221ae50843a3af445276afa24 Author: Thomas Weise AuthorDate: Thu Aug 26 14:22:01 2021 -0700 [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications --- .../base/source/hybrid/HybridSourceReader.java | 14 + .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++ .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++ .../hybrid/HybridSourceSplitEnumeratorTest.java| 21 + 4 files changed, 79 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index cdd2e8b..28d4011 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -121,6 +121,20 @@ public class HybridSourceReader implements SourceReader } @Override +public void notifyCheckpointComplete(long checkpointId) throws Exception { +if (currentReader != null) { +currentReader.notifyCheckpointComplete(checkpointId); +} +} + +@Override +public void notifyCheckpointAborted(long checkpointId) throws Exception { +if (currentReader != null) { +currentReader.notifyCheckpointAborted(checkpointId); +} +} + +@Override public CompletableFuture isAvailable() { return availabilityFuture; } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java index 5a2c010..0f2b036 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -176,6 +176,16 @@ public class HybridSourceSplitEnumerator } @Override +public void notifyCheckpointComplete(long checkpointId) throws Exception { +currentEnumerator.notifyCheckpointComplete(checkpointId); +} + +@Override +public void notifyCheckpointAborted(long checkpointId) throws Exception { +currentEnumerator.notifyCheckpointAborted(checkpointId); +} + +@Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { LOG.debug( "handleSourceEvent {} subtask={} pendingSplits={}", diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index 3bf77f3..7882333 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -33,6 +33,7 @@ import org.apache.flink.mock.Whitebox; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.util.Collections; import java.util.HashMap; @@ -165,6 +166,39 @@ public class HybridSourceReaderTest { reader.close(); } +@Test +public void testDefaultMethodDelegation() throws Exception { +TestingReaderContext readerContext = new TestingReaderContext(); +TestingReaderOutput readerOutput = new TestingReaderOutput<>(); +MockBaseSource source = +new MockBaseSource(1, 1, Boundedness.BOUNDED) { +@Override +public SourceReader createReader( +SourceReaderContext readerContext) { +return Mockito.spy(super.createReader(readerContext)); +} +}; + +Map switchedSources = new HashMap<>(); + +HybridSourceReader reader = +new HybridSourceReader<>(readerContext, switchedSourc
[flink] branch master updated: [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new da944a8 [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications da944a8 is described below commit da944a8c90477d7b0210024028abd1011e250f14 Author: Thomas Weise AuthorDate: Thu Aug 26 14:22:01 2021 -0700 [FLINK-24010][connector/common] HybridSourceReader/Enumerator delegate checkpoint notifications --- .../base/source/hybrid/HybridSourceReader.java | 14 + .../source/hybrid/HybridSourceSplitEnumerator.java | 10 +++ .../base/source/hybrid/HybridSourceReaderTest.java | 34 ++ .../hybrid/HybridSourceSplitEnumeratorTest.java| 21 + 4 files changed, 79 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index cdd2e8b..28d4011 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -121,6 +121,20 @@ public class HybridSourceReader implements SourceReader } @Override +public void notifyCheckpointComplete(long checkpointId) throws Exception { +if (currentReader != null) { +currentReader.notifyCheckpointComplete(checkpointId); +} +} + +@Override +public void notifyCheckpointAborted(long checkpointId) throws Exception { +if (currentReader != null) { +currentReader.notifyCheckpointAborted(checkpointId); +} +} + +@Override public CompletableFuture isAvailable() { return availabilityFuture; } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java index 5a2c010..0f2b036 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -176,6 +176,16 @@ public class HybridSourceSplitEnumerator } @Override +public void notifyCheckpointComplete(long checkpointId) throws Exception { +currentEnumerator.notifyCheckpointComplete(checkpointId); +} + +@Override +public void notifyCheckpointAborted(long checkpointId) throws Exception { +currentEnumerator.notifyCheckpointAborted(checkpointId); +} + +@Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { LOG.debug( "handleSourceEvent {} subtask={} pendingSplits={}", diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index 3bf77f3..7882333 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -33,6 +33,7 @@ import org.apache.flink.mock.Whitebox; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.util.Collections; import java.util.HashMap; @@ -165,6 +166,39 @@ public class HybridSourceReaderTest { reader.close(); } +@Test +public void testDefaultMethodDelegation() throws Exception { +TestingReaderContext readerContext = new TestingReaderContext(); +TestingReaderOutput readerOutput = new TestingReaderOutput<>(); +MockBaseSource source = +new MockBaseSource(1, 1, Boundedness.BOUNDED) { +@Override +public SourceReader createReader( +SourceReaderContext readerContext) { +return Mockito.spy(super.createReader(readerContext)); +} +}; + +Map switchedSources = new HashMap<>(); + +HybridSourceReader reader = +new HybridSourceReader<>(readerContext, switchedSourc
[flink] 02/03: [FLINK-22791][docs] Documentation for HybridSource
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git commit f8a6d31c4e4e180421517f7e700c731e58be37d8 Author: Thomas Weise AuthorDate: Sun Aug 22 13:56:13 2021 -0700 [FLINK-22791][docs] Documentation for HybridSource --- .../docs/connectors/datastream/hybridsource.md | 100 + 1 file changed, 100 insertions(+) diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md new file mode 100644 index 000..02f5077 --- /dev/null +++ b/docs/content/docs/connectors/datastream/hybridsource.md @@ -0,0 +1,100 @@ +--- +title: Hybrid Source +weight: 8 +type: docs +aliases: +--- + + +# Hybrid Source + +`HybridSource` is a source that contains a list of concrete [sources]({{< ref "docs/dev/datastream/sources" >}}). +It solves the problem of sequentially reading input from heterogeneous sources to produce a single input stream. + +For example, a bootstrap use case may need to read several days worth of bounded input from S3 before continuing with the latest unbounded input from Kafka. +`HybridSource` switches from `FileSource` to `KafkaSource` when the bounded file input finishes without interrupting the application. + +Prior to `HybridSource`, it was necessary to create a topology with multiple sources and define a switching mechanism in user land, which leads to operational complexity and inefficiency. + +With `HybridSource` the multiple sources appear as a single source in the Flink job graph and from `DataStream` API perspective. + +For more background see [FLIP-150](https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source) + +To use the connector, add the ```flink-connector-base``` dependency to your project: + +{{< artifact flink-connector-base >}} + +(Typically comes as transitive dependency with concrete sources.) + +## Start position for next source + +To arrange multiple sources in a `HybridSource`, all sources except the last one need to be bounded. Therefore, the sources typically need to be assigned a start and end position. The last source may be bounded in which case the `HybridSource` is bounded and unbounded otherwise. +Details depend on the specific source and the external storage systems. + +Here we cover the most basic and then a more complex scenario, following the File/Kafka example. + + Fixed start position at graph construction time + +Example: Read till pre-determined switch time from files and then continue reading from Kafka. +Each source covers an upfront known range and therefore the contained sources can be created upfront as if they were used directly: + +```java +long switchTimestamp = ...; // derive from file input paths +FileSource fileSource = + FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build(); +KafkaSource kafkaSource = + KafkaSource.builder() + .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1)) + .build(); +HybridSource hybridSource = + HybridSource.builder(fileSource) + .addSource(kafkaSource) + .build(); +``` + + Dynamic start position at switch time + +Example: File source reads a very large backlog, taking potentially longer than retention available for next source. +Switch needs to occur at "current time - X". This requires the start time for the next source to be set at switch time. +Here we require transfer of end position from the previous file enumerator for deferred construction of `KafkaSource` +by implementing `SourceFactory`. + +Note that enumerators need to support getting the end timestamp. This may currently require a source customization. +Adding support for dynamic end position to `FileSource` is tracked in [FLINK-23633](https://issues.apache.org/jira/browse/FLINK-23633). + +```java +FileSource fileSource = CustomFileSource.readTillOneDayFromLatest(); +HybridSource hybridSource = +HybridSource.builder(fileSource) +.addSource( +switchContext -> { + CustomFileSplitEnumerator previousEnumerator = + switchContext.getPreviousEnumerator(); + // how to get timestamp depends on specific enumerator + long switchTimestamp = previousEnumerator.getEndTimestamp(); + KafkaSource kafkaSource = + KafkaSource.builder() + .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1)) + .build(); + return kafkaSource; +}, +Boundedness.CONTINUOUS_UNBOUNDED) +.build(); +```
[flink] 01/03: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git commit c72aa4b97848625f23bac331b8e916c6a3e2acd3 Author: Thomas Weise AuthorDate: Fri Feb 26 19:32:32 2021 -0800 [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline --- .../connector/base/source/hybrid/HybridSource.java | 257 ++ .../source/hybrid/HybridSourceEnumeratorState.java | 38 ++ .../HybridSourceEnumeratorStateSerializer.java | 106 ++ .../base/source/hybrid/HybridSourceReader.java | 254 ++ .../base/source/hybrid/HybridSourceSplit.java | 94 + .../source/hybrid/HybridSourceSplitEnumerator.java | 390 + .../source/hybrid/HybridSourceSplitSerializer.java | 98 ++ .../source/hybrid/SourceReaderFinishedEvent.java | 49 +++ .../base/source/hybrid/SwitchSourceEvent.java | 62 .../base/source/hybrid/HybridSourceITCase.java | 246 + .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++ .../hybrid/HybridSourceSplitEnumeratorTest.java| 252 + .../hybrid/HybridSourceSplitSerializerTest.java| 52 +++ .../base/source/hybrid/HybridSourceTest.java | 115 ++ 14 files changed, 2194 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java new file mode 100644 index 000..e3d66de --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Hybrid source that switches underlying sources based on configured source chain. + * + * A simple example with FileSource and KafkaSource with fixed Kafka start position: + * + * {@code + * FileSource fileSource = + * FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build(); + * KafkaSource kafkaSource = + * KafkaSource.builder() + * .setBootstrapServers("localhost:9092") + * .setGroupId("MyGroup") + * .setTopics(Arrays.asList("quickstart-events")) + * .setDeserializer( + * KafkaRecordDeserializer.valueOnly(StringDeserializer.class)) + * .setStartingOffsets(OffsetsInitializer.earliest()) + * .build(); + * HybridSource hybridSource = + * HybridSource.builder(fileSource) + * .addSource(kafkaSource) + * .build(); + * } + * + * A more complex example with Kafka start position derived from previous source: + * + * {@code + * HybridSource hybridSource = + * HybridSource.builder(fileSource) + * .addSource( + * switchContext -> { + * StaticFileSplitEnumerator previousEnumerator = + * switchContext.getPreviousEnumerator(); + * // how to get timestamp depends on specific enumerator + * long timestamp = previousEnumerator.getEndTime
[flink] 03/03: [FLINK-22791][docs] HybridSource release availability for 1.13.x
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git commit f74fc331ac8305df0539fb1cbfa1d4722d8c1c56 Author: Thomas Weise AuthorDate: Tue Aug 24 17:06:18 2021 -0700 [FLINK-22791][docs] HybridSource release availability for 1.13.x --- docs/content/docs/connectors/datastream/hybridsource.md | 4 1 file changed, 4 insertions(+) diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md index 02f5077..ff0c559 100644 --- a/docs/content/docs/connectors/datastream/hybridsource.md +++ b/docs/content/docs/connectors/datastream/hybridsource.md @@ -25,6 +25,10 @@ under the License. # Hybrid Source +{{< hint info >}} +This feature is available starting from release 1.13.3 +{{< /hint >}} + `HybridSource` is a source that contains a list of concrete [sources]({{< ref "docs/dev/datastream/sources" >}}). It solves the problem of sequentially reading input from heterogeneous sources to produce a single input stream.
[flink] branch release-1.13 updated (4b682e8 -> f74fc33)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git. from 4b682e8 [FLINK-23920][table-common] Keep primary key when inferring schema with SchemaTranslator new c72aa4b [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline new f8a6d31 [FLINK-22791][docs] Documentation for HybridSource new f74fc33 [FLINK-22791][docs] HybridSource release availability for 1.13.x The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../docs/connectors/datastream/hybridsource.md | 104 ++ .../connector/base/source/hybrid/HybridSource.java | 257 ++ .../source/hybrid/HybridSourceEnumeratorState.java | 38 ++ .../HybridSourceEnumeratorStateSerializer.java | 106 ++ .../base/source/hybrid/HybridSourceReader.java | 254 ++ .../base/source/hybrid/HybridSourceSplit.java | 94 + .../source/hybrid/HybridSourceSplitEnumerator.java | 390 + .../source/hybrid/HybridSourceSplitSerializer.java | 98 ++ .../source/hybrid/SourceReaderFinishedEvent.java | 49 +++ .../base/source/hybrid/SwitchSourceEvent.java | 62 .../base/source/hybrid/HybridSourceITCase.java | 246 + .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++ .../hybrid/HybridSourceSplitEnumeratorTest.java| 252 + .../hybrid/HybridSourceSplitSerializerTest.java| 52 +++ .../base/source/hybrid/HybridSourceTest.java | 115 ++ 15 files changed, 2298 insertions(+) create mode 100644 docs/content/docs/connectors/datastream/hybridsource.md create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java
[flink] branch master updated (543f6b7 -> fb7fbd8)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 543f6b7 [hotfix] Add missing import AkkaOptions to MiniClusterConfiguration add fb7fbd8 [FLINK-22791][docs] Documentation for HybridSource No new revisions were added by this update. Summary of changes: .../docs/connectors/datastream/hybridsource.md | 100 + 1 file changed, 100 insertions(+) create mode 100644 docs/content/docs/connectors/datastream/hybridsource.md
[flink] branch master updated (233e5d4 -> 23b048b)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 233e5d4 [hotfix][streaming] Removed useless calculateThroughput in the test add 23b048b [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline No new revisions were added by this update. Summary of changes: .../connector/base/source/hybrid/HybridSource.java | 257 ++ .../source/hybrid/HybridSourceEnumeratorState.java | 38 ++ .../HybridSourceEnumeratorStateSerializer.java | 106 ++ .../base/source/hybrid/HybridSourceReader.java | 254 ++ .../base/source/hybrid/HybridSourceSplit.java | 94 + .../source/hybrid/HybridSourceSplitEnumerator.java | 390 + .../source/hybrid/HybridSourceSplitSerializer.java | 98 ++ .../source/hybrid/SourceReaderFinishedEvent.java | 49 +++ .../base/source/hybrid/SwitchSourceEvent.java | 62 .../base/source/hybrid/HybridSourceITCase.java | 246 + .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++ .../hybrid/HybridSourceSplitEnumeratorTest.java| 252 + .../hybrid/HybridSourceSplitSerializerTest.java| 52 +++ .../base/source/hybrid/HybridSourceTest.java | 115 ++ 14 files changed, 2194 insertions(+) create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java create mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java
[flink] branch master updated: [FLINK-22778] Upgrade to JUnit 4.13
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 15b5a06 [FLINK-22778] Upgrade to JUnit 4.13 15b5a06 is described below commit 15b5a06efa3eccd52d359ebf9c28e69392b6805d Author: Thomas Weise AuthorDate: Tue May 25 20:31:07 2021 -0700 [FLINK-22778] Upgrade to JUnit 4.13 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 72f7ab6..ea7919f 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ under the License. 1.10.0 1.2.0 2.3.1 - 4.12 + 4.13.2 2.21.0 2.0.4 1.3
[flink] branch release-1.12 updated (1de3d63 -> be33d47)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git. from 1de3d63 [FLINK-17170][kinesis] Fix deadlock during stop-with-savepoint. add fc886c7 [FLINK-20114][connector/kafka] KafkaSourceReader should not commit offsets for partitions whose offsets have not been initialized. add 0786be4 [hotfix][connector/kafka] Reduce the offset commit logging verbosity from INFO to DEBUG. add 5909ecf [FLINK-20114][connector/common] SourceOperatorStreamTask should update the numRecordsOutCount metric add 79ea3bf [FLINK-20114][connector/kafka] KafkaSourceEnumerator should close the admin client early if periodic partition discovery is disabled. add 62fc4a1 [hotfix][connector/kafka] Remove the unused close.timeout.ms config. add c618de9 [FLINK-20114][connector/kafka] PartitionOffsetsRetrieverImpl.committedOffsets() should handle the case without committed offsets. add 1eeeaea [FLINK-20114][connector/kafka] SourceOperatorStreamTask should check the committed offset first before using OffsetResetStrategy. add a0ba502 [FLINK-20114][connector/kafka] Auto offset commit should be disabled by default. add be33d47 [FLINK-20114][connector/kafka] Remove duplicated warning and remove redundant default value for partition.discovery.interval.ms No new revisions were added by this update. Summary of changes: .../connector/kafka/source/KafkaSourceBuilder.java | 9 +++- .../connector/kafka/source/KafkaSourceOptions.java | 8 +--- .../source/enumerator/KafkaSourceEnumerator.java | 17 +-- .../initializer/SpecifiedOffsetsInitializer.java | 6 ++ .../kafka/source/reader/KafkaSourceReader.java | 24 ++ .../initializer/OffsetsInitializerTest.java| 21 ++- .../kafka/source/reader/KafkaSourceReaderTest.java | 14 + .../io/StreamMultipleInputProcessorFactory.java| 6 +- .../runtime/tasks/SourceOperatorStreamTask.java| 18 ++-- 9 files changed, 92 insertions(+), 31 deletions(-)
[flink] branch release-1.12 updated (719bf5e -> dbdacac)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git. from 719bf5e [FLINK-21944][python] Perform null check before closing arrowSerializer add dbdacac [FLINK-20533][datadog][release-1.12] Backport flink-metrics-datadog changes from master (#15328) No new revisions were added by this update. Summary of changes: docs/deployment/metric_reporters.md| 4 + docs/deployment/metric_reporters.zh.md | 4 + .../org/apache/flink/metrics/datadog/DCounter.java | 7 +- .../org/apache/flink/metrics/datadog/DGauge.java | 6 +- .../apache/flink/metrics/datadog/DHistogram.java | 81 +++ .../org/apache/flink/metrics/datadog/DMeter.java | 2 +- .../org/apache/flink/metrics/datadog/DMetric.java | 48 ++-- .../org/apache/flink/metrics/datadog/DSeries.java | 23 +- .../flink/metrics/datadog/DatadogHttpReporter.java | 20 +- .../datadog/{DGauge.java => MetricMetaData.java} | 48 ++-- .../{DataCenter.java => StaticDMetric.java}| 18 +- .../metrics/datadog/DatadogHttpClientTest.java | 269 ++--- 12 files changed, 301 insertions(+), 229 deletions(-) create mode 100644 flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DHistogram.java copy flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/{DGauge.java => MetricMetaData.java} (53%) copy flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/{DataCenter.java => StaticDMetric.java} (73%)
[flink] branch release-1.12 updated: [FLINK-21169][kafka] flink-connector-base dependency should be scope compile
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.12 by this push: new d25ed8d [FLINK-21169][kafka] flink-connector-base dependency should be scope compile d25ed8d is described below commit d25ed8d9ef6cfd147f76e41dd205752a5b707c3f Author: Thomas Weise AuthorDate: Wed Jan 27 18:52:56 2021 -0800 [FLINK-21169][kafka] flink-connector-base dependency should be scope compile --- flink-connectors/flink-connector-kafka/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml index 52d2ae9..7369d6b 100644 --- a/flink-connectors/flink-connector-kafka/pom.xml +++ b/flink-connectors/flink-connector-kafka/pom.xml @@ -87,7 +87,6 @@ under the License. org.apache.flink flink-connector-base ${project.version} - provided
[flink] branch master updated (dac3e72 -> a4f1174)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from dac3e72 [FLINK-21006] Fix hbase 1.4 tests on Hadoop 3.x add a4f1174 [FLINK-21059][kafka] KafkaSourceEnumerator does not honor consumer properties No new revisions were added by this update. Summary of changes: .../flink/connector/kafka/source/KafkaSource.java | 2 +- .../source/enumerator/KafkaSourceEnumerator.java | 11 +++-- .../source/enumerator/KafkaEnumeratorTest.java | 56 -- 3 files changed, 61 insertions(+), 8 deletions(-)
[flink] branch master updated (09f1674 -> c7823ff)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 09f1674 [FLINK-18655][flink-runtime] Set failOnUnableToExtractRepoInfo to false for git-commit-id-plugin (#12941) add c7823ff [FLINK-11547][flink-connector-kinesis] Fix JsonMappingException in DynamoDBStreamsSchema No new revisions were added by this update. Summary of changes: .../connectors/kinesis/serialization/DynamoDBStreamsSchema.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
[flink] branch release-1.10 updated (ff7b4f6 -> 99ea1aa)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from ff7b4f6 [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993) add 99ea1aa [FLINK-17496][kinesis] Upgrade amazon-kinesis-producer to 0.14.0 No new revisions were added by this update. Summary of changes: flink-connectors/flink-connector-kinesis/pom.xml| 2 +- .../flink-connector-kinesis/src/main/resources/META-INF/NOTICE | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (2849e39 -> a737cdc)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2849e39 [FLINK-17515][yarn] Move YARN staging functionality to a separate class add a737cdc [FLINK-17496][kinesis] Upgrade amazon-kinesis-producer to 0.14.0 No new revisions were added by this update. Summary of changes: flink-connectors/flink-connector-kinesis/pom.xml| 2 +- .../flink-connector-kinesis/src/main/resources/META-INF/NOTICE | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (b098ce5 -> de5f8fa)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b098ce5 [FLINK-15838] Dangling CountDownLatch.await(timeout) add de5f8fa [FLINK-16393][kinesis] Skip record emitter thread creation w/o source sync No new revisions were added by this update. Summary of changes: .../kinesis/internals/KinesisDataFetcher.java | 36 -- 1 file changed, 19 insertions(+), 17 deletions(-)
[flink] branch release-1.10 updated: [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new c4acec8 [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy c4acec8 is described below commit c4acec8975a03a57713a1cbc8543cbed1d83612d Author: Ying AuthorDate: Fri Dec 20 15:12:09 2019 -0800 [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy --- .../sink/filesystem/StreamingFileSink.java | 22 ++ ...ingPolicy.java => CheckpointRollingPolicy.java} | 34 ++ .../rollingpolicies/OnCheckpointRollingPolicy.java | 9 ++ .../api/functions/sink/filesystem/TestUtils.java | 3 ++ 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index cd4afc2..d8d9d6d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -73,7 +74,7 @@ import java.io.Serializable; * {@code "prefix-1-17.ext"} containing the data from {@code subtask 1} of the sink and is the {@code 17th} bucket * created by that subtask. * Part files roll based on the user-specified {@link RollingPolicy}. By default, a {@link DefaultRollingPolicy} - * is used. + * is used for row-encoded sink output; a {@link OnCheckpointRollingPolicy} is used for bulk-encoded sink output. * * In some scenarios, the open buckets are required to change based on time. In these cases, the user * can specify a {@code bucketCheckInterval} (by default 1m) and the sink will check periodically and roll @@ -268,7 +269,7 @@ public class StreamingFileSink public StreamingFileSink.RowFormatBuilder> withNewBucketAssignerAndPolicy(final BucketAssigner assigner, final RollingPolicy policy) { Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot be called after specifying a customized bucket factory"); - return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig); + return new RowFormatBuilder(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig); } /** Creates the actual sink. */ @@ -311,24 +312,29 @@ public class StreamingFileSink private BucketAssigner bucketAssigner; + private CheckpointRollingPolicy rollingPolicy; + private BucketFactory bucketFactory; private OutputFileConfig outputFileConfig; protected BulkFormatBuilder(Path basePath, BulkWriter.Factory writerFactory, BucketAssigner assigner) { - this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build()); + this(basePath, writerFactory, assigner, OnCheckpointRollingPolicy.build(), DEFAULT_BUCKET_CHECK_INTERVAL, + new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build()); } protected BulkFormatBuilder( Path basePath, BulkWriter.Factory writerFactory, BucketAssigner assigner, + CheckpointRollingPolicy policy, long bucketCheckInterval,
[flink] branch master updated (ec8ea8b -> df5eb21)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ec8ea8b [FLINK-13596][ml] Improve fallback conversion and test for Table transformation. add df5eb21 [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy No new revisions were added by this update. Summary of changes: .../sink/filesystem/StreamingFileSink.java | 22 ++ ...ingPolicy.java => CheckpointRollingPolicy.java} | 34 ++ .../rollingpolicies/OnCheckpointRollingPolicy.java | 9 ++ .../api/functions/sink/filesystem/TestUtils.java | 3 ++ 4 files changed, 38 insertions(+), 30 deletions(-) copy flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/{OnCheckpointRollingPolicy.java => CheckpointRollingPolicy.java} (60%)
[flink] branch release-1.10 updated: [FLINK-15301] [kinesis] Exception propagation from record emitter to source thread
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new c4e7baa [FLINK-15301] [kinesis] Exception propagation from record emitter to source thread c4e7baa is described below commit c4e7baaedab0be13c08b9fc0b3e6bef9d8668791 Author: Thomas Weise AuthorDate: Tue Dec 17 14:56:43 2019 -0800 [FLINK-15301] [kinesis] Exception propagation from record emitter to source thread --- .../kinesis/internals/KinesisDataFetcher.java | 14 +++- .../connectors/kinesis/util/RecordEmitter.java | 13 ++- .../kinesis/FlinkKinesisConsumerTest.java | 26 +- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index afd6138..52d6293 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -497,7 +497,19 @@ public class KinesisDataFetcher { Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS))); // run record emitter in separate thread since main thread is used for discovery - Thread thread = new Thread(this.recordEmitter); + Runnable recordEmitterRunnable = new Runnable() { + @Override + public void run() { + try { + recordEmitter.run(); + } catch (Throwable error) { + // report the error that terminated the emitter loop to source thread + stopWithError(error); + } + } + }; + + Thread thread = new Thread(recordEmitterRunnable); thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks()); thread.setDaemon(true); thread.start(); diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java index 95c3688..dfa3388 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java @@ -170,7 +170,14 @@ public abstract class RecordEmitter implements Runna @Override public void run() { LOG.info("Starting emitter with maxLookaheadMillis: {}", this.maxLookaheadMillis); + emitRecords(); + } + + public void stop() { + running = false; + } + protected void emitRecords() { // emit available records, ordered by timestamp AsyncRecordQueue min = heads.poll(); runLoop: @@ -248,12 +255,8 @@ public abstract class RecordEmitter implements Runna } } - public void stop() { - running = false; - } - /** Emit the record. This is specific to a connector, like the Kinesis data fetcher. */ - public abstract void emit(T record, RecordQueue source); + protected abstract void emit(T record, RecordQueue source); /** Summary of emit queues that can be used for logging. */ public String printInfo() { diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 1a910cb..d2d637c 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -75,6 +75,7 @@ import org.powermock.api.mockito.P
[flink] branch master updated (8525c37 -> c2c38b1)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 8525c37 [FLINK-15308][runtime] Remove data compression for pipelined partition add c2c38b1 [FLINK-15301] [kinesis] Exception propagation from record emitter to source thread No new revisions were added by this update. Summary of changes: .../kinesis/internals/KinesisDataFetcher.java | 14 +++- .../connectors/kinesis/util/RecordEmitter.java | 13 ++- .../kinesis/FlinkKinesisConsumerTest.java | 26 +- 3 files changed, 46 insertions(+), 7 deletions(-)
[flink] branch release-1.9 updated: Update KPL version to 0.13.1
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new e7650ff Update KPL version to 0.13.1 e7650ff is described below commit e7650ffa1eccb914a438f672b2311b826565b2b8 Author: Abhilasha Seth AuthorDate: Thu Oct 17 15:37:40 2019 + Update KPL version to 0.13.1 This fixes https://github.com/awslabs/amazon-kinesis-producer/issues/224. The issue has been fixed in Flink 1.10.0 as part of FLINK-12847. --- flink-connectors/flink-connector-kinesis/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml index fcb9818..7efe5d7 100644 --- a/flink-connectors/flink-connector-kinesis/pom.xml +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -35,7 +35,7 @@ under the License. 1.11.319 1.9.0 - 0.12.9 + 0.13.1 1.4.0
[flink] branch release-1.8 updated: Upgrade KPL version to 0.13.1
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 148dedd Upgrade KPL version to 0.13.1 148dedd is described below commit 148dedd44f1f4fc537d8522d5f4cd04406031a9a Author: Abhilasha Seth AuthorDate: Wed Oct 9 16:22:10 2019 -0700 Upgrade KPL version to 0.13.1 This commit mitigates the issue - https://github.com/awslabs/amazon-kinesis-producer/issues/224 This closes #14175 --- flink-connectors/flink-connector-kinesis/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml index b131e0d..f79d225 100644 --- a/flink-connectors/flink-connector-kinesis/pom.xml +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -35,7 +35,7 @@ under the License. 1.11.319 1.9.0 - 0.12.9 + 0.13.1 1.4.0
[flink-web] branch asf-site updated: Rebuild website
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new d2ee4f0 Rebuild website d2ee4f0 is described below commit d2ee4f0cc63408b86b029019df30b260fb2ff967 Author: Thomas Weise AuthorDate: Wed Oct 9 11:50:41 2019 -0700 Rebuild website --- content/2019/05/03/pulsar-flink.html | 3 +++ content/2019/05/14/temporal-tables.html| 3 +++ content/2019/05/19/state-ttl.html | 3 +++ content/2019/06/05/flink-network-stack.html| 3 +++ content/2019/06/26/broadcast-state.html| 3 +++ content/2019/07/23/flink-network-stack-2.html | 3 +++ content/blog/index.html| 3 +++ content/blog/page2/index.html | 3 +++ content/blog/page3/index.html | 3 +++ content/blog/page4/index.html | 3 +++ content/blog/page5/index.html | 3 +++ content/blog/page6/index.html | 3 +++ content/blog/page7/index.html | 3 +++ content/blog/page8/index.html | 3 +++ content/blog/page9/index.html | 3 +++ content/blog/release_1.0.0-changelog_known_issues.html | 3 +++ content/blog/release_1.1.0-changelog.html | 3 +++ content/blog/release_1.2.0-changelog.html | 3 +++ content/blog/release_1.3.0-changelog.html | 3 +++ content/community.html | 3 +++ content/contributing/code-style-and-quality-common.html| 3 +++ content/contributing/code-style-and-quality-components.html| 3 +++ content/contributing/code-style-and-quality-formatting.html| 3 +++ content/contributing/code-style-and-quality-java.html | 3 +++ content/contributing/code-style-and-quality-preamble.html | 3 +++ content/contributing/code-style-and-quality-pull-requests.html | 3 +++ content/contributing/code-style-and-quality-scala.html | 3 +++ content/contributing/contribute-code.html | 3 +++ content/contributing/contribute-documentation.html | 3 +++ content/contributing/how-to-contribute.html| 3 +++ content/contributing/improve-website.html | 3 +++ content/contributing/reviewing-prs.html| 3 +++ content/documentation.html | 3 +++ content/downloads.html | 3 +++ content/ecosystem.html | 7 +++ content/faq.html | 3 +++ content/feature/2019/09/13/state-processor-api.html| 3 +++ content/features/2017/07/04/flink-rescalable-state.html| 3 +++ content/features/2018/01/30/incremental-checkpointing.html | 3 +++ .../features/2018/03/01/end-to-end-exactly-once-apache-flink.html | 3 +++ content/features/2019/03/11/prometheus-monitoring.html | 3 +++ content/flink-applications.html| 3 +++ content/flink-architecture.html| 3 +++ content/flink-operations.html | 3 +++ content/gettinghelp.html | 3 +++ content/index.html | 3 +++ content/material.html | 3 +++ content/news/2014/08/26/release-0.6.html | 3 +++ content/news/2014/09/26/release-0.6.1.html | 3 +++ content/news/2014/10/03/upcoming_events.html | 3 +++ content/news/2014/11/04/release-0.7.0.html | 3 +++ content/news/2014/11/18/hadoop-compatibility.html | 3 +++ content/news/2015/01/06/december-in-flink.html | 3 +++ content/news/2015/01/21/release-0.8.html | 3 +++ content/news/2015/02/04/january-in-flink.html | 3 +++ content/news/2015/02/09/streaming-example.html | 3 +++ content/news/2015/03/02/february-2015-in-flink.html| 3 +++ .../news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html| 3 +++ content/news/2015/04/07/march-in-flink.html| 3 +++ content/news/2015/04/13/release-0.9.0-milestone1.html
[flink] branch master updated (28c5264 -> 3f5c212)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 28c5264 [FLINK-13065][datastream][docs] Clarify key selector example add 3f5c212 [FLINK-14283][kinesis][docs] Update Kinesis consumer docs for recent feature additions No new revisions were added by this update. Summary of changes: docs/dev/connectors/kinesis.md | 66 +- 1 file changed, 59 insertions(+), 7 deletions(-)
[flink] branch release-1.9 updated: [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new 8186ec3 [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock 8186ec3 is described below commit 8186ec30e513f92ec91d14add875a2f7dd245af8 Author: Thomas Weise AuthorDate: Mon Sep 16 13:33:42 2019 -0700 [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock --- .../connectors/kinesis/util/RecordEmitter.java | 5 ++ .../connectors/kinesis/util/RecordEmitterTest.java | 66 -- 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java index da74b08..95c3688 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java @@ -236,6 +236,11 @@ public abstract class RecordEmitter implements Runna } if (record == null) { this.emptyQueues.put(min, true); + } else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) { + // if we stopped emitting due to reaching max timestamp, + // the next queue may not be the new min + heads.offer(nextQueue); + nextQueue = min; } else { heads.offer(min); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java index 1948237..84949cc 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java @@ -17,12 +17,14 @@ package org.apache.flink.streaming.connectors.kinesis.util; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,10 +34,10 @@ import java.util.concurrent.Executors; /** Test for {@link RecordEmitter}. */ public class RecordEmitterTest { - static List results = Collections.synchronizedList(new ArrayList<>()); - private class TestRecordEmitter extends RecordEmitter { + private List results = Collections.synchronizedList(new ArrayList<>()); + private TestRecordEmitter() { super(DEFAULT_QUEUE_CAPACITY); } @@ -68,14 +70,66 @@ public class RecordEmitterTest { ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(emitter); - long timeout = System.currentTimeMillis() + 10_000; - while (results.size() != 4 && System.currentTimeMillis() < timeout) { - Thread.sleep(100); + Deadline dl = Deadline.fromNow(Duration.ofSeconds(10)); + while (emitter.results.size() != 4 && dl.hasTimeLeft()) { + Thread.sleep(10); } emitter.stop(); executor.shutdownNow(); - Assert.assertThat(results, Matchers.contains(one, five, two, ten)); + Assert.assertThat(emitter.results, Matchers.contains(one, five, two, ten)); } + @Test + public void testRetainMinAfterReachingLimit() throws Exception { + + TestRecordEmitter emitter = new TestRecordEmitter(); + + final TimestampedValue one = new TimestampedValue<>("1", 1); + final TimestampedValue two = new TimestampedValue<>("2", 2); + final TimestampedValue three = new TimestampedValue<>("3", 3); + final TimestampedValue ten = new TimestampedValue<>("10", 10); + final Timestam
[flink] branch release-1.8 updated: [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 4a4dae0 [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock 4a4dae0 is described below commit 4a4dae0d81b830d5c4245d61ca95dbffaf2a1f2d Author: Thomas Weise AuthorDate: Mon Sep 16 13:33:42 2019 -0700 [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock --- .../connectors/kinesis/util/RecordEmitter.java | 5 ++ .../connectors/kinesis/util/RecordEmitterTest.java | 66 -- 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java index 17344b1..7fa21fe 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java @@ -239,6 +239,11 @@ public abstract class RecordEmitter implements Runna } if (record == null) { this.emptyQueues.put(min, true); + } else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) { + // if we stopped emitting due to reaching max timestamp, + // the next queue may not be the new min + heads.offer(nextQueue); + nextQueue = min; } else { heads.offer(min); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java index 1948237..84949cc 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java @@ -17,12 +17,14 @@ package org.apache.flink.streaming.connectors.kinesis.util; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,10 +34,10 @@ import java.util.concurrent.Executors; /** Test for {@link RecordEmitter}. */ public class RecordEmitterTest { - static List results = Collections.synchronizedList(new ArrayList<>()); - private class TestRecordEmitter extends RecordEmitter { + private List results = Collections.synchronizedList(new ArrayList<>()); + private TestRecordEmitter() { super(DEFAULT_QUEUE_CAPACITY); } @@ -68,14 +70,66 @@ public class RecordEmitterTest { ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(emitter); - long timeout = System.currentTimeMillis() + 10_000; - while (results.size() != 4 && System.currentTimeMillis() < timeout) { - Thread.sleep(100); + Deadline dl = Deadline.fromNow(Duration.ofSeconds(10)); + while (emitter.results.size() != 4 && dl.hasTimeLeft()) { + Thread.sleep(10); } emitter.stop(); executor.shutdownNow(); - Assert.assertThat(results, Matchers.contains(one, five, two, ten)); + Assert.assertThat(emitter.results, Matchers.contains(one, five, two, ten)); } + @Test + public void testRetainMinAfterReachingLimit() throws Exception { + + TestRecordEmitter emitter = new TestRecordEmitter(); + + final TimestampedValue one = new TimestampedValue<>("1", 1); + final TimestampedValue two = new TimestampedValue<>("2", 2); + final TimestampedValue three = new TimestampedValue<>("3", 3); + final TimestampedValue ten = new TimestampedValue<>("10", 10); + final Timestam
[flink] branch master updated (f65ee55 -> 4a4a147)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f65ee55 [FLINK-14017][python] Support to start up Python worker in process mode. add 4a4a147 [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock No new revisions were added by this update. Summary of changes: .../connectors/kinesis/util/RecordEmitter.java | 5 ++ .../connectors/kinesis/util/RecordEmitterTest.java | 66 -- 2 files changed, 65 insertions(+), 6 deletions(-)
[flink] branch release-1.9 updated: [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new c905a4d [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout c905a4d is described below commit c905a4d323c0ed4985cdb9e5764efe13bc6183d0 Author: Thomas Weise AuthorDate: Mon Aug 26 15:02:40 2019 -0700 [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout --- .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 5 +++-- .../connectors/kinesis/util/JobManagerWatermarkTracker.java | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index f38e6eb..80b724b 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -715,7 +715,7 @@ public class KinesisDataFetcher { // /** -* Atomic operation to collect a record and update state to the sequence number of the record. +* Prepare a record and hand it over to the {@link RecordEmitter}, which may collect it asynchronously. * This method is called by {@link ShardConsumer}s. * * @param record the record to collect @@ -752,7 +752,8 @@ public class KinesisDataFetcher { } /** -* Actual record emission called from the record emitter. +* Atomic operation to collect a record and update state to the sequence number of the record. +* This method is called from the record emitter. * * Responsible for tracking per shard watermarks and emit timestamps extracted from * the record, when a watermark assigner was configured. diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java index f150bb0..1581024 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java @@ -144,7 +144,9 @@ public class JobManagerWatermarkTracker extends WatermarkTracker { WatermarkState ws = e.getValue(); if (ws.lastUpdated + updateTimeoutMillis < currentTime) { // ignore outdated entry - updateTimeoutCount++; + if (ws.watermark < Long.MAX_VALUE) { + updateTimeoutCount++; + } continue; } globalWatermark = Math.min(ws.watermark, globalWatermark);
[flink] branch master updated (0437ad2 -> b6bd8f6)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0437ad2 [FLINK-13841][hive] Extend Hive version support to all 1.2 and 2.3 versions add b6bd8f6 [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout No new revisions were added by this update. Summary of changes: .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 5 +++-- .../connectors/kinesis/util/JobManagerWatermarkTracker.java | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-)
[flink] branch release-1.9 updated: [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new 7ea55e9 [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock 7ea55e9 is described below commit 7ea55e967bc450b3b744edcaea23834646e439cd Author: Shannon Carey AuthorDate: Sat Jul 20 14:15:50 2019 -0500 [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock - Inside testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown, consumerThread.interrupt() was getting absorbed inside KinesisDataFetcher's while(running) loop, therefore TestableKinesisDataFetcherForShardConsumerException's awaitTermination() wasn't getting interrupted by it. This led to deadlock, with KinesisDataFetcher waiting on the test code to send the interrupt, and the test code waiting for KinesisDataFetcher to throw the expected exception. - Now, the test code waits until KinesisDataFetcher is inside awaitTermination() before producing the interrupt, so it can be sure that the interrupt it produces will be received/handled inside awaitTermination(). --- .../kinesis/internals/KinesisDataFetcherTest.java | 6 +- ...stableKinesisDataFetcherForShardConsumerException.java | 15 ++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java index 5255e61..2815193 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -846,7 +846,7 @@ public class KinesisDataFetcherTest extends TestLogger { DummyFlinkKinesisConsumer consumer = new DummyFlinkKinesisConsumer<>( TestUtils.getStandardProperties(), fetcher, 1, 0); - CheckedThread consumerThread = new CheckedThread() { + CheckedThread consumerThread = new CheckedThread("FlinkKinesisConsumer") { @Override public void go() throws Exception { consumer.run(new TestSourceContext<>()); @@ -858,6 +858,10 @@ public class KinesisDataFetcherTest extends TestLogger { // ShardConsumer exception (from deserializer) will result in fetcher being shut down. fetcher.waitUntilShutdown(20, TimeUnit.SECONDS); + // Ensure that KinesisDataFetcher has exited its while(running) loop and is inside its awaitTermination() + // method before we interrupt its thread, so that our interrupt doesn't get absorbed by any other mechanism. + fetcher.waitUntilAwaitTermination(20, TimeUnit.SECONDS); + // Interrupt the thread so that KinesisDataFetcher#awaitTermination() will throw InterruptedException. consumerThread.interrupt(); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java index c08b7af..6ae4391 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; @@ -32,6 +33,8 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; /** @@ -39,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference; * {@link #awaitTermination(
[flink] branch master updated (5da6e4c -> 091266d)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 5da6e4c [FLINK-13206]replace 'use database' with 'use' in sql client parser add 091266d [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock No new revisions were added by this update. Summary of changes: .../kinesis/internals/KinesisDataFetcherTest.java | 6 +- ...stableKinesisDataFetcherForShardConsumerException.java | 15 ++- 2 files changed, 19 insertions(+), 2 deletions(-)
[flink] branch release-1.8 updated: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 184e8e0 [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer 184e8e0 is described below commit 184e8e00fd20519a037cf0bf4e3dba7132d75fa1 Author: Thomas Weise AuthorDate: Wed May 22 21:42:15 2019 -0700 [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer --- .../connectors/kinesis/FlinkKinesisConsumer.java | 18 +- .../kinesis/config/ConsumerConfigConstants.java| 11 + .../internals/DynamoDBStreamsDataFetcher.java | 1 + .../kinesis/internals/KinesisDataFetcher.java | 292 +++-- .../kinesis/util/JobManagerWatermarkTracker.java | 179 + .../connectors/kinesis/util/RecordEmitter.java | 269 +++ .../connectors/kinesis/util/WatermarkTracker.java | 114 .../kinesis/FlinkKinesisConsumerMigrationTest.java | 2 +- .../kinesis/FlinkKinesisConsumerTest.java | 185 + .../kinesis/internals/ShardConsumerTest.java | 9 +- .../testutils/TestableKinesisDataFetcher.java | 1 + .../util/JobManagerWatermarkTrackerTest.java | 101 +++ .../connectors/kinesis/util/RecordEmitterTest.java | 81 ++ .../kinesis/util/WatermarkTrackerTest.java | 108 14 files changed, 1342 insertions(+), 29 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 3c5e3c7..5b24ded 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; @@ -126,6 +127,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER; private AssignerWithPeriodicWatermarks periodicWatermarkAssigner; + private WatermarkTracker watermarkTracker; // // Runtime state @@ -254,6 +256,20 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple ClosureCleaner.clean(this.periodicWatermarkAssigner, true); } + public WatermarkTracker getWatermarkTracker() { + return this.watermarkTracker; + } + + /** +* Set the global watermark tracker. When set, it will be used by the fetcher +* to align the shard consumers by event time. +* @param watermarkTracker +*/ + public void setWatermarkTracker(WatermarkTracker watermarkTracker) { + this.watermarkTracker = watermarkTracker; + ClosureCleaner.clean(this.watermarkTracker, true); + } + // // Source life cycle // @@ -448,7 +464,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple Properties configProps, KinesisDeserializationSchema deserializationSchema) { - return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner); + return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker); } @VisibleForTesting diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 41ac6b8..2f5be97 100644 --- a/flink-connectors/flink-conne
[flink] branch master updated: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 793a784 [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer 793a784 is described below commit 793a78407aa22530448efbf18b714952eac40aba Author: Thomas Weise AuthorDate: Wed May 22 21:42:15 2019 -0700 [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer --- .../connectors/kinesis/FlinkKinesisConsumer.java | 18 +- .../kinesis/config/ConsumerConfigConstants.java| 11 + .../internals/DynamoDBStreamsDataFetcher.java | 1 + .../kinesis/internals/KinesisDataFetcher.java | 292 +++-- .../kinesis/util/JobManagerWatermarkTracker.java | 179 + .../connectors/kinesis/util/RecordEmitter.java | 269 +++ .../connectors/kinesis/util/WatermarkTracker.java | 114 .../kinesis/FlinkKinesisConsumerMigrationTest.java | 2 +- .../kinesis/FlinkKinesisConsumerTest.java | 185 + .../kinesis/internals/ShardConsumerTest.java | 9 +- .../testutils/TestableKinesisDataFetcher.java | 1 + .../util/JobManagerWatermarkTrackerTest.java | 101 +++ .../connectors/kinesis/util/RecordEmitterTest.java | 81 ++ .../kinesis/util/WatermarkTrackerTest.java | 108 14 files changed, 1342 insertions(+), 29 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 3c5e3c7..5b24ded 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; @@ -126,6 +127,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER; private AssignerWithPeriodicWatermarks periodicWatermarkAssigner; + private WatermarkTracker watermarkTracker; // // Runtime state @@ -254,6 +256,20 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple ClosureCleaner.clean(this.periodicWatermarkAssigner, true); } + public WatermarkTracker getWatermarkTracker() { + return this.watermarkTracker; + } + + /** +* Set the global watermark tracker. When set, it will be used by the fetcher +* to align the shard consumers by event time. +* @param watermarkTracker +*/ + public void setWatermarkTracker(WatermarkTracker watermarkTracker) { + this.watermarkTracker = watermarkTracker; + ClosureCleaner.clean(this.watermarkTracker, true); + } + // // Source life cycle // @@ -448,7 +464,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction imple Properties configProps, KinesisDeserializationSchema deserializationSchema) { - return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner); + return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker); } @VisibleForTesting diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 41ac6b8..2f5be97 100644 --- a/flink-connectors/flink-connector-kine
[flink] branch release-1.8 updated: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 7b862dc [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) 7b862dc is described below commit 7b862dcd91a2506f99dd6e1feacab57bc78fa4e6 Author: Kailash Dayanand AuthorDate: Wed May 22 10:40:27 2019 -0700 [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) --- .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index dc0b1c6..f5b1bf9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -127,7 +127,7 @@ public class StreamingFileSink /** * Creates a new {@code StreamingFileSink} that writes files to the given base directory. */ - private StreamingFileSink( + protected StreamingFileSink( final StreamingFileSink.BucketsBuilder bucketsBuilder, final long bucketCheckInterval) { @@ -170,7 +170,7 @@ public class StreamingFileSink /** * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. */ - private abstract static class BucketsBuilder implements Serializable { + protected abstract static class BucketsBuilder implements Serializable { private static final long serialVersionUID = 1L;
[flink] branch master updated (f652bb5 -> 7748729)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f652bb5 [FLINK-12447][build] Set minimum maven version to 3.1.1 add 7748729 [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) No new revisions were added by this update. Summary of changes: .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink] branch release-1.8 updated (9795457 -> cf065a0)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git. from 9795457 [FLINK-12064] [core, State Backends] RocksDBKeyedStateBackend snapshots uses incorrect key serializer if reconfigure happens during restore new edf082a [FLINK-11501] [kafka] Add ratelimiting to Kafka consumer (#7679) new e42fa0e [FLINK-11826] Ignore flaky testRateLimitedConsumer new cf065a0 [FLINK-11826][tests] Harden Kafka09ITCase#testRateLimitedConsumer The 16089 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../connectors/kafka/FlinkKafkaConsumer010.java| 10 +- .../connectors/kafka/internal/Kafka010Fetcher.java | 6 +- .../kafka/internal/Kafka010FetcherTest.java| 6 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 26 +++- .../connectors/kafka/internal/Kafka09Fetcher.java | 7 +- .../kafka/internal/KafkaConsumerThread.java| 67 +- .../streaming/connectors/kafka/Kafka09ITCase.java | 142 + .../kafka/internal/Kafka09FetcherTest.java | 6 +- .../kafka/internal/KafkaConsumerThreadTest.java| 139 +++- flink-core/pom.xml | 6 + .../io/ratelimiting/FlinkConnectorRateLimiter.java | 57 - .../GuavaFlinkConnectorRateLimiter.java| 79 12 files changed, 507 insertions(+), 44 deletions(-) copy flink-java/src/main/java/org/apache/flink/api/java/summarize/ObjectColumnSummary.java => flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/FlinkConnectorRateLimiter.java (53%) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaFlinkConnectorRateLimiter.java
[flink] branch release-1.8 updated: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 11af452 [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test 11af452 is described below commit 11af4523801164539e186d836462f5884b561941 Author: Thomas Weise AuthorDate: Thu Feb 28 16:11:50 2019 -0800 [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test --- .../flink-streaming-kinesis-test/pom.xml | 92 +++ .../streaming/kinesis/test/KinesisExample.java | 91 +++ .../streaming/kinesis/test/KinesisExampleTest.java | 127 .../kinesis/test/KinesisPubsubClient.java | 128 + flink-end-to-end-tests/pom.xml | 21 flink-end-to-end-tests/run-pre-commit-tests.sh | 1 + flink-end-to-end-tests/test-scripts/common.sh | 2 +- .../test-scripts/test_streaming_kinesis.sh | 63 ++ 8 files changed, 524 insertions(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml new file mode 100644 index 000..2774964 --- /dev/null +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml @@ -0,0 +1,92 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + flink-end-to-end-tests + org.apache.flink + 1.8-SNAPSHOT + .. + + 4.0.0 + + flink-streaming-kinesis-test_${scala.binary.version} + flink-streaming-kinesis-test + jar + + + + org.apache.flink + flink-streaming-kafka-test-base_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kinesis_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + + + + junit + junit + ${junit.version} + compile + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + fat-jar-kinesis-example + package + + shade + + + false + false + false + + + org.apache.flink.streaming.kinesis.test.KinesisExample + + + KinesisExample + + + + + + + + diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java new file mode 100644 index 000..4957c35 --- /dev/null +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtai
[flink] branch master updated: [FLINK-9007] [kinesis] [e2e] Build Kinesis test under include-kinesis profile
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4250819 [FLINK-9007] [kinesis] [e2e] Build Kinesis test under include-kinesis profile 4250819 is described below commit 425081951162a2a5ea027cc8ee3137c688f3aada Author: Thomas Weise AuthorDate: Sun Mar 17 11:23:24 2019 -0700 [FLINK-9007] [kinesis] [e2e] Build Kinesis test under include-kinesis profile --- flink-end-to-end-tests/pom.xml | 22 +- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 9f77483..c6674f4 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -66,9 +66,29 @@ under the License. flink-streaming-kafka-test flink-streaming-kafka011-test flink-streaming-kafka010-test - flink-streaming-kinesis-test + + + + + include-kinesis + + + include-kinesis + + + + flink-streaming-kinesis-test + + + +
[flink] branch master updated: [hotfix] [FLINK-9007] [kinesis] [e2e] Disable record aggregation
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 92c7808 [hotfix] [FLINK-9007] [kinesis] [e2e] Disable record aggregation 92c7808 is described below commit 92c7808e9a2d02b17d4cfddf2afe675260fe4a59 Author: Thomas Weise AuthorDate: Sat Mar 16 13:14:15 2019 -0700 [hotfix] [FLINK-9007] [kinesis] [e2e] Disable record aggregation --- .../java/org/apache/flink/streaming/kinesis/test/KinesisExample.java| 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java index 3c52f1f..4957c35 100644 --- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java @@ -62,6 +62,8 @@ public class KinesisExample { Properties producerProperties = new Properties(parameterTool.getProperties()); // producer needs region even when URL is specified producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + // test driver does not deaggregate + producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false)); // KPL does not recognize endpoint URL.. String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
[flink] branch master updated: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 19570c6 [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test 19570c6 is described below commit 19570c6c2938628a7e44c6f2bf3ce7ec04959ae6 Author: Thomas Weise AuthorDate: Thu Feb 28 16:11:50 2019 -0800 [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test --- .../flink-streaming-kinesis-test/pom.xml | 92 +++ .../streaming/kinesis/test/KinesisExample.java | 89 ++ .../streaming/kinesis/test/KinesisExampleTest.java | 127 .../kinesis/test/KinesisPubsubClient.java | 128 + flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/run-pre-commit-tests.sh | 1 + flink-end-to-end-tests/test-scripts/common.sh | 2 +- .../test-scripts/test_streaming_kinesis.sh | 63 ++ 8 files changed, 502 insertions(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml new file mode 100644 index 000..d16f25f --- /dev/null +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml @@ -0,0 +1,92 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + flink-end-to-end-tests + org.apache.flink + 1.9-SNAPSHOT + .. + + 4.0.0 + + flink-streaming-kinesis-test_${scala.binary.version} + flink-streaming-kinesis-test + jar + + + + org.apache.flink + flink-streaming-kafka-test-base_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-connector-kinesis_${scala.binary.version} + ${project.version} + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + + + + junit + junit + ${junit.version} + compile + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + fat-jar-kinesis-example + package + + shade + + + false + false + false + + + org.apache.flink.streaming.kinesis.test.KinesisExample + + + KinesisExample + + + + + + + + diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java new file mode 100644 index 000..3c52f1f --- /dev/null +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtai
[flink] branch master updated: [FLINK-11826] Ignore flaky testRateLimitedConsumer
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 56afa2e [FLINK-11826] Ignore flaky testRateLimitedConsumer 56afa2e is described below commit 56afa2e32a841922417e4ed02282d96c430875c2 Author: Thomas Weise AuthorDate: Tue Mar 5 12:37:09 2019 -0800 [FLINK-11826] Ignore flaky testRateLimitedConsumer --- .../java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index cd73872..5a4e775 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -166,6 +167,7 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { * a desired rate of 3 bytes / second. Based on the execution time, the test asserts that this rate was not surpassed. * If no rate limiter is set on the consumer, the test should fail. */ + @Ignore @Test(timeout = 6) public void testRateLimitedConsumer() throws Exception { final String testTopic = "testRateLimitedConsumer";