[flink] branch master updated: [FLINK-26199][table-api-java] Annotate StatementSet#compilePlan as experimental.
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new fc86f13 [FLINK-26199][table-api-java] Annotate StatementSet#compilePlan as experimental. fc86f13 is described below commit fc86f13659dec102bbb7cb4c2741f3d4c530 Author: David Moravek AuthorDate: Wed Feb 16 20:01:27 2022 +0100 [FLINK-26199][table-api-java] Annotate StatementSet#compilePlan as experimental. This closes #18809. --- .../src/main/java/org/apache/flink/table/api/StatementSet.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java index 8e93855..6a96e65 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java @@ -18,6 +18,7 @@ package org.apache.flink.table.api; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; /** @@ -84,5 +85,6 @@ public interface StatementSet extends Explainable, Compilable, Exe * one job. */ @Override +@Experimental CompiledPlan compilePlan() throws TableException; }
[flink-docker] branch dev-1.13 updated: Add GPG key for 1.13.6 release (#104)
This is an automated email from the ASF dual-hosted git repository. knaufk pushed a commit to branch dev-1.13 in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/dev-1.13 by this push: new 68c0fb9 Add GPG key for 1.13.6 release (#104) 68c0fb9 is described below commit 68c0fb9bc11a808b363361020b5dca520fbde28f Author: Konstantin Knauf AuthorDate: Thu Feb 17 08:31:23 2022 +0100 Add GPG key for 1.13.6 release (#104) --- add-version.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/add-version.sh b/add-version.sh index e98e4cc..03cdee3 100755 --- a/add-version.sh +++ b/add-version.sh @@ -102,6 +102,8 @@ elif [ "$flink_version" = "1.13.4" ]; then gpg_key="19F2195E1B4816D765A2C324C2EED7B111D464BA" elif [ "$flink_version" = "1.13.5" ]; then gpg_key="19F2195E1B4816D765A2C324C2EED7B111D464BA" +elif [ "$flink_version" = "1.13.6" ]; then +gpg_key="CCFA852FD039380AB3EC36D08C3FB007FE60DEFA" else error "Missing GPG key ID for this release" fi
[GitHub] [flink-kubernetes-operator] gyfora edited a comment on pull request #4: Use github actions for flink-kubernetes-operator CI
gyfora edited a comment on pull request #4: URL: https://github.com/apache/flink-kubernetes-operator/pull/4#issuecomment-1042631370 I will be away during the day today, I will take a look in the evening :) @morhidi / @mbalassi please review if you have time, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #4: Use github actions for flink-kubernetes-operator CI
gyfora commented on pull request #4: URL: https://github.com/apache/flink-kubernetes-operator/pull/4#issuecomment-1042631370 I will be away during the day today, I will take a look in the evening :) @morhidi / @martonb please review if you have time, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] asfgit closed pull request #3: README fix
asfgit closed pull request #3: URL: https://github.com/apache/flink-kubernetes-operator/pull/3 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink-kubernetes-operator] branch main updated: README fix
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new ae14891 README fix ae14891 is described below commit ae14891f673a23acdc83c01e12bea63030f6b007 Author: Thomas Weise AuthorDate: Wed Feb 16 14:37:12 2022 -0800 README fix Closes #3 --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 23ce381..249aa9a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ -# flink-kubernetes-operator -Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created an IPR. Check [FLIP-212](https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator) further info. +# Apache Flink Kubernetes Operator + +A Kubernetes operator for Apache Flink, implemented in Java. See [FLIP-212](https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator). ## Installation
[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request #5: [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow
gyfora opened a new pull request #5: URL: https://github.com/apache/flink-kubernetes-operator/pull/5 Improvements to flink deployment status handling. Introduce `ReconciliationStatus` to allow capturing error that do not necessarily affect the running jobs. The PR does not introduce new validation logic for the deployments, that is left as a separate ticket for now The PR also improves the reonciliation flow to avoid rescheduling reconciliation when not necessary + introduces a controller test to verify the basic flow. cc @wangyang0918 @tweise -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch release-1.14 updated: [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service
This is an automated email from the ASF dual-hosted git repository. wangyang0918 pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.14 by this push: new 68f0688 [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service 68f0688 is described below commit 68f0688e38a43e99417c26b630395e234f605aff Author: Aitozi AuthorDate: Mon Jan 24 16:32:04 2022 +0800 [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service This closes #18767. --- .../generated/kubernetes_config_configuration.html | 2 +- .../kubernetes/KubernetesClusterDescriptor.java| 7 +- .../configuration/KubernetesConfigOptions.java | 27 +- .../kubeclient/Fabric8FlinkKubeClient.java | 5 +- .../decorators/ExternalServiceDecorator.java | 33 ++-- .../decorators/InternalServiceDecorator.java | 24 +- .../kubeclient/services/ClusterIPService.java | 41 ++ .../services/HeadlessClusterIPService.java | 68 .../kubeclient/services/LoadBalancerService.java | 41 ++ .../kubeclient/services/NodePortService.java | 43 ++ .../kubeclient/services/ServiceType.java | 95 ++ .../apache/flink/kubernetes/utils/Constants.java | 2 - .../flink/kubernetes/KubernetesClientTestBase.java | 32 +++- .../decorators/ExternalServiceDecoratorTest.java | 17 .../factory/KubernetesJobManagerFactoryTest.java | 3 +- .../kubeclient/services/ServiceTypeTest.java | 47 +++ 16 files changed, 425 insertions(+), 62 deletions(-) diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index 78d51ae..774ec0d 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -174,7 +174,7 @@ kubernetes.rest-service.exposed.type LoadBalancer Enum -The exposed type of the rest service. The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.Possible values:"ClusterIP""NodePort""LoadBalancer" +The exposed type of the rest service. The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.Possible values:"ClusterIP""NodePort""LoadBalancer""Headless_ClusterIP" kubernetes.secrets diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java index e291242..02708e8 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -125,15 +125,16 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor { private String getWebMonitorAddress(Configuration configuration) throws Exception { AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION; -if (configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE) -== KubernetesConfigOptions.ServiceExposedType.ClusterIP) { +final KubernetesConfigOptions.ServiceExposedType serviceType = + configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); +if (serviceType.isClusterIP()) { resolution = AddressResolution.NO_ADDRESS_RESOLUTION; LOG.warn( "Please note that Flink client operations(e.g. cancel, list, stop," + " savepoint, etc.) won't work from outside the Kubernetes cluster" + " since '{}' has been set to {}.", KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), -KubernetesConfigOptions.ServiceExposedType.ClusterIP); +serviceType); } return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index a5f61d5..6250435 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -24,6 +24,11 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ExternalResourceOptions; import
[flink-table-store] branch master updated: [FLINK-25876] Implement overwrite in FlinkStoreCommitImpl
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 44dfecd [FLINK-25876] Implement overwrite in FlinkStoreCommitImpl 44dfecd is described below commit 44dfecd56695d9575e75b0b63a49aa8211ecde9f Author: tsreaper AuthorDate: Thu Feb 17 13:36:47 2022 +0800 [FLINK-25876] Implement overwrite in FlinkStoreCommitImpl This closes #16 --- .../apache/flink/table/store/file/Snapshot.java| 16 +- .../store/file/manifest/ManifestCommittable.java | 23 +- .../manifest/ManifestCommittableSerializer.java| 6 +- .../store/file/manifest/ManifestFileMeta.java | 29 +- .../table/store/file/manifest/ManifestList.java| 4 - .../store/file/operation/FileStoreCommit.java | 4 +- .../store/file/operation/FileStoreCommitImpl.java | 326 - .../flink/table/store/file/utils/TypeUtils.java| 93 ++ .../table/store/file/TestKeyValueGenerator.java| 9 + ...ommitTestBase.java => FileStoreCommitTest.java} | 150 +++--- .../store/file/operation/FileStoreExpireTest.java | 4 +- .../store/file/operation/FileStoreScanTest.java| 4 +- .../store/file/operation/OperationTestUtils.java | 56 +++- .../store/file/operation/TestCommitThread.java | 113 +-- .../src/test/resources/log4j2-test.xml | 29 ++ 15 files changed, 613 insertions(+), 253 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java index e02fa0d..9b9085b 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java @@ -37,7 +37,7 @@ public class Snapshot { private static final String FIELD_ID = "id"; private static final String FIELD_MANIFEST_LIST = "manifestList"; private static final String FIELD_COMMIT_USER = "commitUser"; -private static final String FIELD_COMMIT_DIGEST = "commitDigest"; +private static final String FIELD_COMMIT_UUID = "commitUuid"; private static final String FIELD_COMMIT_KIND = "commitKind"; private static final String FIELD_TIME_MILLIS = "timeMillis"; @@ -51,8 +51,8 @@ public class Snapshot { private final String commitUser; // for deduplication -@JsonProperty(FIELD_COMMIT_DIGEST) -private final String commitDigest; +@JsonProperty(FIELD_COMMIT_UUID) +private final String commitUuid; @JsonProperty(FIELD_COMMIT_KIND) private final CommitKind commitKind; @@ -65,13 +65,13 @@ public class Snapshot { @JsonProperty(FIELD_ID) long id, @JsonProperty(FIELD_MANIFEST_LIST) String manifestList, @JsonProperty(FIELD_COMMIT_USER) String commitUser, -@JsonProperty(FIELD_COMMIT_DIGEST) String commitDigest, +@JsonProperty(FIELD_COMMIT_UUID) String commitUuid, @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind, @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) { this.id = id; this.manifestList = manifestList; this.commitUser = commitUser; -this.commitDigest = commitDigest; +this.commitUuid = commitUuid; this.commitKind = commitKind; this.timeMillis = timeMillis; } @@ -91,9 +91,9 @@ public class Snapshot { return commitUser; } -@JsonGetter(FIELD_COMMIT_DIGEST) -public String commitDigest() { -return commitDigest; +@JsonGetter(FIELD_COMMIT_UUID) +public String commitUuid() { +return commitUuid; } @JsonGetter(FIELD_COMMIT_KIND) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java index 4fcd763..765fe1a 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java @@ -27,26 +27,26 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; /** Manifest commit message. */ public class ManifestCommittable { +private final String uuid; private final Map>> newFiles; - private final Map>> compactBefore; - private final Map>> compactAfter; public ManifestCommittable() { -this.newFiles = new HashMap<>(); -this.compactBefore = new HashMap<>(); -this.compactAfter = new HashMap<>(); +this(UUID.randomUUID().toString(), new HashMap<>(),
[flink] branch master updated: [FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 8f3a025 [FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner 8f3a025 is described below commit 8f3a0251d195ed2532abc31e602029dfcbf7bc77 Author: Lijie Wang AuthorDate: Wed Feb 16 08:36:22 2022 +0800 [FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner This closes #18789. --- .../api/graph/StreamingJobGraphGenerator.java | 22 -- .../ForwardForConsecutiveHashPartitionerTest.java | 13 + .../partitioner/StreamPartitionerTestUtils.java| 22 +++--- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index e056048..4801ec7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1064,8 +1064,10 @@ public class StreamingJobGraphGenerator { if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) -&& (edge.getPartitioner() instanceof ForwardPartitioner) -&& edge.getExchangeMode() != StreamExchangeMode.BATCH +&& arePartitionerAndExchangeModeChainable( +edge.getPartitioner(), +edge.getExchangeMode(), +streamGraph.getExecutionConfig().isDynamicGraph()) && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { @@ -1084,6 +1086,22 @@ public class StreamingJobGraphGenerator { } @VisibleForTesting +static boolean arePartitionerAndExchangeModeChainable( +StreamPartitioner partitioner, +StreamExchangeMode exchangeMode, +boolean isDynamicGraph) { +if (partitioner instanceof ForwardForConsecutiveHashPartitioner) { +checkState(isDynamicGraph); +return true; +} else if ((partitioner instanceof ForwardPartitioner) +&& exchangeMode != StreamExchangeMode.BATCH) { +return true; +} else { +return false; +} +} + +@VisibleForTesting static boolean areOperatorsChainable( StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) { StreamOperatorFactory upStreamOperator = upStreamVertex.getOperatorFactory(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java index 8e1863b..9e0fd6c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -36,6 +37,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger { @Test public void testConvertToForwardPartitioner() { +testConvertToForwardPartitioner(StreamExchangeMode.BATCH); +testConvertToForwardPartitioner(StreamExchangeMode.PIPELINED); +testConvertToForwardPartitioner(StreamExchangeMode.UNDEFINED); +} + +private void testConvertToForwardPartitioner(StreamExchangeMode streamExchangeMode) { JobGraph jobGraph = StreamPartitionerTestUtils.createJobGraph( "group1", @@ -53,6 +60,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger { @Test public void testConvertToHashPartitioner() { +testConvertToHashPartitioner(StreamExchangeMode.BATCH); +testConvertToHashPartitioner(StreamExchangeMode.PIPELINED); +
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #4: Use github actions for flink-kubernetes-operator CI
wangyang0918 commented on pull request #4: URL: https://github.com/apache/flink-kubernetes-operator/pull/4#issuecomment-1042544537 The CI has been successfully run my personal actions. It will take effect as soon as we merge this PR. https://github.com/wangyang0918/flink-kubernetes-operator/actions/runs/1856555659 cc @gyfora -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink-ml] branch release-2.0 updated: [hotfix] Fix version and branch info of docs in release 2.0
This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-ml.git The following commit(s) were added to refs/heads/release-2.0 by this push: new f636072 [hotfix] Fix version and branch info of docs in release 2.0 f636072 is described below commit f636072f62c39a92ef71e54941ff9947baf6ed6e Author: yunfengzhou-hub AuthorDate: Thu Feb 17 11:20:12 2022 +0800 [hotfix] Fix version and branch info of docs in release 2.0 This closes #59. --- docs/config.toml | 7 --- docs/content/_index.md | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/config.toml b/docs/config.toml index 1391a0b..4c41179 100644 --- a/docs/config.toml +++ b/docs/config.toml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -baseURL = '//nightlies.apache.org/flink/flink-ml-docs-master' +baseURL = '//nightlies.apache.org/flink/flink-ml-docs-release-2.0' languageCode = 'en-us' title = 'Apache Flink Machine Learning Library' enableGitInfo = false @@ -38,10 +38,10 @@ pygmentsUseClasses = true # For stable releases, leave the bugfix version out (e.g. 1.2). For snapshot # release this should be the same as the regular version - VersionTitle = "2.0.0" + VersionTitle = "2.0" # The branch for this version of Apache Flink Machine Learning Library - Branch = "master" + Branch = "release-2.0" # The github repository for Apache Flink Machine Learning Library Repo = "//github.com/apache/flink-ml" @@ -57,6 +57,7 @@ pygmentsUseClasses = true # of the menu MenuLinks = [ ["Project Homepage", "//flink.apache.org"], +["JavaDocs", "//nightlies.apache.org/flink/flink-ml-docs-release-2.0/api/java/"], ] PreviousDocs = [] diff --git a/docs/content/_index.md b/docs/content/_index.md index e31794d..9b76115 100644 --- a/docs/content/_index.md +++ b/docs/content/_index.md @@ -33,7 +33,7 @@ build ML pipelines for both training and inference jobs. ## Try Flink ML If you’re interested in playing around with Flink ML, check out our [example -codes](({{< ref "docs/try-flink-ml/quick-start" >}})). It provides a step by +codes]({{< ref "docs/try-flink-ml/quick-start" >}}). It provides a step by step introduction to the APIs and guides you through real applications. <--->
[flink] branch master updated: [FLINK-25490][checkpoint] Complete the Chinese document regarding final checkpoint
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4682620 [FLINK-25490][checkpoint] Complete the Chinese document regarding final checkpoint 4682620 is described below commit 46826201f5d69ea5903c9ec1ac7d3370c7212de0 Author: Yun Gao AuthorDate: Tue Feb 15 12:03:26 2022 +0800 [FLINK-25490][checkpoint] Complete the Chinese document regarding final checkpoint This closes #18766. --- .../datastream/fault-tolerance/checkpointing.md| 39 ++ docs/content.zh/docs/internals/task_lifecycle.md | 5 ++- .../datastream/fault-tolerance/checkpointing.md| 6 ++-- 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md index 333e6d8..2ad6d90 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -195,5 +195,44 @@ Flink 现在为没有迭代(iterations)的作业提供一致性的处理保 请注意在环形边上游走的记录(以及与之相关的状态变化)在故障时会丢失。 +## 部分任务结束后的 Checkpoint + +从版本 1.14 开始 Flink 支持在部分任务结束后继续进行Checkpoint。 +如果一部分数据源是有限数据集,那么就可以出现这种情况。 +从版本 1.15 开始,这一特性被默认打开。如果想要关闭这一功能,可以执行: + +```java +Configuration config = new Configuration(); +config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false); +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); +``` + +在这种情况下,结束的任务不会参与 Checkpoint 的过程。在实现自定义的算子或者 UDF (用户自定义函数)时需要考虑这一点。 + +为了支持部分任务结束后的 Checkpoint 操作,我们调整了 [任务的生命周期]({{}}) 并且引入了 +{{< javadoc file="org/apache/flink/streaming/api/operators/StreamOperator.html#finish--" name="StreamOperator#finish" >}} 方法。 +在这一方法中,用户需要写出所有缓冲区中的数据。在 finish 方法调用后的 checkpoint 中,这一任务一定不能再有缓冲区中的数据,因为在 `finish()` 后没有办法来输出这些数据。 +在大部分情况下,`finish()` 后这一任务的状态为空,唯一的例外是如果其中某些算子中包含外部系统事务的句柄(例如为了实现恰好一次语义), +在这种情况下,在 `finish()` 后进行的 checkpoint 操作应该保留这些句柄,并且在结束 checkpoint(即任务退出前所等待的 checkpoint)时提交。 +一个可以参考的例子是满足恰好一次语义的 sink 接口与 `TwoPhaseCommitSinkFunction`。 + +### 对 operator state 的影响 + +在部分 Task 结束后的checkpoint中,Flink 对 `UnionListState` 进行了特殊的处理。 +`UnionListState` 一般用于实现对外部系统读取位置的一个全局视图(例如,用于记录所有 Kafka 分区的读取偏移)。 +如果我们在算子的某个并发调用 `close()` 方法后丢弃它的状态,我们就会丢失它所分配的分区的偏移量信息。 +为了解决这一问题,对于使用 `UnionListState` 的算子我们只允许在它的并发都在运行或都已结束的时候才能进行 checkpoint 操作。 + +`ListState` 一般不会用于类似的场景,但是用户仍然需要注意在调用 `close()` 方法后进行的 checkpoint 会丢弃算子的状态并且 +这些状态在算子重启后不可用。 + +任何支持并发修改操作的算子也可以支持部分并发实例结束后的恢复操作。从这种类型的快照中恢复等价于将算子的并发改为正在运行的并发实例数。 + +### 任务结束前等待最后一次 Checkpoint + +为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 `finish()` 方法后等待下一次 checkpoint 成功后退出。 +需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。 +极端情况下,如果 checkpoint 的周期被设置为 `Long.MAX_VALUE`,那么任务永远不会结束,因为下一次 checkpoint 不会进行。 + {{< top >}} diff --git a/docs/content.zh/docs/internals/task_lifecycle.md b/docs/content.zh/docs/internals/task_lifecycle.md index 6c96b91..24d8495 100644 --- a/docs/content.zh/docs/internals/task_lifecycle.md +++ b/docs/content.zh/docs/internals/task_lifecycle.md @@ -92,6 +92,7 @@ Task 在没有中断的情况下执行到结束的阶段如下所示: open-operators run finish-operators + wait for the final checkponit completed (if enabled) close-operators task-specific-cleanup common-cleanup @@ -117,7 +118,9 @@ task 里多个连续算子的开启是从后往前依次执行。 现在 task 可以恢复执行,算子可以开始处理新输入的数据。在这里,特定 task 的 `run()` 方法会被调用。这个方法会一直运行直到没有更多输入数据进来(有限的数据流)或者 task 被取消了(人为的或其他的原因)。这里也是算子定义的 `processElement()` 方法和 `processWatermark()` 方法执行的地方。 -在运行到完成的情况下,即没有更多的输入数据要处理,从run()方法退出后,task 进入关闭阶段。首先定时器服务停止注册任何新的定时器(比如从正在执行的定时器里注册),清理掉所有还未启动的定时器,并等待当前执行中的定时器运行结束。然后通过调用 `finishAllOperators()` 方法调用每个算子的 `finish()` 方法来通知所有参与计算的算子。然后所有缓存的输出数据会刷出去以便下游 task 处理,最终 task 通过调用每个算子的 `close()` 方法来尝试清理掉算子持有的所有资源。与我们之前提到的开启算子不同是,开启时从后往前依次调用 `open()`;而关闭时刚好相反,从前往后依次调用 `close()`。 +在运行到完成的情况下,即没有更多的输入数据要处理,从run()方法退出后,task 进入关闭阶段。首先定时器服务停止注册任何新的定时器(比如从正在执行的定时器里注册),清理掉所有还未启动的定时器,并等待当前执行中的定时器运行结束。然后通过调用 `finishAllOperators()` 方法调用每个算子的 `finish()` 方法来通知所有参与计算的算子。然后所有缓存的输出数据会刷出去以便下游 task 处理。 +如果开启了部分任务结束后继续 checkpoint 的功能,任务将 [等待下一个 checkpoint 结束]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing#任务结束前等待最后一次-checkpoint" >}}) 来保证使用两阶段提交的算子可能最终提交所有的记录。 +最终 task 通过调用每个算子的 `close()` 方法来尝试清理掉算子持有的所有资源。与我们之前提到的开启算子不同是,开启时从后往前依次调用 `open()`;而关闭时刚好相反,从前往后依次调用 `close()`。 {{< hint info >}} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index c98386a..f763882 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -260,16 +260,16 @@
[GitHub] [flink-kubernetes-operator] wangyang0918 opened a new pull request #4: Use github actions for flink-kubernetes-operator CI
wangyang0918 opened a new pull request #4: URL: https://github.com/apache/flink-kubernetes-operator/pull/4 This PR tries to use the github actions for the CI. We have the following two jobs. * test_ci, this job will run all the unit tests and integration tests * e2e_ci, this job will run all the e2e tests * The new added e2e test `test_kubernetes_application_ha.sh` will start a Flink application with HA enabled. Wait for successful checkpoints and then kill the JobManager to verify the restoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink-web] branch asf-site updated (0f50686 -> 0cb4b0b)
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 0f50686 Rebuild website new 5a8b85b [FLINK-26200] Redirect statefun directly to docs new 0cb4b0b rebuild site The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: _includes/navbar.html | 4 +- content/2019/05/03/pulsar-flink.html | 4 +- content/2019/05/14/temporal-tables.html| 4 +- content/2019/05/19/state-ttl.html | 4 +- content/2019/06/05/flink-network-stack.html| 4 +- content/2019/06/26/broadcast-state.html| 4 +- content/2019/07/23/flink-network-stack-2.html | 4 +- content/2020/04/09/pyflink-udf-support-flink.html | 4 +- content/2020/07/23/catalogs.html | 4 +- ...ql-demo-building-e2e-streaming-application.html | 4 +- .../08/04/pyflink-pandas-udf-support-flink.html| 4 +- content/2020/08/19/statefun.html | 4 +- .../flink-1.11-memory-management-improvements.html | 4 +- ...om-aligned-to-unaligned-checkpoints-part-1.html | 4 +- content/2020/12/15/pipelined-region-sheduling.html | 4 +- content/2021/01/07/pulsar-flink-connector-270.html | 4 +- content/2021/01/18/rocksdb.html| 4 +- content/2021/02/10/native-k8s-with-ha.html | 4 +- content/2021/03/11/batch-execution-mode.html | 4 +- content/2021/05/06/reactive-mode.html | 4 +- content/2021/07/07/backpressure.html | 4 +- .../2021/09/07/connector-table-sql-api-part1.html | 4 +- .../2021/09/07/connector-table-sql-api-part2.html | 4 +- content/2021/10/26/sort-shuffle-part1.html | 4 +- content/2021/10/26/sort-shuffle-part2.html | 4 +- content/2021/11/03/flink-backward.html | 4 +- content/2021/12/10/log4j-cve.html | 4 +- .../2022/01/04/scheduler-performance-part-one.html | 4 +- .../2022/01/04/scheduler-performance-part-two.html | 4 +- content/2022/01/20/pravega-connector-101.html | 4 +- content/blog/index.html| 4 +- content/blog/page10/index.html | 4 +- content/blog/page11/index.html | 4 +- content/blog/page12/index.html | 4 +- content/blog/page13/index.html | 4 +- content/blog/page14/index.html | 4 +- content/blog/page15/index.html | 4 +- content/blog/page16/index.html | 4 +- content/blog/page17/index.html | 4 +- content/blog/page18/index.html | 4 +- content/blog/page2/index.html | 4 +- content/blog/page3/index.html | 4 +- content/blog/page4/index.html | 4 +- content/blog/page5/index.html | 4 +- content/blog/page6/index.html | 4 +- content/blog/page7/index.html | 4 +- content/blog/page8/index.html | 4 +- content/blog/page9/index.html | 4 +- .../blog/release_1.0.0-changelog_known_issues.html | 4 +- content/blog/release_1.1.0-changelog.html | 4 +- content/blog/release_1.2.0-changelog.html | 4 +- content/blog/release_1.3.0-changelog.html | 4 +- content/community.html | 4 +- .../code-style-and-quality-common.html | 4 +- .../code-style-and-quality-components.html | 4 +- .../code-style-and-quality-formatting.html | 4 +- .../contributing/code-style-and-quality-java.html | 4 +- .../code-style-and-quality-preamble.html | 4 +- .../code-style-and-quality-pull-requests.html | 4 +- .../contributing/code-style-and-quality-scala.html | 4 +- content/contributing/contribute-code.html | 4 +- content/contributing/contribute-documentation.html | 4 +- content/contributing/docs-style.html | 4 +- content/contributing/how-to-contribute.html| 4 +- content/contributing/improve-website.html | 4 +- content/contributing/reviewing-prs.html| 4 +- content/documentation.html | 4 +- content/downloads.html | 4 +- content/ecosystem.html | 4 +- .../apache-beam-how-beam-runs-on-top-of-flink.html | 4 +- .../2020/06/23/flink-on-zeppelin-part2.html| 4 +- .../feature/2019/09/13/state-processor-api.html| 4 +-
[flink-web] 01/02: [FLINK-26200] Redirect statefun directly to docs
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 5a8b85bd6c449ce4edebacf332568cd4b6845837 Author: sjwiesman AuthorDate: Wed Feb 16 17:37:28 2022 -0600 [FLINK-26200] Redirect statefun directly to docs This closes #507 --- _includes/navbar.html| 4 +- stateful-functions.md| 217 +- stateful-functions.zh.md | 218 +-- 3 files changed, 6 insertions(+), 433 deletions(-) diff --git a/_includes/navbar.html b/_includes/navbar.html index f3fc2bb..b6c9aaa 100755 --- a/_includes/navbar.html +++ b/_includes/navbar.html @@ -48,9 +48,9 @@ {% endif %} - + -{{ site.data.i18n[page.language].what_is_statefun }} +https://nightlies.apache.org/flink/flink-statefun-docs-stable/;>{{ site.data.i18n[page.language].what_is_statefun }} {{ site.data.i18n[page.language].use_case }} diff --git a/stateful-functions.md b/stateful-functions.md index 383139a..0f98cd9 100644 --- a/stateful-functions.md +++ b/stateful-functions.md @@ -2,220 +2,7 @@ title: "Stateful Functions — Event-driven Applications on Apache Flink" layout: base --- - - - - **Stateful Functions — Event-driven Applications on Apache Flink®** - - - - - +This page has moved! If you are not automatically redirected follow this [link](https://nightlies.apache.org/flink/flink-statefun-docs-stable/) - - -Stateful Functions is an API that **simplifies building distributed stateful applications**. It's based on functions with persistent state that can interact dynamically with strong consistency guarantees. - - - - - - - - - -### Stateful Functions Applications - -A _stateful function_ is a small piece of logic/code existing in multiple instances that represent entities — similar to [actors](https://www.brianstorti.com/the-actor-model/). Functions are invoked through messages and are: - - - Stateful - Functions have embedded, fault-tolerant state, accessed locally like a variable. - Virtual - Much like FaaS, functions don't reserve resources — inactive functions don't consume CPU/Memory. - - -Applications are composed of _modules_ of multiple functions that can interact arbitrarily with: - - - Exactly-once Semantics -State and messaging go hand-in-hand, providing exactly-once message/state semantics. - Logical Addressing -Functions message each other by logical addresses. No service discovery needed. - Dynamic and Cyclic Messaging -Messaging patterns don't need to be pre-defined as dataflows (dynamic) and are also not restricted to DAGs (cyclic). - - - - -## A Runtime built for Serverless Architectures - -The Stateful Functions runtime is designed to provide a set of properties similar to what characterizes [serverless functions](https://martinfowler.com/articles/serverless.html), but applied to stateful problems. - - - - - - - - - - -The runtime is built on Apache Flink®, with the following design principles: - - - Logical Compute/State Co-location: -Messaging, state access/updates and function invocations are managed tightly together. This ensures a high-level of consistency out-of-the-box. - Physical Compute/State Separation: -Functions can be executed remotely, with message and state access provided as part of the invocation request. This way, functions can be managed like stateless processes and support rapid scaling, rolling upgrades and other common operational patterns. - Language Independence: -Function invocations use a simple HTTP/gRPC-based protocol so that Functions can be easily implemented in various languages. - - -This makes it possible to execute functions on a **Kubernetes deployment**, a **FaaS platform** or **behind a (micro)service**, while providing consistent state and lightweight messaging between functions. - - - -## Key Benefits - - - - - - - - - - - - Dynamic Messaging - -The API allows you to build and compose functions that communicate dynamic- and arbitrarily with each other. This gives you much more flexibility compared to the acyclic nature of classical stream processing topologies. -Learn More - - - - - - Consistent State - Functions can keep local state that is persistent and integrated with the messaging between functions. This gives you the effect of exactly-once state access/updates and guaranteed efficient messaging out-of-the-box. - Learn More - - - - - - - Multi-language Support - -Functions can be implemented in any programming language that can handle HTTP requests or bring up a gRPC server, with initial support for Python. More SDKs will be added
[GitHub] [flink-kubernetes-operator] tweise commented on pull request #3: README fix
tweise commented on pull request #3: URL: https://github.com/apache/flink-kubernetes-operator/pull/3#issuecomment-1042385670 @gyfora PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated (979313a -> c865def)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 979313a [FLINK-26118][connectors/common] Add support for AsyncSink to downscale with state. AsyncSinkWriter can be restored from multiple BufferedRequestState add 70280bd [FLINK-26189][build] Remove bundling of grizzled-slf4j from rpc-akka add c865def [hotfix][legal] Add missing Scala license to rpc-akka No new revisions were added by this update. Summary of changes: flink-rpc/flink-rpc-akka/pom.xml | 6 -- .../src/main/resources/META-INF/NOTICE | 5 - .../main/resources/META-INF/licenses/LICENSE.scala | 25 +++--- 3 files changed, 12 insertions(+), 24 deletions(-) copy flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/licenses/LICENSE.threetenbp => flink-rpc/flink-rpc-akka/src/main/resources/META-INF/licenses/LICENSE.scala (55%)
[flink] branch master updated (aec2d38 -> 979313a)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from aec2d38 [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink. add fcf8c6d [FLINK-26118][connectors/common] Setting defaults in test to reduce test complexity add 979313a [FLINK-26118][connectors/common] Add support for AsyncSink to downscale with state. AsyncSinkWriter can be restored from multiple BufferedRequestState No new revisions were added by this update. Summary of changes: .../base/sink/writer/AsyncSinkWriter.java | 19 +-- .../base/sink/writer/AsyncSinkWriterTest.java | 173 ++--- 2 files changed, 56 insertions(+), 136 deletions(-)
[flink-docker] branch master updated: Update Dockerfiles for 1.13.6 release (#105)
This is an automated email from the ASF dual-hosted git repository. knaufk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/master by this push: new 3dd2ef7 Update Dockerfiles for 1.13.6 release (#105) 3dd2ef7 is described below commit 3dd2ef7d59ff91800fc82be1f0f04b01db9080bf Author: Konstantin Knauf AuthorDate: Wed Feb 16 20:39:55 2022 +0100 Update Dockerfiles for 1.13.6 release (#105) --- 1.13/scala_2.11-java11-debian/Dockerfile | 6 +++--- 1.13/scala_2.11-java11-debian/release.metadata | 2 +- 1.13/scala_2.11-java8-debian/Dockerfile| 6 +++--- 1.13/scala_2.11-java8-debian/release.metadata | 2 +- 1.13/scala_2.12-java11-debian/Dockerfile | 6 +++--- 1.13/scala_2.12-java11-debian/release.metadata | 2 +- 1.13/scala_2.12-java8-debian/Dockerfile| 6 +++--- 1.13/scala_2.12-java8-debian/release.metadata | 2 +- 8 files changed, 16 insertions(+), 16 deletions(-) diff --git a/1.13/scala_2.11-java11-debian/Dockerfile b/1.13/scala_2.11-java11-debian/Dockerfile index 8112da2..f7fad32 100644 --- a/1.13/scala_2.11-java11-debian/Dockerfile +++ b/1.13/scala_2.11-java11-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz \ - FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz.asc \ -GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \ +ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz \ + FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz.asc \ +GPG_KEY=CCFA852FD039380AB3EC36D08C3FB007FE60DEFA \ CHECK_GPG=true # Prepare environment diff --git a/1.13/scala_2.11-java11-debian/release.metadata b/1.13/scala_2.11-java11-debian/release.metadata index 15e97bb..9c08cc8 100644 --- a/1.13/scala_2.11-java11-debian/release.metadata +++ b/1.13/scala_2.11-java11-debian/release.metadata @@ -1,2 +1,2 @@ -Tags: 1.13.5-scala_2.11-java11, 1.13-scala_2.11-java11, scala_2.11-java11 +Tags: 1.13.6-scala_2.11-java11, 1.13-scala_2.11-java11, scala_2.11-java11 Architectures: amd64 diff --git a/1.13/scala_2.11-java8-debian/Dockerfile b/1.13/scala_2.11-java8-debian/Dockerfile index 3d6aa7b..ace4b73 100644 --- a/1.13/scala_2.11-java8-debian/Dockerfile +++ b/1.13/scala_2.11-java8-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz \ - FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz.asc \ -GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \ +ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz \ + FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz.asc \ +GPG_KEY=CCFA852FD039380AB3EC36D08C3FB007FE60DEFA \ CHECK_GPG=true # Prepare environment diff --git a/1.13/scala_2.11-java8-debian/release.metadata b/1.13/scala_2.11-java8-debian/release.metadata index ae4190e..0798f06 100644 --- a/1.13/scala_2.11-java8-debian/release.metadata +++ b/1.13/scala_2.11-java8-debian/release.metadata @@ -1,2 +1,2 @@ -Tags: 1.13.5-scala_2.11-java8, 1.13-scala_2.11-java8, scala_2.11-java8, 1.13.5-scala_2.11, 1.13-scala_2.11, scala_2.11 +Tags: 1.13.6-scala_2.11-java8, 1.13-scala_2.11-java8, scala_2.11-java8, 1.13.6-scala_2.11, 1.13-scala_2.11, scala_2.11 Architectures: amd64 diff --git a/1.13/scala_2.12-java11-debian/Dockerfile b/1.13/scala_2.12-java11-debian/Dockerfile index f1b1e29..180d048 100644 --- a/1.13/scala_2.12-java11-debian/Dockerfile +++ b/1.13/scala_2.12-java11-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.13.5/flink-1.13.5-bin-scala_2.12.tgz \ - FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.12.tgz.asc \ -GPG_KEY=19F2195E1B4816D765A2C324C2EED7B111D464BA \ +ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download=flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz \ + FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz.asc \ +GPG_KEY=CCFA852FD039380AB3EC36D08C3FB007FE60DEFA \ CHECK_GPG=true # Prepare environment diff --git a/1.13/scala_2.12-java11-debian/release.metadata b/1.13/scala_2.12-java11-debian/release.metadata index 0d5d203..ce57c6a 100644 --- a/1.13/scala_2.12-java11-debian/release.metadata +++
[flink-docker] branch master updated (d232f51 -> 6b00930)
This is an automated email from the ASF dual-hosted git repository. knaufk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git. from d232f51 [FLINK-25679] Add support for arm64v8 add 6b00930 [hotfix] replace example for pull request to docker-library/official-images (#106) No new revisions were added by this update. Summary of changes: README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
svn commit: r52581 - /dev/flink/flink-1.13.6-rc1/ /release/flink/flink-1.13.6/
Author: knaufk Date: Wed Feb 16 18:11:21 2022 New Revision: 52581 Log: Release Flink 1.13.6 Added: release/flink/flink-1.13.6/ - copied from r52580, dev/flink/flink-1.13.6-rc1/ Removed: dev/flink/flink-1.13.6-rc1/
[flink] 06/06: [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink.
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit aec2d38710a67d90bd819bfdce66b5a5a646a882 Author: Gen Luo AuthorDate: Wed Feb 16 18:38:48 2022 +0800 [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink. This closes #18680. --- .../file/sink/BatchCompactingFileSinkITCase.java | 69 .../file/sink/FileSinkCompactionSwitchITCase.java | 391 + .../flink/connector/file/sink/FileSinkITBase.java | 4 +- .../sink/StreamingCompactingFileSinkITCase.java| 69 4 files changed, 532 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java new file mode 100644 index 000..5167e97 --- /dev/null +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink; + +import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader; +import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy; +import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor; +import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils; +import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.junit.Rule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** Tests the compaction of the {@link FileSink} in BATCH mode. */ +@RunWith(Parameterized.class) +public class BatchCompactingFileSinkITCase extends BatchExecutionFileSinkITCase { + +private static final int PARALLELISM = 4; + +@Rule +public final MiniClusterWithClientResource miniClusterResource = +new MiniClusterWithClientResource( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(PARALLELISM) +.setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.withHaLeadershipControl() +.build()); + +@Override +protected FileSink createFileSink(String path) { +return FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder()) +.withBucketAssigner( +new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS)) +.withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024)) +.enableCompact(createFileCompactStrategy(), createFileCompactor()) +.build(); +} + +private static FileCompactor createFileCompactor() { +return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new)); +} + +private static FileCompactStrategy createFileCompactStrategy() { +return FileCompactStrategy.Builder.newBuilder().setSizeThreshold(1).build(); +} +} diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java new file mode 100644 index 000..4a7c0f0 --- /dev/null +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software
[flink] 03/06: [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods in PendingFileRecoverable.
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b2ea16c4aace21b72faf57a1760abf9d65035f3a Author: Gen Luo AuthorDate: Wed Jan 26 15:27:53 2022 +0800 [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods in PendingFileRecoverable. --- .../file/sink/utils/FileSinkTestUtils.java | 19 +++ .../sink/filesystem/BulkBucketWriter.java | 5 +- .../functions/sink/filesystem/BulkPartWriter.java | 4 +- .../sink/filesystem/InProgressFileWriter.java | 13 +- .../OutputStreamBasedPartFileWriter.java | 162 +++-- .../sink/filesystem/RowWiseBucketWriter.java | 5 +- .../sink/filesystem/RowWisePartWriter.java | 4 +- .../hadoop/bulk/HadoopPathBasedPartFileWriter.java | 64 +++- 8 files changed, 254 insertions(+), 22 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java index 908aa42..b3d7041 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.file.sink.utils; import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; @@ -35,12 +36,30 @@ public class FileSinkTestUtils { /** A type of testing {@link InProgressFileWriter.PendingFileRecoverable}. */ public static class TestPendingFileRecoverable extends StringValue implements InProgressFileWriter.PendingFileRecoverable { +@Override +public Path getPath() { +return null; +} + +@Override +public long getSize() { +return -1L; +} // Nope } /** A type of testing {@link InProgressFileWriter.InProgressFileRecoverable}. */ public static class TestInProgressFileRecoverable extends StringValue implements InProgressFileWriter.InProgressFileRecoverable { +@Override +public Path getPath() { +return null; +} + +@Override +public long getSize() { +return -1L; +} // Nope } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java index 7906243..0c4ee74 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java @@ -50,6 +50,7 @@ public class BulkBucketWriter public InProgressFileWriter resumeFrom( final BucketID bucketId, final RecoverableFsDataOutputStream stream, +final Path path, final RecoverableWriter.ResumeRecoverable resumable, final long creationTime) throws IOException { @@ -58,7 +59,7 @@ public class BulkBucketWriter Preconditions.checkNotNull(resumable); final BulkWriter writer = writerFactory.create(stream); -return new BulkPartWriter<>(bucketId, stream, writer, creationTime); +return new BulkPartWriter<>(bucketId, path, stream, writer, creationTime); } @Override @@ -73,6 +74,6 @@ public class BulkBucketWriter Preconditions.checkNotNull(path); final BulkWriter writer = writerFactory.create(stream); -return new BulkPartWriter<>(bucketId, stream, writer, creationTime); +return new BulkPartWriter<>(bucketId, path, stream, writer, creationTime); } } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java index 758296d..d770c69 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java +++
[flink] branch master updated (3cf0393 -> aec2d38)
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3cf0393 [FLINK-24385][table] Introduce TRY_CAST new 6a0d6fa [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink. new 51adf0f [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter. new b2ea16c [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods in PendingFileRecoverable. new 824752c8 [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter. new 255d656 [FLINK-25583][connectors/filesystem] Add compaction support for FileSink. new aec2d38 [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink. The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../7602816f-5c01-4b7a-9e3e-235dfedec245 | 1 - .../apache/flink/connector/file/sink/FileSink.java | 169 ++- .../connector/file/sink/FileSinkCommittable.java | 74 ++- .../file/sink/FileSinkCommittableSerializer.java | 46 +- .../file/sink/committer/FileCommitter.java | 21 + .../file/sink/compactor/ConcatFileCompactor.java | 72 +++ .../file/sink/compactor/DecoderBasedReader.java| 96 .../file/sink/compactor/FileCompactStrategy.java | 112 + .../file/sink/compactor/FileCompactor.java | 48 ++ .../sink/compactor/IdenticalFileCompactor.java | 45 ++ .../sink/compactor/InputFormatBasedReader.java | 78 +++ .../compactor/OutputStreamBasedFileCompactor.java | 53 ++ .../sink/compactor/RecordWiseFileCompactor.java| 84 .../file/sink/compactor/SimpleStringDecoder.java | 55 +++ .../compactor/operator/CompactCoordinator.java | 265 ++ .../operator/CompactCoordinatorFactory.java| 77 +++ .../operator/CompactCoordinatorStateHandler.java | 89 .../CompactCoordinatorStateHandlerFactory.java | 87 .../sink/compactor/operator/CompactService.java| 147 ++ .../sink/compactor/operator/CompactorOperator.java | 343 + .../operator/CompactorOperatorFactory.java | 93 .../operator/CompactorOperatorStateHandler.java| 323 + .../CompactorOperatorStateHandlerFactory.java | 84 .../sink/compactor/operator/CompactorRequest.java | 70 +++ .../operator/CompactorRequestSerializer.java | 98 .../operator/CompactorRequestTypeInfo.java | 120 + .../file/sink/writer/FileWriterBucket.java | 5 +- .../file/sink/BatchCompactingFileSinkITCase.java | 69 +++ .../file/sink/FileCommittableSerializerTest.java | 25 +- ...FileSinkCommittableSerializerMigrationTest.java | 4 +- .../file/sink/FileSinkCompactionSwitchITCase.java | 391 +++ .../flink/connector/file/sink/FileSinkITBase.java | 4 +- .../sink/StreamingCompactingFileSinkITCase.java| 69 +++ .../file/sink/committer/FileCommitterTest.java | 14 +- .../sink/compactor/AbstractCompactTestBase.java| 58 +++ .../sink/compactor/CompactCoordinatorTest.java | 449 + .../file/sink/compactor/CompactorOperatorTest.java | 534 + .../operator/CompactorRequestTypeInfoTest.java | 41 ++ .../file/sink/utils/FileSinkTestUtils.java | 91 +++- .../sink/utils/IntegerFileSinkTestDataUtils.java | 26 + .../functions/sink/filesystem/BucketWriter.java| 26 + .../sink/filesystem/BulkBucketWriter.java | 5 +- .../functions/sink/filesystem/BulkPartWriter.java | 5 +- ...ssFileWriter.java => CompactingFileWriter.java} | 47 +- .../sink/filesystem/InProgressFileWriter.java | 23 +- ... => OutputStreamBasedCompactingFileWriter.java} | 35 +- .../OutputStreamBasedPartFileWriter.java | 198 +++- ...er.java => RecordWiseCompactingFileWriter.java} | 33 +- .../sink/filesystem/RowWiseBucketWriter.java | 5 +- .../sink/filesystem/RowWisePartWriter.java | 12 +- .../apache/flink/util/CloseShieldOutputStream.java | 56 +++ .../SerializableSupplierWithException.java | 34 ++ .../flink/formats/avro/AvroWriterFactory.java | 37 +- .../hadoop/bulk/HadoopPathBasedPartFileWriter.java | 64 ++- .../translators/SinkTransformationTranslator.java | 11 - flink-tests/pom.xml| 8 + 56 files changed, 4952 insertions(+), 177 deletions(-) create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.java create mode 100644
[flink] 04/06: [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 824752c82729b5fd6aab5c6f205476ae63c8aff5 Author: Gen Luo AuthorDate: Tue Jan 25 18:31:17 2022 +0800 [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter. --- .../connector/file/sink/FileSinkCommittable.java | 74 +- .../file/sink/FileSinkCommittableSerializer.java | 46 -- .../file/sink/committer/FileCommitter.java | 21 ++ .../file/sink/writer/FileWriterBucket.java | 5 +- .../file/sink/FileCommittableSerializerTest.java | 25 +++- .../file/sink/committer/FileCommitterTest.java | 14 ++-- 6 files changed, 167 insertions(+), 18 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java index 2c5e8e5..7ea5b1d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java @@ -19,11 +19,13 @@ package org.apache.flink.connector.file.sink; import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -34,26 +36,51 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class FileSinkCommittable implements Serializable { +private final String bucketId; + @Nullable private final InProgressFileWriter.PendingFileRecoverable pendingFile; @Nullable private final InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup; -public FileSinkCommittable(InProgressFileWriter.PendingFileRecoverable pendingFile) { +@Nullable private final Path compactedFileToCleanup; + +public FileSinkCommittable( +String bucketId, InProgressFileWriter.PendingFileRecoverable pendingFile) { +this.bucketId = bucketId; this.pendingFile = checkNotNull(pendingFile); this.inProgressFileToCleanup = null; +this.compactedFileToCleanup = null; } public FileSinkCommittable( +String bucketId, InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) { +this.bucketId = bucketId; this.pendingFile = null; this.inProgressFileToCleanup = checkNotNull(inProgressFileToCleanup); +this.compactedFileToCleanup = null; +} + +public FileSinkCommittable(String bucketId, Path compactedFileToCleanup) { +this.bucketId = bucketId; +this.pendingFile = null; +this.inProgressFileToCleanup = null; +this.compactedFileToCleanup = checkNotNull(compactedFileToCleanup); } FileSinkCommittable( +String bucketId, @Nullable InProgressFileWriter.PendingFileRecoverable pendingFile, -@Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) { +@Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup, +@Nullable Path compactedFileToCleanup) { +this.bucketId = bucketId; this.pendingFile = pendingFile; this.inProgressFileToCleanup = inProgressFileToCleanup; +this.compactedFileToCleanup = compactedFileToCleanup; +} + +public String getBucketId() { +return bucketId; } public boolean hasPendingFile() { @@ -73,4 +100,47 @@ public class FileSinkCommittable implements Serializable { public InProgressFileWriter.InProgressFileRecoverable getInProgressFileToCleanup() { return inProgressFileToCleanup; } + +public boolean hasCompactedFileToCleanup() { +return compactedFileToCleanup != null; +} + +@Nullable +public Path getCompactedFileToCleanup() { +return compactedFileToCleanup; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +FileSinkCommittable that = (FileSinkCommittable) o; +return Objects.equals(bucketId, that.bucketId) +&& Objects.equals(pendingFile, that.pendingFile) +&& Objects.equals(inProgressFileToCleanup, that.inProgressFileToCleanup) +&&
[flink] 02/06: [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 51adf0fcbdc63520a58246100b5a585b9f563ba6 Author: Gen Luo AuthorDate: Tue Jan 25 18:23:27 2022 +0800 [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter. --- .../7602816f-5c01-4b7a-9e3e-235dfedec245 | 1 - .../functions/sink/filesystem/BucketWriter.java| 26 .../functions/sink/filesystem/BulkPartWriter.java | 1 + ...ssFileWriter.java => CompactingFileWriter.java} | 47 +- .../sink/filesystem/InProgressFileWriter.java | 10 - ... => OutputStreamBasedCompactingFileWriter.java} | 35 ++-- .../OutputStreamBasedPartFileWriter.java | 36 - ...er.java => RecordWiseCompactingFileWriter.java} | 33 +-- .../sink/filesystem/RowWisePartWriter.java | 8 ++-- 9 files changed, 117 insertions(+), 80 deletions(-) diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 index 4bfb283..f61d7d9 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 @@ -225,7 +225,6 @@ org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated -org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java index 88ad598..00c7890 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java @@ -40,6 +40,32 @@ public interface BucketWriter { final BucketID bucketID, final Path path, final long creationTime) throws IOException; /** + * Used to create a new {@link CompactingFileWriter} of the requesting type. Requesting a writer + * of an unsupported type will result in UnsupportedOperationException. By default, only + * RECORD_WISE type is supported, for which a {@link InProgressFileWriter} will be created. + * + * @param type the type of this writer. + * @param bucketID the id of the bucket this writer is writing to. + * @param path the path this writer will write to. + * @param creationTime the creation time of the file. + * @return the new {@link InProgressFileWriter} + * @throws IOException Thrown if creating a writer fails. + * @throws
[flink] 01/06: [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink.
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6a0d6fa4de610ded1220845365c59a84831bc454 Author: Gen Luo AuthorDate: Wed Feb 9 18:56:41 2022 +0800 [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink. --- .../runtime/translators/SinkTransformationTranslator.java | 11 --- 1 file changed, 11 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index d2efbb8..97c19d3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; -import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology; import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; @@ -233,16 +232,6 @@ public class SinkTransformationTranslator transformations.subList(numTransformsBefore, transformations.size()); for (Transformation subTransformation : expandedTransformations) { -// Skip overwriting the parallelism for the global committer -if (subTransformation.getName() == null -|| !subTransformation -.getName() -.equals( -StandardSinkTopologies - .GLOBAL_COMMITTER_TRANSFORMATION_NAME)) { - subTransformation.setParallelism(transformation.getParallelism()); -} - concatUid( subTransformation, Transformation::getUid,
[flink] branch master updated (6fc4bad -> c7aa5e5)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6fc4bad [FLINK-26079][state/changelog] Disallow recovery from non-changelog checkpoints add c7aa5e5 [FLINK-26095][metrics] Respect bind-host for MQS actor system No new revisions were added by this update. Summary of changes: .../flink/runtime/entrypoint/ClusterEntrypoint.java | 5 - .../apache/flink/runtime/metrics/util/MetricUtils.java | 16 +--- .../apache/flink/runtime/minicluster/MiniCluster.java| 1 + .../flink/runtime/taskexecutor/TaskManagerRunner.java| 5 - .../flink/runtime/metrics/util/MetricUtilsTest.java | 2 +- 5 files changed, 23 insertions(+), 6 deletions(-)
[flink] branch master updated (20e03f4 -> 6fc4bad)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 20e03f4 [FLINK-26177][tests] Disable PulsarSourceITCase rescaling tests temporarily new 369088f [hotfix][tests] Explicitly disable changelog in migration tests new 6fc4bad [FLINK-26079][state/changelog] Disallow recovery from non-changelog checkpoints The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/content.zh/docs/ops/state/state_backends.md | 4 +- docs/content/docs/ops/state/state_backends.md | 4 +- .../state/changelog/ChangelogStateBackend.java | 32 ++- .../LegacyStatefulJobSavepointMigrationITCase.java | 1 + .../utils/StatefulJobSavepointMigrationITCase.java | 1 + .../StatefulJobWBroadcastStateMigrationITCase.java | 1 + .../TypeSerializerSnapshotMigrationITCase.java | 1 + .../test/state/ChangelogCompatibilityITCase.java | 282 + .../restore/AbstractOperatorRestoreTestBase.java | 1 + .../java/org/apache/flink/test/util/TestUtils.java | 9 +- .../StatefulJobSavepointMigrationITCase.scala | 1 + ...StatefulJobWBroadcastStateMigrationITCase.scala | 2 + 12 files changed, 323 insertions(+), 16 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
[flink] 02/02: [FLINK-26079][state/changelog] Disallow recovery from non-changelog checkpoints
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6fc4bad2c6443ef33ac86a286f815ecc9afba31c Author: Roman Khachatryan AuthorDate: Fri Feb 11 16:12:29 2022 +0100 [FLINK-26079][state/changelog] Disallow recovery from non-changelog checkpoints Private state of non-changelog checkpoints is not registered with the SharedStateRegistry on recovery. Therefore, after recovering in CLAIM mode it will be discarded as soon as the initial checkpoint is subsumed. This change disallows recovery by checking handle types during Changelog backend creation. --- docs/content.zh/docs/ops/state/state_backends.md | 4 +- docs/content/docs/ops/state/state_backends.md | 4 +- .../state/changelog/ChangelogStateBackend.java | 32 ++- .../test/state/ChangelogCompatibilityITCase.java | 282 + .../java/org/apache/flink/test/util/TestUtils.java | 9 +- .../StatefulJobSavepointMigrationITCase.scala | 1 + 6 files changed, 316 insertions(+), 16 deletions(-) diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md index 38575f6..e6d6764 100644 --- a/docs/content.zh/docs/ops/state/state_backends.md +++ b/docs/content.zh/docs/ops/state/state_backends.md @@ -405,9 +405,9 @@ If a task is backpressured by writing state changes, it will be shown as busy (r **Enabling Changelog** -Resuming from both savepoints and checkpoints is supported: +Resuming only from savepoints in canonical format is supported: - given an existing non-changelog job -- take either a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) or a [checkpoint]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}}) +- take a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) (canonical format is the default) - alter configuration (enable Changelog) - resume from the taken snapshot diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md index 8690e3d..aa06c909 100644 --- a/docs/content/docs/ops/state/state_backends.md +++ b/docs/content/docs/ops/state/state_backends.md @@ -427,9 +427,9 @@ If a task is backpressured by writing state changes, it will be shown as busy (r **Enabling Changelog** -Resuming from both savepoints and checkpoints is supported: +Resuming only from savepoints in canonical format is supported: - given an existing non-changelog job -- take either a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) or a [checkpoint]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}}) +- take a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) (canonical format is the default) - alter configuration (enable Changelog) - resume from the taken snapshot diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java index 34c1ed3..a1a2016 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; @@ -36,6 +37,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SavepointKeyedStateHandle; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; @@ -267,19 +269,29 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab } return stateHandles.stream() .filter(Objects::nonNull) -.map( -keyedStateHandle -> -keyedStateHandle instanceof ChangelogStateBackendHandle -
[flink] 01/02: [hotfix][tests] Explicitly disable changelog in migration tests
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 369088f0f94ae7732f8a65b6676f2187c871328a Author: Roman Khachatryan AuthorDate: Tue Feb 15 22:45:13 2022 +0100 [hotfix][tests] Explicitly disable changelog in migration tests --- .../checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java | 1 + .../test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java | 1 + .../checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java | 1 + .../flink/test/migration/TypeSerializerSnapshotMigrationITCase.java | 1 + .../test/state/operator/restore/AbstractOperatorRestoreTestBase.java| 1 + .../api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala | 2 ++ 6 files changed, 7 insertions(+) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java index 6ac2d98..e59be25 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java @@ -168,6 +168,7 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio default: throw new UnsupportedOperationException(); } +env.enableChangelogStateBackend(false); env.enableCheckpointing(500); env.setParallelism(4); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java index 007be6a..260c1f2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java @@ -133,6 +133,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB env.enableCheckpointing(500); env.setParallelism(parallelism); env.setMaxParallelism(parallelism); +env.enableChangelogStateBackend(false); SourceFunction> nonParallelSource; SourceFunction> parallelSource; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java index 35c7257..4470b85 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java @@ -113,6 +113,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio default: throw new UnsupportedOperationException(); } +env.enableChangelogStateBackend(false); env.enableCheckpointing(500); env.setParallelism(parallelism); diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java index c9e59db..aa22407 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java @@ -130,6 +130,7 @@ public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTes default: throw new UnsupportedOperationException(); } +env.enableChangelogStateBackend(false); env.enableCheckpointing(500); env.setParallelism(parallelism); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index f5bbff4..0ddf11e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -214,6 +214,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); env.setRestartStrategy(RestartStrategies.noRestart()); env.setStateBackend((StateBackend) new
[flink] branch master updated (9bd61e1 -> 20e03f4)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 9bd61e1 [FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to localhost by default add 20e03f4 [FLINK-26177][tests] Disable PulsarSourceITCase rescaling tests temporarily No new revisions were added by this update. Summary of changes: .../pulsar/source/PulsarSourceITCase.java | 24 ++ 1 file changed, 24 insertions(+)
[flink] branch master updated: [FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to localhost by default
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9bd61e1 [FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to localhost by default 9bd61e1 is described below commit 9bd61e1334e4805541b2b3689fe500e949a86658 Author: Mika AuthorDate: Wed Feb 16 15:29:57 2022 +0100 [FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to localhost by default --- flink-dist/src/main/resources/flink-conf.yaml | 15 +++ .../util/flink/container/FlinkContainersBuilder.java | 3 +++ .../decorators/FlinkConfMountDecorator.java | 4 .../org/apache/flink/runtime/net/ConnectionUtils.java | 19 ++- .../flink/yarn/entrypoint/YarnEntrypointUtils.java| 3 +++ 5 files changed, 43 insertions(+), 1 deletion(-) diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index 39b1f71..eb9bc83 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -36,6 +36,14 @@ jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 +# The host interface the JobManager will bind to. My default, this is localhost, and will prevent +# the JobManager from communicating outside the machine/container it is running on. +# +# To enable this, set the bind-host address to one that has access to an outside facing network +# interface, such as 0.0.0.0. + +jobmanager.bind-host: localhost + # The total process memory size for the JobManager. # @@ -43,6 +51,13 @@ jobmanager.rpc.port: 6123 jobmanager.memory.process.size: 1600m +# The host interface the TaskManager will bind to. My default, this is localhost, and will prevent +# the TaskManager from communicating outside the machine/container it is running on. +# +# To enable this, set the bind-host address to one that has access to an outside facing network +# interface, such as 0.0.0.0. + +taskmanager.bind-host: localhost # The total process memory size for the TaskManager. # diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java index c0e7b41..bcd18cb 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java @@ -165,6 +165,9 @@ public class FlinkContainersBuilder { CHECKPOINT_PATH.toAbsolutePath().toUri().toString()); this.conf.set(RestOptions.BIND_ADDRESS, "0.0.0.0"); +this.conf.set(JobManagerOptions.BIND_HOST, "0.0.0.0"); +this.conf.set(TaskManagerOptions.BIND_HOST, "0.0.0.0"); + // Create temporary directory for building Flink image final Path imageBuildingTempDir; try { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java index 61587cb..e1edc10 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -21,7 +21,9 @@ package org.apache.flink.kubernetes.kubeclient.decorators; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptionsInternal; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; @@ -158,6 +160,8 @@ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); +clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); +clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); return clusterSideConfig.toMap(); } diff --git
[flink-statefun] 01/01: [FLINK-26153] Add update_playground_links.sh to update playground links
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git commit cd8e0237f63bb84aae4137ae74481e5291944208 Author: Till Rohrmann AuthorDate: Tue Feb 15 11:29:22 2022 +0100 [FLINK-26153] Add update_playground_links.sh to update playground links This commit adds a new update_playground_links.sh script that updates links to Statefun's playground in various places. This closes #302. --- tools/releasing/update_playground_links.sh | 58 ++ 1 file changed, 58 insertions(+) diff --git a/tools/releasing/update_playground_links.sh b/tools/releasing/update_playground_links.sh new file mode 100755 index 000..c9b47c9 --- /dev/null +++ b/tools/releasing/update_playground_links.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +## +## Required variables +## +PREVIOUS_RELEASE_VERSION=${PREVIOUS_RELEASE_VERSION} +CURRENT_RELEASE_VERSION=${CURRENT_RELEASE_VERSION} + +if [ -z "${PREVIOUS_RELEASE_VERSION}" ]; then +echo "PREVIOUS_RELEASE_VERSION was not set." +exit 1 +fi + +if [ -z "${CURRENT_RELEASE_VERSION}" ]; then +echo "CURRENT_RELEASE_VERSION was not set." +exit 1 +fi + +# fail immediately +set -o errexit +set -o nounset + +CURR_DIR=`pwd` +BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +PROJECT_ROOT="${BASE_DIR}/../../" + +# Sanity check to ensure that resolved paths are valid; a LICENSE file should always exist in project root +if [ ! -f ${PROJECT_ROOT}/LICENSE ]; then +echo "Project root path ${PROJECT_ROOT} is not valid; script may be in the wrong directory." +exit 1 +fi + +### + +cd ${PROJECT_ROOT} + +# change version strings in README +perl -pi -e "s#https://github.com/apache/flink-statefun-playground/tree/release-$PREVIOUS_RELEASE_VERSION#https://github.com/apache/flink-statefun-playground/tree/release-$CURRENT_RELEASE_VERSION#g; README.md statefun-sdk-python/README.md statefun-sdk-js/README.md +perl -pi -e "s#git clone -b release-$PREVIOUS_RELEASE_VERSION https://github.com/apache/flink-statefun-playground.git#git clone -b release-$CURRENT_RELEASE_VERSION https://github.com/apache/flink-statefun-playground.git#g; README.md + +cd ${CURR_DIR} \ No newline at end of file
[flink-statefun] branch master updated (5fc6e00 -> cd8e023)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git. from 5fc6e00 [hotfix] Update README.md links to point to release-3.2 playground version add 9183d63 [hotfix] Fix typo in update_branch_version.sh new cd8e023 [FLINK-26153] Add update_playground_links.sh to update playground links The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: tools/releasing/update_branch_version.sh | 2 +- ...elease_branch.sh => update_playground_links.sh} | 32 ++ 2 files changed, 15 insertions(+), 19 deletions(-) copy tools/releasing/{create_release_branch.sh => update_playground_links.sh} (58%)
[flink] branch master updated (20a1b88 -> be5a6b2)
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 20a1b88 [FLINK-26089][table-api-java] Introduce TablePipeline add 8db43d7 [FLINK-25701][kafka] Add Flink API annotation to all connector classes based on the new Source/Sink design add be5a6b2 [FLINK-25701][kafka] reduce architectural violation No new revisions were added by this update. Summary of changes: .../archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 | 3 +-- .../kafka/sink/KafkaRecordSerializationSchemaBuilder.java | 2 ++ .../java/org/apache/flink/connector/kafka/source/KafkaSource.java | 8 .../apache/flink/connector/kafka/source/KafkaSourceBuilder.java | 2 ++ .../apache/flink/connector/kafka/source/KafkaSourceOptions.java | 2 ++ .../connector/kafka/source/enumerator/KafkaSourceEnumState.java | 3 +++ .../kafka/source/enumerator/KafkaSourceEnumStateSerializer.java | 2 ++ .../kafka/source/enumerator/subscriber/KafkaSubscriber.java | 3 +++ .../connector/kafka/source/metrics/KafkaSourceReaderMetrics.java | 2 ++ .../connector/kafka/source/reader/KafkaPartitionSplitReader.java | 2 ++ .../flink/connector/kafka/source/reader/KafkaRecordEmitter.java | 2 ++ .../flink/connector/kafka/source/reader/KafkaSourceReader.java| 2 ++ .../reader/deserializer/KafkaRecordDeserializationSchema.java | 3 +-- .../kafka/source/reader/fetcher/KafkaSourceFetcherManager.java| 2 ++ .../flink/connector/kafka/source/split/KafkaPartitionSplit.java | 2 ++ .../kafka/source/split/KafkaPartitionSplitSerializer.java | 2 ++ .../connector/kafka/source/split/KafkaPartitionSplitState.java| 3 +++ 17 files changed, 41 insertions(+), 4 deletions(-)
[flink] branch master updated (85bc4e3 -> 20a1b88)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 85bc4e3 [FLINK-26119][connectors/common] Switching AsyncSinkWriterStateSerializer from @Internal to @PublicEvolving add 3868687 [hotfix][table-api-java] Extract explain, execute and compilePlan API capabilities into specific interfaces add 20a1b88 [FLINK-26089][table-api-java] Introduce TablePipeline No new revisions were added by this update. Summary of changes: .../pyflink/table/tests/test_table_completeness.py | 3 +- .../table/api/bridge/java/StreamStatementSet.java | 8 ++ .../java/internal/StreamStatementSetImpl.java | 20 +++- .../org/apache/flink/table/api/Compilable.java | 28 +++-- .../org/apache/flink/table/api/CompiledPlan.java | 16 +-- .../table/api/{ResultKind.java => Executable.java} | 23 ++-- .../api/{PlannerConfig.java => Explainable.java} | 30 +++-- .../org/apache/flink/table/api/StatementSet.java | 133 - .../java/org/apache/flink/table/api/Table.java | 107 ++--- .../api/{PlannerConfig.java => TablePipeline.java} | 24 ++-- .../flink/table/api/internal/StatementSetImpl.java | 65 +++--- .../apache/flink/table/api/internal/TableImpl.java | 29 ++--- .../table/api/internal/TablePipelineImpl.java | 86 + .../api/bridge/scala/StreamStatementSet.scala | 6 +- .../scala/internal/StreamStatementSetImpl.scala| 11 +- .../apache/flink/table/api/CompiledPlanITCase.java | 15 ++- .../table/planner/plan/hint/OptionsHintTest.scala | 5 +- .../runtime/batch/sql/join/JoinITCase.scala| 6 +- .../flink/table/planner/utils/TableTestBase.scala | 72 +-- 19 files changed, 323 insertions(+), 364 deletions(-) copy flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/ExternalSystemDataReader.java => flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java (52%) copy flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/{ResultKind.java => Executable.java} (65%) copy flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/{PlannerConfig.java => Explainable.java} (54%) copy flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/{PlannerConfig.java => TablePipeline.java} (65%) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TablePipelineImpl.java
[GitHub] [flink-kubernetes-operator] tisonkun merged pull request #2: Fix typo in README.md
tisonkun merged pull request #2: URL: https://github.com/apache/flink-kubernetes-operator/pull/2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink-kubernetes-operator] branch main updated: Fix typo in README.md (#2)
This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new b59000e Fix typo in README.md (#2) b59000e is described below commit b59000ed59c9c4cc6e3bca005cf6f9d5e55abe55 Author: Biao Geng <80749729+bgeng...@users.noreply.github.com> AuthorDate: Wed Feb 16 20:40:30 2022 +0800 Fix typo in README.md (#2) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8871f98..23ce381 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ kubectl create -f examples/basic.yaml ### Delete a Flink deployment ``` -kubectl delete -f create/basic.yaml +kubectl delete -f examples/basic.yaml OR
[GitHub] [flink-kubernetes-operator] bgeng777 opened a new pull request #2: Fix typo in README.md
bgeng777 opened a new pull request #2: URL: https://github.com/apache/flink-kubernetes-operator/pull/2 Quick fix for deleting the example in README -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated (894f413 -> 85bc4e3)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 894f413 [FLINK-24745][format][json] Improve the metadata tests and documentation of OGG json format add 85bc4e3 [FLINK-26119][connectors/common] Switching AsyncSinkWriterStateSerializer from @Internal to @PublicEvolving No new revisions were added by this update. Summary of changes: .../connector/base/sink/writer/AsyncSinkWriterStateSerializer.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (2c792d0 -> 894f413)
This is an automated email from the ASF dual-hosted git repository. leonard pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2c792d0 [FLINK-25532] Improve Documentation of Flink Docker-Compose (#18725) add 22e02b2 [FLINK-24745][format][json] Support Oracle OGG json format add 894f413 [FLINK-24745][format][json] Improve the metadata tests and documentation of OGG json format No new revisions were added by this update. Summary of changes: .../docs/connectors/table/formats/ogg.md | 256 +++ .../docs/connectors/table/formats/orc.md | 2 +- .../docs/connectors/table/formats/overview.md | 5 + .../docs/connectors/table/formats/parquet.md | 2 +- .../docs/connectors/table/formats/raw.md | 2 +- docs/content/docs/connectors/table/formats/ogg.md | 270 docs/content/docs/connectors/table/formats/orc.md | 2 +- .../docs/connectors/table/formats/overview.md | 5 + .../docs/connectors/table/formats/parquet.md | 4 +- docs/content/docs/connectors/table/formats/raw.md | 2 +- docs/data/sql_connectors.yml | 6 + .../formats/json/ogg/OggJsonDecodingFormat.java| 207 .../json/ogg/OggJsonDeserializationSchema.java | 273 + .../formats/json/ogg/OggJsonFormatFactory.java | 148 +++ .../formats/json/ogg/OggJsonFormatOptions.java | 41 .../json/ogg/OggJsonSerializationSchema.java | 129 ++ .../org.apache.flink.table.factories.Factory | 1 + .../formats/json/ogg/OggJsonFileSystemITCase.java | 157 .../formats/json/ogg/OggJsonFormatFactoryTest.java | 143 +++ .../formats/json/ogg/OggJsonSerDeSchemaTest.java | 262 .../flink-json/src/test/resources/ogg-data.txt | 16 ++ 21 files changed, 1926 insertions(+), 7 deletions(-) create mode 100644 docs/content.zh/docs/connectors/table/formats/ogg.md create mode 100644 docs/content/docs/connectors/table/formats/ogg.md create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatOptions.java create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java create mode 100644 flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java create mode 100644 flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java create mode 100644 flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java create mode 100644 flink-formats/flink-json/src/test/resources/ogg-data.txt
[flink-kubernetes-operator] 17/23: Move CRD to flink.apache.org group
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit da6ae36c94f034224e35a780ea8c04c5a5688ede Author: Marton Balassi AuthorDate: Thu Feb 10 11:57:27 2022 +0100 Move CRD to flink.apache.org group --- .gitignore | 2 +- examples/basic-checkpoint-ha.yaml | 2 +- examples/basic-ingress.yaml| 2 +- examples/basic-session.yaml| 2 +- examples/basic.yaml| 2 +- examples/pod-template.yaml | 2 +- flink-kubernetes-operator/pom.xml | 25 ++ .../kubernetes/operator/crd/FlinkDeployment.java | 2 +- helm/flink-operator/templates/rbac.yaml| 2 +- helm/flink-operator/templates/webhook.yaml | 2 +- pom.xml| 279 ++--- 11 files changed, 161 insertions(+), 161 deletions(-) diff --git a/.gitignore b/.gitignore index 7bc2df1..83fe873 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,4 @@ buildNumber.properties .idea *.iml -helm/flink-operator/templates/flinkdeployments.flink.io-v1.yml +helm/flink-operator/templates/flinkdeployments.flink.apache.org-v1.yml diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml index 910376f..57f321b 100644 --- a/examples/basic-checkpoint-ha.yaml +++ b/examples/basic-checkpoint-ha.yaml @@ -16,7 +16,7 @@ # limitations under the License. -apiVersion: flink.io/v1alpha1 +apiVersion: flink.apache.org/v1alpha1 kind: FlinkDeployment metadata: namespace: default diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml index cb9d656..c0ad3a2 100644 --- a/examples/basic-ingress.yaml +++ b/examples/basic-ingress.yaml @@ -16,7 +16,7 @@ # limitations under the License. -apiVersion: flink.io/v1alpha1 +apiVersion: flink.apache.org/v1alpha1 kind: FlinkDeployment metadata: namespace: default diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml index 6db65e5..0dea5d5 100644 --- a/examples/basic-session.yaml +++ b/examples/basic-session.yaml @@ -16,7 +16,7 @@ # limitations under the License. -apiVersion: flink.io/v1alpha1 +apiVersion: flink.apache.org/v1alpha1 kind: FlinkDeployment metadata: namespace: default diff --git a/examples/basic.yaml b/examples/basic.yaml index 9027db7..affe78a 100644 --- a/examples/basic.yaml +++ b/examples/basic.yaml @@ -16,7 +16,7 @@ # limitations under the License. -apiVersion: flink.io/v1alpha1 +apiVersion: flink.apache.org/v1alpha1 kind: FlinkDeployment metadata: namespace: default diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml index 386813c..e234f38 100644 --- a/examples/pod-template.yaml +++ b/examples/pod-template.yaml @@ -16,7 +16,7 @@ # limitations under the License. -apiVersion: flink.io/v1alpha1 +apiVersion: flink.apache.org/v1alpha1 kind: FlinkDeployment metadata: namespace: default diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml index 23516c2..2779349 100644 --- a/flink-kubernetes-operator/pom.xml +++ b/flink-kubernetes-operator/pom.xml @@ -94,6 +94,31 @@ under the License. +maven-resources-plugin +${maven-resources-plugin.version} + + +copy-resources +process-classes + +copy-resources + + + ${project.basedir}/../helm/flink-operator/templates + + + ${project.build.outputDirectory}/META-INF/fabric8 + + flinkdeployments.flink.apache.org-v1.yml + +false + + + + + + + org.apache.maven.plugins maven-shade-plugin diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
[flink-kubernetes-operator] 13/23: Introduce FlinkService for cluster interactions
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 5ea10e385ed3a9b16e60cef3d012f4c8ee5a008c Author: Gyula Fora AuthorDate: Mon Feb 7 14:43:04 2022 +0100 Introduce FlinkService for cluster interactions * Extract FlinkService for cluster interactions * Update some dependencies --- pom.xml| 8 +- .../flink/kubernetes/operator/FlinkOperator.java | 20 ++- .../controller/FlinkDeploymentController.java | 22 +-- .../observer/JobStatusObserver.java| 17 ++- .../{controller => }/reconciler/JobReconciler.java | 64 ++--- .../reconciler/SessionReconciler.java | 34 ++--- .../kubernetes/operator/service/FlinkService.java | 155 + .../kubernetes/operator/utils/FlinkUtils.java | 41 -- 8 files changed, 222 insertions(+), 139 deletions(-) diff --git a/pom.xml b/pom.xml index ee19b02..4838678 100644 --- a/pom.xml +++ b/pom.xml @@ -32,14 +32,14 @@ under the License. 3.0.0-M4 2.0.1 -5.11.2 -1.18.10 +5.12.1 +1.18.22 2.12 1.14.3 1.7.15 -2.17.0 +2.17.1 2.4.2 4.1.0 @@ -311,7 +311,7 @@ under the License. **/target/** apache-maven-3.2.5/** - **/.idea/** + **/.idea/** diff --git a/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index a930d09..3a52f82 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -18,9 +18,12 @@ package org.apache.flink.kubernetes.operator; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; +import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; +import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; +import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; +import org.apache.flink.kubernetes.operator.service.FlinkService; import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService; @@ -41,7 +44,7 @@ public class FlinkOperator { LOG.info("Starting Flink Kubernetes Operator"); -KubernetesClient client = new DefaultKubernetesClient(); +DefaultKubernetesClient client = new DefaultKubernetesClient(); String namespace = client.getNamespace(); if (namespace == null) { namespace = "default"; @@ -51,7 +54,18 @@ public class FlinkOperator { client, new ConfigurationServiceOverrider(DefaultConfigurationService.instance()) .build()); -operator.register(new FlinkDeploymentController(client, namespace)); + +FlinkService flinkService = new FlinkService(client); + +JobStatusObserver observer = new JobStatusObserver(flinkService); +JobReconciler jobReconciler = new JobReconciler(client, flinkService); +SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService); + +FlinkDeploymentController controller = +new FlinkDeploymentController( +client, namespace, observer, jobReconciler, sessionReconciler); + +operator.register(controller); operator.installShutdownHook(); operator.start(); diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 94d5b4d..5ab08e5 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -18,10 +18,10 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.controller.observer.JobStatusObserver; -import org.apache.flink.kubernetes.operator.controller.reconciler.JobReconciler; -import
[flink-kubernetes-operator] 10/23: Adding basic integration test
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit bfdd412976f21380a9a1f4e443ed2baf7ad7a5e7 Author: Matyas Orhidi AuthorDate: Fri Feb 4 13:41:47 2022 +0100 Adding basic integration test IT tests are disabled by default. To enable them use: mvn clean install -Dit.skip=false --- deploy/rbac.yaml | 7 +- pom.xml| 29 .../controller/reconciler/SessionReconciler.java | 2 +- .../kubernetes/operator/FlinkOperatorITCase.java | 164 + 4 files changed, 197 insertions(+), 5 deletions(-) diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 52d0399..b273b8b 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -22,12 +22,12 @@ apiVersion: v1 kind: ServiceAccount metadata: name: flink-operator + namespace: default --- -apiVersion: v1 -kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole metadata: name: flink-operator rules: @@ -80,9 +80,8 @@ rules: --- -apiVersion: v1 -kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding metadata: name: flink-operator-cluster-role-binding subjects: diff --git a/pom.xml b/pom.xml index 5b3caa3..ee19b02 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,7 @@ under the License. 1.8 3.3.0 3.0.0-M4 +3.0.0-M4 2.0.1 5.11.2 @@ -41,6 +42,8 @@ under the License. 2.17.0 2.4.2 +4.1.0 +true @@ -71,6 +74,13 @@ under the License. +org.awaitility +awaitility +${awaitility.version} +test + + + org.projectlombok lombok ${lombok.version} @@ -163,6 +173,25 @@ under the License. +org.apache.maven.plugins +maven-failsafe-plugin +${maven-failsafe-plugin.version} + +${it.skip} + +**/*ITCase.* + + + + + +integration-test + + + + + + org.apache.maven.plugins maven-checkstyle-plugin 2.17 diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java index 32e36fe..04a0568 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java @@ -54,7 +54,7 @@ public class SessionReconciler { KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient); return true; } catch (Exception e) { -LOG.error("Error while deploying " + flinkApp.getMetadata().getName()); +LOG.error("Error while deploying " + flinkApp.getMetadata().getName(), e); return false; } } diff --git a/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java b/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java new file mode 100644 index 000..a2f04a2 --- /dev/null +++ b/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import
[flink-kubernetes-operator] 05/23: Extract Observer and Reconciler logic from controller
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 0fe31c540d9181d4edab61650f7c068f94169166 Author: Gyula Fora AuthorDate: Wed Feb 2 14:34:01 2022 +0100 Extract Observer and Reconciler logic from controller --- examples/basic-checkpoint-ha.yaml | 43 +++ examples/basic.yaml| 4 +- examples/pod-template.yaml | 4 +- .../controller/FlinkDeploymentController.java | 321 ++--- .../controller/observer/JobStatusObserver.java | 98 +++ .../controller/reconciler/JobReconciler.java | 184 .../controller/reconciler/SessionReconciler.java | 75 + .../kubernetes/operator/utils/FlinkUtils.java | 55 +++- 8 files changed, 476 insertions(+), 308 deletions(-) diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml new file mode 100644 index 000..bb3cf41 --- /dev/null +++ b/examples/basic-checkpoint-ha.yaml @@ -0,0 +1,43 @@ +apiVersion: flink.io/v1alpha1 +kind: FlinkDeployment +metadata: + namespace: default + name: basic-checkpoint-ha-example +spec: + image: flink:1.14.3 + flinkVersion: 1.14.3 + flinkConfiguration: +taskmanager.numberOfTaskSlots: "2" +state.savepoints.dir: file:///flink-data/savepoints +high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability.storageDir: file:///flink-data/ha + jobManager: +replicas: 1 +resource: + memory: "2048m" + cpu: 1 + taskManager: +taskSlots: 2 +resource: + memory: "2048m" + cpu: 1 + podTemplate: +spec: + serviceAccount: flink-operator + containers: +- name: flink-main-container + volumeMounts: + - mountPath: /flink-data +name: flink-volume + volumes: + - name: flink-volume +hostPath: + # directory location on host + path: /tmp/flink + # this field is optional + type: Directory + job: +jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar +parallelism: 2 +upgradeMode: savepoint +state: running diff --git a/examples/basic.yaml b/examples/basic.yaml index 9fdc289..38a5a90 100644 --- a/examples/basic.yaml +++ b/examples/basic.yaml @@ -9,7 +9,6 @@ spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" kubernetes.jobmanager.service-account: flink-operator -kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" jobManager: replicas: 1 resource: @@ -23,5 +22,4 @@ spec: job: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 -cancelMode: none -restoreMode: none +upgradeMode: stateless diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml index 1ce72cd..8563630 100644 --- a/examples/pod-template.yaml +++ b/examples/pod-template.yaml @@ -8,13 +8,13 @@ spec: flinkVersion: 1.14.3 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" -kubernetes.jobmanager.service-account: flink-operator podTemplate: apiVersion: v1 kind: Pod metadata: name: pod-template spec: + serviceAccount: flink-operator containers: # Do not change the main container name - name: flink-main-container @@ -55,5 +55,3 @@ spec: job: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 -cancelMode: none -restoreMode: none diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 9a64f04..9cee674 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -1,28 +1,13 @@ package org.apache.flink.kubernetes.operator.controller; -import org.apache.flink.api.common.JobID; -import org.apache.flink.client.cli.ApplicationDeployer; -import org.apache.flink.client.deployment.ClusterClientFactory; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.ClusterDescriptor; -import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; -import org.apache.flink.client.deployment.application.ApplicationConfiguration; -import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer; -import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; -import
[flink-kubernetes-operator] 22/23: Rework tests to avoid using mockito
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit ce0ba97c5bb0b49f0ab52dfde72e4ad248c3487b Author: Gyula Fora AuthorDate: Tue Feb 15 14:33:30 2022 +0100 Rework tests to avoid using mockito --- .../kubernetes/operator/TestingFlinkService.java | 100 + .../operator/observer/JobStatusObserverTest.java | 56 .../operator/reconciler/JobReconcilerTest.java | 61 + pom.xml| 10 --- 4 files changed, 141 insertions(+), 86 deletions(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java new file mode 100644 index 000..9a915bc --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Flink service mock for tests. */ +public class TestingFlinkService extends FlinkService { + +public static final String SAVEPOINT = "savepoint"; + +private List> jobs = new ArrayList<>(); +private Set sessions = new HashSet<>(); + +public TestingFlinkService() { +super(null); +} + +public void clear() { +jobs.clear(); +sessions.clear(); +} + +@Override +public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf) { +JobID jobID = new JobID(); +JobStatusMessage jobStatusMessage = +new JobStatusMessage( +jobID, +deployment.getMetadata().getName(), +JobStatus.RUNNING, +System.currentTimeMillis()); + +jobs.add(Tuple2.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH), jobStatusMessage)); +} + +@Override +public void submitSessionCluster(FlinkDeployment deployment, Configuration conf) { +sessions.add(deployment.getMetadata().getName()); +} + +@Override +public List listJobs(Configuration conf) { +return jobs.stream().map(t -> t.f1).collect(Collectors.toList()); +} + +public List> listJobs() { +return new ArrayList<>(jobs); +} + +@Override +public Optional cancelJob(JobID jobID, UpgradeMode upgradeMode, Configuration conf) +throws Exception { + +if (!jobs.removeIf(js -> js.f1.getJobId().equals(jobID))) { +throw new Exception("Job not found"); +} + +if (upgradeMode != UpgradeMode.STATELESS) { +return Optional.of(SAVEPOINT); +} else { +return Optional.empty(); +} +} + +@Override +public void stopSessionCluster(FlinkDeployment deployment, Configuration conf) { +sessions.remove(deployment.getMetadata().getName()); +} +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java index 37da2f9..e2a1dfd 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java +++
[GitHub] [flink-kubernetes-operator] asfgit closed pull request #1: [FLINK-26078] Kubernetes Operator Prototype
asfgit closed pull request #1: URL: https://github.com/apache/flink-kubernetes-operator/pull/1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink-kubernetes-operator] 08/23: Support initialSavepointPath + do not trigger upgrade on certain spec changes
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 8054752dd0662dc3529819dbce62089f1a30063a Author: Gyula Fora AuthorDate: Wed Feb 2 17:38:10 2022 +0100 Support initialSavepointPath + do not trigger upgrade on certain spec changes --- .../controller/observer/JobStatusObserver.java | 53 -- .../controller/reconciler/JobReconciler.java | 33 +++--- .../kubernetes/operator/crd/spec/JobSpec.java | 6 ++- .../operator/crd/status/FlinkDeploymentStatus.java | 4 +- .../kubernetes/operator/utils/FlinkUtils.java | 10 5 files changed, 51 insertions(+), 55 deletions(-) diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java index 9b87c59..e1e6504 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java @@ -32,10 +32,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** Observes the actual state of the running jobs on the Flink cluster. */ public class JobStatusObserver { @@ -65,12 +64,14 @@ public class JobStatusObserver { FlinkUtils.getRestClusterClient(effectiveConfig)) { Collection clusterJobStatuses = clusterClient.listJobs().get(10, TimeUnit.SECONDS); -flinkAppStatus.setJobStatuses( -mergeJobStatuses(flinkAppStatus.getJobStatuses(), clusterJobStatuses)); if (clusterJobStatuses.isEmpty()) { LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName()); return false; } else { +flinkAppStatus.setJobStatus( +mergeJobStatus( +flinkAppStatus.getJobStatus(), +new ArrayList<>(clusterJobStatuses))); LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName()); return true; } @@ -88,28 +89,30 @@ public class JobStatusObserver { } } -/** - * Merge previous job statuses with the new ones from the flink job cluster. We match jobs by - * their name to preserve savepoint information. - */ -private List mergeJobStatuses( -List oldJobStatuses, Collection clusterJobStatuses) { -List newStatuses = -oldJobStatuses != null ? new ArrayList<>(oldJobStatuses) : new ArrayList<>(); -Map statusMap = - newStatuses.stream().collect(Collectors.toMap(JobStatus::getJobName, j -> j)); +/** Merge previous job status with the new one from the flink job cluster. */ +private JobStatus mergeJobStatus( +JobStatus oldStatus, List clusterJobStatuses) { +JobStatus newStatus = oldStatus; +Collections.sort( +clusterJobStatuses, +(j1, j2) -> -1 * Long.compare(j1.getStartTime(), j1.getStartTime())); +JobStatusMessage newJob = clusterJobStatuses.get(0); -clusterJobStatuses.forEach( -js -> { -if (statusMap.containsKey(js.getJobName())) { -JobStatus oldStatus = statusMap.get(js.getJobName()); -oldStatus.setState(js.getJobState().name()); -oldStatus.setJobId(js.getJobId().toHexString()); -} else { -newStatuses.add(FlinkUtils.convert(js)); -} -}); +if (newStatus == null) { +newStatus = createJobStatus(newJob); +} else { +newStatus.setState(newJob.getJobState().name()); +newStatus.setJobName(newJob.getJobName()); +newStatus.setJobId(newJob.getJobId().toHexString()); +} +return newStatus; +} -return newStatuses; +public static JobStatus createJobStatus(JobStatusMessage message) { +JobStatus jobStatus = new JobStatus(); +jobStatus.setJobId(message.getJobId().toHexString()); +jobStatus.setJobName(message.getJobName()); +jobStatus.setState(message.getJobState().name()); +return jobStatus; } } diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java index aa86a00..2ae431d 100644 ---
[flink-kubernetes-operator] 16/23: Make project modular + validating webhook prototype
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 45547d079b0f692cfe2c09bf7d1bbc5b664a5b29 Author: Gyula Fora AuthorDate: Thu Feb 10 07:13:15 2022 +0100 Make project modular + validating webhook prototype --- .gitignore | 1 - Dockerfile | 26 +++- README.md | 10 +- .../values.yaml => docker-entrypoint.sh| 39 +++--- flink-kubernetes-operator/pom.xml | 132 ++ .../flink/kubernetes/operator/FlinkOperator.java | 0 .../controller/FlinkDeploymentController.java | 0 .../kubernetes/operator/crd/FlinkDeployment.java | 0 .../operator/crd/FlinkDeploymentList.java | 0 .../kubernetes/operator/crd/spec/CancelMode.java | 0 .../operator/crd/spec/FlinkDeploymentSpec.java | 0 .../operator/crd/spec/JobManagerSpec.java | 0 .../kubernetes/operator/crd/spec/JobSpec.java | 0 .../kubernetes/operator/crd/spec/JobState.java | 0 .../kubernetes/operator/crd/spec/Resource.java | 0 .../operator/crd/spec/TaskManagerSpec.java | 0 .../kubernetes/operator/crd/spec/UpgradeMode.java | 0 .../operator/crd/status/FlinkDeploymentStatus.java | 0 .../kubernetes/operator/crd/status/JobStatus.java | 0 .../operator/observer/JobStatusObserver.java | 0 .../operator/reconciler/JobReconciler.java | 2 +- .../operator/reconciler/SessionReconciler.java | 0 .../kubernetes/operator/service/FlinkService.java | 0 .../kubernetes/operator/utils/FlinkUtils.java | 0 .../kubernetes/operator/utils/KubernetesUtils.java | 0 .../src}/main/resources/log4j2.properties | 2 +- .../kubernetes/operator/FlinkOperatorITCase.java | 0 .../controller/observer/JobStatusObserverTest.java | 2 +- .../kubernetes/operator/utils/FlinkUtilsTest.java | 0 flink-kubernetes-webhook/pom.xml | 95 + .../operator/admission/AdmissionHandler.java | 124 + .../admission/FlinkDeploymentValidator.java| 53 +++ .../operator/admission/FlinkOperatorWebhook.java | 152 + .../admissioncontroller/AdmissionController.java | 53 +++ .../AdmissionControllerException.java | 44 ++ .../admissioncontroller/AdmissionUtils.java| 66 + .../admissioncontroller/NotAllowedException.java | 90 .../admission/admissioncontroller/Operation.java | 13 +- .../admissioncontroller/RequestHandler.java| 15 +- .../admissioncontroller/clone/Cloner.java | 18 +-- .../clone/ObjectMapperCloner.java | 40 +++--- .../mutation/DefaultRequestMutator.java| 63 + .../admissioncontroller/mutation/Mutator.java | 20 ++- .../validation/DefaultRequestValidator.java| 59 .../admissioncontroller/validation/Validator.java | 20 ++- .../src}/main/resources/log4j2.properties | 0 helm/flink-operator/templates/flink-operator.yaml | 37 + helm/flink-operator/templates/webhook.yaml | 103 ++ helm/flink-operator/values.yaml| 9 +- pom.xml| 117 +--- 50 files changed, 1225 insertions(+), 180 deletions(-) diff --git a/.gitignore b/.gitignore index c99feeb..7bc2df1 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,3 @@ buildNumber.properties *.iml helm/flink-operator/templates/flinkdeployments.flink.io-v1.yml - diff --git a/Dockerfile b/Dockerfile index 4df54f9..5209db5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,16 +19,32 @@ FROM maven:3.8.4-openjdk-11 AS build WORKDIR /app +ENV OPERATOR_DIR=flink-kubernetes-operator +ENV WEBHOOK_DIR=flink-kubernetes-webhook + +RUN mkdir $OPERATOR_DIR $WEBHOOK_DIR + COPY pom.xml . -RUN --mount=type=cache,target=/root/.m2 mvn dependency:resolve +COPY $WEBHOOK_DIR/pom.xml ./$WEBHOOK_DIR/ +COPY $OPERATOR_DIR/pom.xml ./$OPERATOR_DIR/ + +COPY $OPERATOR_DIR/src ./$OPERATOR_DIR/src +COPY $WEBHOOK_DIR/src ./$WEBHOOK_DIR/src -COPY src ./src COPY tools ./tools -RUN --mount=type=cache,target=/root/.m2 mvn -f ./pom.xml clean install + +RUN --mount=type=cache,target=/root/.m2 mvn clean install # stage FROM openjdk:11-jre -COPY --from=build /app/target/flink-operator-1.0-SNAPSHOT.jar / +ENV OPERATOR_VERSION=1.0-SNAPSHOT +ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar +ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar + +COPY --from=build /app/flink-kubernetes-operator/target/$OPERATOR_JAR / +COPY --from=build /app/flink-kubernetes-webhook/target/$WEBHOOK_JAR / -CMD ["java", "-jar", "/flink-operator-1.0-SNAPSHOT.jar"] +COPY
[flink-kubernetes-operator] 21/23: Minor cleanups and fixes
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 0b48ce04486475a7485ea2f2fc917db60081c5ee Author: Gyula Fora AuthorDate: Tue Feb 15 08:56:58 2022 +0100 Minor cleanups and fixes --- README.md | 6 +++--- examples/basic-checkpoint-ha.yaml | 1 - examples/basic-ingress.yaml | 1 - examples/basic-session.yaml | 1 - examples/basic.yaml | 1 - examples/pod-template.yaml| 2 -- flink-kubernetes-operator/pom.xml | 7 --- .../org/apache/flink/kubernetes/operator/FlinkOperator.java | 10 +- pom.xml | 11 +-- 9 files changed, 9 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 6aa1ce7..8871f98 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ The webhook can be disabled during helm install by passing the `--set webhook.cr ## User Guide ### Create a new Flink deployment -The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR it applied. +The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR is applied. ``` kubectl create -f examples/basic.yaml ``` @@ -37,7 +37,7 @@ Get all the Flink deployments running in the K8s cluster ``` kubectl get flinkdep ``` -Describe a specific Flink deployment to show the status(including job status, savepoint, ect.) +Describe a specific Flink deployment to show the status(including job status, savepoint, etc.) ``` kubectl describe flinkdep {dep_name} ``` @@ -61,7 +61,7 @@ NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) basic-session-exampleClusterIP None 6123/TCP,6124/TCP 14h basic-session-example-rest LoadBalancer 10.96.36.250 127.0.0.1 8081:30572/TCP 14h ``` -The operator pics up the default log and flink configurations from `/opt/flink/conf`. You can put the rest configuration parameters here: +The operator picks up the default log and flink configurations from `/opt/flink/conf`. You can put the rest configuration parameters here: ``` cat /opt/flink/conf/flink-conf.yaml rest.port: 8081 diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml index 57f321b..d8172b4 100644 --- a/examples/basic-checkpoint-ha.yaml +++ b/examples/basic-checkpoint-ha.yaml @@ -25,7 +25,6 @@ spec: image: flink:1.14.3 flinkVersion: 1.14.3 flinkConfiguration: -taskmanager.numberOfTaskSlots: "2" state.savepoints.dir: file:///flink-data/savepoints high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: file:///flink-data/ha diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml index d846766..af058ec 100644 --- a/examples/basic-ingress.yaml +++ b/examples/basic-ingress.yaml @@ -28,7 +28,6 @@ spec: flinkConfiguration: #rest.address: basic-example.flink.k8s.io #rest.port: "80" -taskmanager.numberOfTaskSlots: "2" kubernetes.jobmanager.service-account: flink-operator jobManager: replicas: 1 diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml index 0dea5d5..f956100 100644 --- a/examples/basic-session.yaml +++ b/examples/basic-session.yaml @@ -25,7 +25,6 @@ spec: image: flink:1.14.3 flinkVersion: 1.14.3 flinkConfiguration: -taskmanager.numberOfTaskSlots: "2" kubernetes.jobmanager.service-account: flink-operator jobManager: replicas: 1 diff --git a/examples/basic.yaml b/examples/basic.yaml index affe78a..ce1983c 100644 --- a/examples/basic.yaml +++ b/examples/basic.yaml @@ -25,7 +25,6 @@ spec: image: flink:1.14.3 flinkVersion: 1.14.3 flinkConfiguration: -taskmanager.numberOfTaskSlots: "2" kubernetes.jobmanager.service-account: flink-operator jobManager: replicas: 1 diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml index e234f38..4807639 100644 --- a/examples/pod-template.yaml +++ b/examples/pod-template.yaml @@ -24,8 +24,6 @@ metadata: spec: image: flink:1.14.3 flinkVersion: 1.14.3 - flinkConfiguration: -taskmanager.numberOfTaskSlots: "2" podTemplate: apiVersion: v1 kind: Pod diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml index 2779349..45845d2 100644 --- a/flink-kubernetes-operator/pom.xml +++ b/flink-kubernetes-operator/pom.xml @@ -33,7 +33,6 @@ under the License. 4.1.0 -1.19 @@ -56,12 +55,6 @@ under the License. provided - -
[flink-kubernetes-operator] 18/23: Add job reconciler test
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 5fae3534b66fbc428f982389b9ead6ed137f6db2 Author: Gyula Fora AuthorDate: Thu Feb 10 15:29:35 2022 +0100 Add job reconciler test --- .../flink/kubernetes/operator/TestUtils.java | 90 +++ .../observer/JobStatusObserverTest.java| 54 ++- .../operator/reconciler/JobReconcilerTest.java | 100 + 3 files changed, 195 insertions(+), 49 deletions(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java new file mode 100644 index 000..4ef6860 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator; + +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.spec.Resource; +import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; + +import java.util.Collections; + +/** Testing utilities. */ +public class TestUtils { + +private static final ObjectMapper objectMapper = new ObjectMapper(); + +public static final String TEST_NAMESPACE = "flink-operator-test"; +public static final String SERVICE_ACCOUNT = "flink-operator"; +public static final String FLINK_VERSION = "latest"; +public static final String IMAGE = String.format("flink:%s", FLINK_VERSION); + +public static FlinkDeployment buildSessionCluster() { +FlinkDeployment deployment = new FlinkDeployment(); +deployment.setMetadata( +new ObjectMetaBuilder() +.withName("test-cluster") +.withNamespace(TEST_NAMESPACE) +.build()); +deployment.setSpec( +FlinkDeploymentSpec.builder() +.image(IMAGE) +.flinkVersion(FLINK_VERSION) +.flinkConfiguration( +Collections.singletonMap( + KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(), +SERVICE_ACCOUNT)) +.jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null)) +.taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null)) +.build()); +return deployment; +} + +public static FlinkDeployment buildApplicationCluster() { +FlinkDeployment deployment = buildSessionCluster(); +deployment +.getSpec() +.setJob( +JobSpec.builder() +.jarURI("local:///tmp/sample.jar") +.state(JobState.RUNNING) +.build()); +return deployment; +} + +public static T clone(T object) { +if (object == null) { +return null; +} +try { +return (T) +objectMapper.readValue( +objectMapper.writeValueAsString(object), object.getClass()); +} catch (JsonProcessingException e) { +throw new IllegalStateException(e); +} +} +} diff --git
[flink-kubernetes-operator] 23/23: Fix webhook helm chart
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 5824296b8cd6578b5b0fa34312da69d4141ba0e1 Author: Gyula Fora AuthorDate: Tue Feb 15 15:41:58 2022 +0100 Fix webhook helm chart Closes #1 --- helm/flink-operator/templates/webhook.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/helm/flink-operator/templates/webhook.yaml b/helm/flink-operator/templates/webhook.yaml index 95f9403..8fd8a94 100644 --- a/helm/flink-operator/templates/webhook.yaml +++ b/helm/flink-operator/templates/webhook.yaml @@ -48,8 +48,8 @@ metadata: namespace: {{ .Values.operatorNamespace.name }} spec: dnsNames: - - flink-operator-webhook-service.default.svc - - flink-operator-webhook-service.default.svc.cluster.local + - flink-operator-webhook-service.{{ .Values.operatorNamespace.name }}.svc + - flink-operator-webhook-service.{{ .Values.operatorNamespace.name }}.svc.cluster.local keystores: pkcs12: create: true @@ -80,7 +80,7 @@ apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata: annotations: -cert-manager.io/inject-ca-from: default/flink-operator-serving-cert +cert-manager.io/inject-ca-from: {{ .Values.operatorNamespace.name }}/flink-operator-serving-cert name: flink-operator-validating-webhook-configuration webhooks: - name: vflinkdeployments.flink.apache.org
[flink-kubernetes-operator] 14/23: Adding JobStatusObserverTest
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit a8cb3814189c7dcf2848d4f8dccafd7bc449cf7d Author: Matyas Orhidi AuthorDate: Mon Feb 7 16:42:22 2022 +0100 Adding JobStatusObserverTest --- pom.xml| 9 ++ .../operator/crd/spec/FlinkDeploymentSpec.java | 4 + .../operator/crd/spec/JobManagerSpec.java | 2 + .../kubernetes/operator/crd/spec/JobSpec.java | 4 + .../kubernetes/operator/crd/spec/Resource.java | 2 + .../operator/crd/spec/TaskManagerSpec.java | 4 +- .../operator/crd/status/FlinkDeploymentStatus.java | 2 + .../kubernetes/operator/crd/status/JobStatus.java | 4 + .../controller/observer/JobStatusObserverTest.java | 140 + 9 files changed, 170 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4838678..709e48d 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ under the License. 2.4.2 4.1.0 true +2.21.0 @@ -81,6 +82,14 @@ under the License. +org.mockito +mockito-core +${mockito.version} +jar +test + + + org.projectlombok lombok ${lombok.version} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java index cacc430..8abaf5c 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java @@ -18,6 +18,8 @@ package org.apache.flink.kubernetes.operator.crd.spec; import io.fabric8.kubernetes.api.model.Pod; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @@ -26,6 +28,8 @@ import java.util.Map; /** Spec that describes a Flink application deployment. */ @Data @NoArgsConstructor +@AllArgsConstructor +@Builder public class FlinkDeploymentSpec { private String image; private String imagePullPolicy; diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java index 09404ba..0f81b7e 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java @@ -18,12 +18,14 @@ package org.apache.flink.kubernetes.operator.crd.spec; import io.fabric8.kubernetes.api.model.Pod; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** JobManager spec. */ @Data @NoArgsConstructor +@AllArgsConstructor public class JobManagerSpec { private Resource resource; private int replicas; diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java index bf7e0e6..062a5c0 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.crd.spec; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; @@ -24,6 +26,8 @@ import lombok.NoArgsConstructor; /** Flink job spec. */ @Data @NoArgsConstructor +@AllArgsConstructor +@Builder public class JobSpec { private String jarURI; private int parallelism; diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java index e0e73e1..c86f9a5 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java @@ -17,12 +17,14 @@ package org.apache.flink.kubernetes.operator.crd.spec; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** Resource spec. */ @Data @NoArgsConstructor +@AllArgsConstructor public class Resource { private double cpu; // 1024m, 1g diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java index f6bd361..8b25127 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java @@ -18,14 +18,16 @@ package
[flink-kubernetes-operator] 11/23: Support pod template merging
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 5bf7973aee353501fcd793e49e3e32fc989605ca Author: Thomas Weise AuthorDate: Fri Feb 4 09:05:48 2022 -0800 Support pod template merging --- .gitignore | 2 + .../kubernetes/operator/utils/FlinkUtils.java | 59 +++- .../kubernetes/operator/utils/FlinkUtilsTest.java | 65 ++ 3 files changed, 124 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index ceb6113..95cd8a3 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,5 @@ buildNumber.properties .mvn/timing.properties .idea +*.iml + diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index f53f284..18fd3c1 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -36,6 +36,10 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; import org.apache.flink.util.StringUtils; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; @@ -48,12 +52,14 @@ import java.io.IOException; import java.net.URI; import java.nio.file.Files; import java.util.Collections; +import java.util.Iterator; import java.util.concurrent.Executors; /** Flink Utility methods used by the operator. */ public class FlinkUtils { private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class); +private static final ObjectMapper MAPPER = new ObjectMapper(); public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) { String namespace = flinkApp.getMetadata().getNamespace(); @@ -118,7 +124,10 @@ public class FlinkUtils { if (spec.getJobManager().getPodTemplate() != null) { effectiveConfig.set( KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, - createTempFile(spec.getJobManager().getPodTemplate())); +createTempFile( +mergePodTemplates( +spec.getPodTemplate(), + spec.getJobManager().getPodTemplate(; } } @@ -141,7 +150,10 @@ public class FlinkUtils { if (spec.getTaskManager().getPodTemplate() != null) { effectiveConfig.set( KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, - createTempFile(spec.getTaskManager().getPodTemplate())); +createTempFile( +mergePodTemplates( +spec.getPodTemplate(), + spec.getTaskManager().getPodTemplate(; } } @@ -169,6 +181,49 @@ public class FlinkUtils { return tmp.getAbsolutePath(); } +public static Pod mergePodTemplates(Pod toPod, Pod fromPod) { +if (fromPod == null) { +return toPod; +} else if (toPod == null) { +return fromPod; +} +JsonNode node1 = MAPPER.valueToTree(toPod); +JsonNode node2 = MAPPER.valueToTree(fromPod); +mergeInto(node1, node2); +try { +return MAPPER.treeToValue(node1, Pod.class); +} catch (Exception ex) { +throw new RuntimeException(ex); +} +} + +private static void mergeInto(JsonNode toNode, JsonNode fromNode) { +Iterator fieldNames = fromNode.fieldNames(); +while (fieldNames.hasNext()) { +String fieldName = fieldNames.next(); +JsonNode toChildNode = toNode.get(fieldName); +JsonNode fromChildNode = fromNode.get(fieldName); + +if (toChildNode != null && toChildNode.isArray() && fromChildNode.isArray()) { +// TODO: does merging arrays even make sense or should it just override? +for (int i = 0; i < fromChildNode.size(); i++) { +JsonNode updatedChildNode = fromChildNode.get(i); +if (toChildNode.size() <= i) { +// append new node +((ArrayNode)
[flink-kubernetes-operator] 15/23: adding helm chart
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 411afb3cb2e6f9bd27bc2a1fb3e19ad16b84549b Author: Matyas Orhidi AuthorDate: Wed Feb 9 10:53:08 2022 +0100 adding helm chart --- .gitignore | 2 + Dockerfile | 7 +- README.md | 39 --- helm/flink-operator/.helmignore| 23 Dockerfile => helm/flink-operator/Chart.yaml | 22 ++-- helm/flink-operator/templates/_helpers.tpl | 79 ++ .../flink-operator/templates}/flink-operator.yaml | 54 ++ .../flink-operator/templates}/rbac.yaml| 117 ++--- .../flink-operator/templates/serviceaccount.yaml | 29 +++-- Dockerfile => helm/flink-operator/values.yaml | 29 +++-- pom.xml| 30 ++ 11 files changed, 283 insertions(+), 148 deletions(-) diff --git a/.gitignore b/.gitignore index 95cd8a3..c99feeb 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ buildNumber.properties .idea *.iml +helm/flink-operator/templates/flinkdeployments.flink.io-v1.yml + diff --git a/Dockerfile b/Dockerfile index c834e97..4df54f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,11 +19,12 @@ FROM maven:3.8.4-openjdk-11 AS build WORKDIR /app -COPY src ./src -COPY tools ./tools COPY pom.xml . +RUN --mount=type=cache,target=/root/.m2 mvn dependency:resolve -RUN mvn -f ./pom.xml clean install +COPY src ./src +COPY tools ./tools +RUN --mount=type=cache,target=/root/.m2 mvn -f ./pom.xml clean install # stage FROM openjdk:11-jre diff --git a/README.md b/README.md index 0f8de60..da958d4 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,22 @@ # flink-kubernetes-operator Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created an IPR. Check [FLIP-212](https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator) further info. -## How to Build -``` -mvn clean install -``` +## Installation -## How to Run -* Make Sure that `FlinkDeployment` Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply: -``` -kubectl create -f target/classes/META-INF/fabric8/flinkdeployments.flink.io-v1.yml -``` -* (Optional) Build Docker Image -``` -docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest -``` -* Start flink-operator deployment. A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services. +The operator is managed helm chart. To install run: ``` -kubectl create -f deploy/rbac.yaml -kubectl create -f deploy/flink-operator.yaml + cd helm/flink-operator + helm install flink-operator . ``` -* Create a new Flink deployment + +## User Guide +### Create a new Flink deployment The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR it applied. ``` kubectl create -f examples/basic.yaml ``` -* Delete a Flink deployment +### Delete a Flink deployment ``` kubectl delete -f create/basic.yaml @@ -35,17 +25,24 @@ OR kubectl delete flinkdep {dep_name} ``` -* Get/List Flink deployments +### Get/List Flink deployments Get all the Flink deployments running in the K8s cluster ``` kubectl get flinkdep ``` - Describe a specific Flink deployment to show the status(including job status, savepoint, ect.) ``` kubectl describe flinkdep {dep_name} ``` -## How to Debug +## Developer Guide + +### Building docker images +``` +docker build . -t /flink-java-operator:latest +docker push flink-java-operator:latest +helm install flink-operator . --set image.repository= --set image.tag=latest +``` +### Running the operator locally You can run or debug the `FlinkOperator` from your preferred IDE. The operator itself is accessing the deployed Flink clusters through the REST interface. When running locally the `rest.port` and `rest.address` Flink configuration parameters must be modified to a locally accessible value. When using `minikube tunnel` the rest service is exposed on `localhost:8081` diff --git a/helm/flink-operator/.helmignore b/helm/flink-operator/.helmignore new file mode 100644 index 000..0e8a0eb --- /dev/null +++ b/helm/flink-operator/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/Dockerfile b/helm/flink-operator/Chart.yaml similarity index 76%
[flink-kubernetes-operator] 20/23: provide operator log configuration via configmap
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit b6f369dd54b521fd68e3ce104bd8688a6aae50d0 Author: Matyas Orhidi AuthorDate: Fri Feb 11 16:58:17 2022 +0100 provide operator log configuration via configmap --- docker-entrypoint.sh | 5 +-- helm/flink-operator/templates/flink-operator.yaml | 41 +-- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index 94d8f51..813d174 100755 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -19,6 +19,7 @@ ### args=("$@") + if [ "$1" = "help" ]; then printf "Usage: $(basename "$0") (operator|webhook)\n" printf "Or $(basename "$0") help\n\n" @@ -26,11 +27,11 @@ if [ "$1" = "help" ]; then elif [ "$1" = "operator" ]; then echo "Starting Operator" -exec java -jar /$OPERATOR_JAR +exec java -jar $LOG_CONFIG /$OPERATOR_JAR elif [ "$1" = "webhook" ]; then echo "Starting Webhook" -exec java -jar /$WEBHOOK_JAR +exec java -jar $LOG_CONFIG /$WEBHOOK_JAR fi args=("${args[@]}") diff --git a/helm/flink-operator/templates/flink-operator.yaml b/helm/flink-operator/templates/flink-operator.yaml index 2123d57..0d1b414 100644 --- a/helm/flink-operator/templates/flink-operator.yaml +++ b/helm/flink-operator/templates/flink-operator.yaml @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - --- apiVersion: apps/v1 kind: Deployment @@ -47,8 +46,12 @@ spec: env: - name: FLINK_CONF_DIR value: /opt/flink/conf +- name: LOG_CONFIG + value: -Dlog4j.configurationFile=/opt/flink-operator/conf/log4j2.properties volumeMounts: -- name: flink-config-volume +- name: flink-operator-config-volume + mountPath: /opt/flink-operator/conf +- name: flink-default-config-volume mountPath: /opt/flink/conf {{- if .Values.webhook.create }} - name: flink-webhook @@ -73,16 +76,26 @@ spec: value: "pkcs12" - name: WEBHOOK_SERVER_PORT value: "9443" +- name: LOG_CONFIG + value: -Dlog4j.configurationFile=/opt/flink-operator/conf/log4j2.properties volumeMounts: - name: keystore mountPath: "/certs" readOnly: true + - name: flink-operator-config-volume +mountPath: /opt/flink-operator/conf {{- end }} volumes: -- name: flink-config-volume +- name: flink-operator-config-volume configMap: name: flink-operator-config items: + - key: log4j2.properties +path: log4j2.properties +- name: flink-default-config-volume + configMap: +name: flink-default-config +items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties @@ -105,6 +118,28 @@ metadata: labels: {{- include "flink-operator.labels" . | nindent 4 }} data: + log4j2.properties: |+ +rootLogger.level = DEBUG +rootLogger.appenderRef.console.ref = ConsoleAppender + +logger.spring-web.name= org.springframework.web.filter +logger.spring-web.level = DEBUG + +# Log all infos to the console +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] %msg%n%throwable} +--- + +apiVersion: v1 +kind: ConfigMap +metadata: + name: flink-default-config + namespace: {{ .Values.operatorNamespace.name }} + labels: +{{- include "flink-operator.labels" . | nindent 4 }} +data: flink-conf.yaml: |+ taskmanager.numberOfTaskSlots: 2 blob.server.port: 6124
[flink-kubernetes-operator] 19/23: moving to single ingress
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit b84af2592d7f9c61ccee33747d8550f1f118f493 Author: Matyas Orhidi AuthorDate: Thu Feb 10 15:35:47 2022 +0100 moving to single ingress --- examples/basic-ingress.yaml| 2 +- .../controller/FlinkDeploymentController.java | 16 ++- .../operator/reconciler/JobReconciler.java | 8 +- .../operator/reconciler/SessionReconciler.java | 8 +- .../kubernetes/operator/utils/IngressUtils.java| 112 .../kubernetes/operator/utils/KubernetesUtils.java | 114 - .../{values.yaml => templates/ingress.yaml}| 44 +++- helm/flink-operator/templates/rbac.yaml| 1 + helm/flink-operator/values.yaml| 3 + 9 files changed, 154 insertions(+), 154 deletions(-) diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml index c0ad3a2..d846766 100644 --- a/examples/basic-ingress.yaml +++ b/examples/basic-ingress.yaml @@ -20,7 +20,7 @@ apiVersion: flink.apache.org/v1alpha1 kind: FlinkDeployment metadata: namespace: default - name: basic-example + name: basic-ingress spec: image: flink:1.14.3 flinkVersion: 1.14.3 diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 5ab08e5..09eb536 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.kubernetes.operator.utils.IngressUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -77,6 +78,12 @@ public class FlinkDeploymentController public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName()); FlinkUtils.deleteCluster(flinkApp, kubernetesClient); +IngressUtils.updateIngressRules( +flinkApp, +FlinkUtils.getEffectiveConfig(flinkApp), +operatorNamespace, +kubernetesClient, +true); return DeleteControl.defaultDelete(); } @@ -89,7 +96,7 @@ public class FlinkDeploymentController boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig); if (success) { try { -success = reconcileFlinkDeployment(flinkApp, effectiveConfig); +success = reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig); } catch (Exception e) { throw new RuntimeException( "Error while reconciling deployment change for " @@ -109,10 +116,11 @@ public class FlinkDeploymentController } private boolean reconcileFlinkDeployment( -FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception { +String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) +throws Exception { return flinkApp.getSpec().getJob() == null -? sessionReconciler.reconcile(flinkApp, effectiveConfig) -: jobReconciler.reconcile(flinkApp, effectiveConfig); +? sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig) +: jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index 61468e4..564efc9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -26,7 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import
[flink-kubernetes-operator] 12/23: Minor Readme updates
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 2ed39cc3b28a330adf0335b5d91c419650d7ab03 Author: Marton Balassi AuthorDate: Mon Feb 7 12:47:26 2022 +0100 Minor Readme updates --- README.md | 10 +- deploy/flink-operator.yaml | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 19873e2..0f8de60 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ mvn clean install ## How to Run * Make Sure that `FlinkDeployment` Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply: ``` -k apply -f target/classes/META-INF/fabric8/flindeployments.flink.io-v1.yml +kubectl create -f target/classes/META-INF/fabric8/flinkdeployments.flink.io-v1.yml ``` * (Optional) Build Docker Image ``` @@ -17,18 +17,18 @@ docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest ``` * Start flink-operator deployment. A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services. ``` -kubectl apply -f deploy/rbac.yaml -kubectl apply -f deploy/flink-operator.yaml +kubectl create -f deploy/rbac.yaml +kubectl create -f deploy/flink-operator.yaml ``` * Create a new Flink deployment The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR it applied. ``` -kubectl apply -f deploy/basic.yaml +kubectl create -f examples/basic.yaml ``` * Delete a Flink deployment ``` -kubectl delete -f deploy/basic.yaml +kubectl delete -f create/basic.yaml OR diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml index ded236d..b7574b5 100644 --- a/deploy/flink-operator.yaml +++ b/deploy/flink-operator.yaml @@ -33,7 +33,7 @@ spec: serviceAccountName: flink-operator containers: - name: flink-operator -image: docker.apple.com/matyas_orhidi/flink-java-operator:latest +image: docker.apple.com/aiml/dpi-flink/flink-operator:1.0.7 imagePullPolicy: Always env: - name: FLINK_CONF_DIR
[flink-kubernetes-operator] 02/23: CRD alignment + first working flink deployment
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit f3d12d6535b93a4002ba30fdb00573276d191fad Author: Matyas Orhidi AuthorDate: Thu Jan 27 20:02:00 2022 +0100 CRD alignment + first working flink deployment --- README.md | 10 +- deploy/flink-operator.yaml | 82 +- deploy/rbac.yaml | 76 + examples/basic.yaml| 27 +++ examples/cr.yaml | 21 --- examples/pod-template.yaml | 49 ++ pom.xml| 2 +- ...sOperatorEntrypoint.java => FlinkOperator.java} | 8 +- .../kubernetes/operator/Utils/FlinkUtils.java | 181 - .../controller/FlinkApplicationController.java | 169 --- .../controller/FlinkDeploymentController.java | 123 ++ ...{FlinkApplication.java => FlinkDeployment.java} | 6 +- ...plicationList.java => FlinkDeploymentList.java} | 2 +- .../kubernetes/operator/crd/spec/CancelMode.java | 10 ++ .../operator/crd/spec/FlinkApplicationSpec.java| 30 .../operator/crd/spec/FlinkDeploymentSpec.java | 22 +++ .../operator/crd/spec/JobManagerSpec.java | 14 ++ .../kubernetes/operator/crd/spec/JobSpec.java | 16 ++ .../kubernetes/operator/crd/spec/Resource.java | 4 +- .../kubernetes/operator/crd/spec/RestoreMode.java | 12 ++ .../operator/crd/spec/TaskManagerSpec.java | 14 ++ ...ationStatus.java => FlinkDeploymentStatus.java} | 2 +- src/main/resources/log4j2.properties | 2 +- 23 files changed, 489 insertions(+), 393 deletions(-) diff --git a/README.md b/README.md index 845b97d..b0b58f9 100644 --- a/README.md +++ b/README.md @@ -7,17 +7,17 @@ mvn clean install ``` ## How to Run -* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. The CRD could be find [here](deploy/crd.yaml). If not, issue the following commands to apply: +* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply: ``` -kubectl apply -f deploy/crd.yaml +k apply -f target/classes/META-INF/fabric8/flinkapplications.flink.io-v1.yml ``` -* Build Docker Image +* (Optional) Build Docker Image ``` docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest ``` -* Start flink-operator deployment -A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services. +* Start flink-operator deployment. A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services. ``` +kubectl apply -f deploy/rbac.yaml kubectl apply -f deploy/flink-operator.yaml ``` * Create a new Flink application diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml index 564ed12..c2698c7 100644 --- a/deploy/flink-operator.yaml +++ b/deploy/flink-operator.yaml @@ -15,7 +15,7 @@ spec: serviceAccountName: flink-operator containers: - name: flink-operator -image: docker.apple.com/gyula_fora/flink-java-operator:latest +image: docker.apple.com/matyas_orhidi/flink-java-operator:latest imagePullPolicy: Always env: - name: FLINK_CONF_DIR @@ -26,7 +26,7 @@ spec: volumes: - name: flink-config-volume configMap: - name: flink-config + name: flink-operator-config items: - key: flink-conf.yaml path: flink-conf.yaml @@ -38,7 +38,7 @@ spec: apiVersion: v1 kind: ConfigMap metadata: - name: flink-config + name: flink-operator-config labels: app: flink data: @@ -97,79 +97,3 @@ data: # Suppress the irrelevant (wrong) warnings from the Netty channel handler logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF - - -apiVersion: v1 -kind: ServiceAccount -metadata: - name: flink-operator - - -apiVersion: v1 -kind: ClusterRole -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: flink-operator -rules: -- apiGroups: - - flink-operator - resources: - - "*" - verbs: - - "*" -- apiGroups: - - "" - resources: - - pods - - services - - endpoints - - persistentvolumeclaims - - events - - configmaps - - secrets - verbs: - - "*" -- apiGroups: - - apps - resources: - - deployments - - replicasets - verbs: - - "*" -- apiGroups: - - extensions - resources: - - deployments - - ingresses - verbs: - - "*" -- apiGroups: - - flink.io - resources: - - flinkapplications - verbs: - - "*" -- apiGroups: - - networking.k8s.io - resources: - -
[flink-kubernetes-operator] 07/23: Enable RAT plugin + add license headers
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 7d640f59d086226c9a88638afe878c9d1fe90c24 Author: Gyula Fora AuthorDate: Wed Feb 2 16:50:42 2022 +0100 Enable RAT plugin + add license headers --- Dockerfile | 18 + deploy/flink-operator.yaml | 18 + deploy/rbac.yaml | 18 + examples/basic-checkpoint-ha.yaml | 18 + examples/basic-ingress.yaml| 19 - examples/basic-session.yaml| 18 + examples/basic.yaml| 18 + examples/pod-template.yaml | 18 + pom.xml| 83 +- .../flink/kubernetes/operator/FlinkOperator.java | 17 + .../controller/FlinkDeploymentController.java | 17 + .../controller/observer/JobStatusObserver.java | 17 + .../controller/reconciler/JobReconciler.java | 17 + .../controller/reconciler/SessionReconciler.java | 17 + .../kubernetes/operator/crd/FlinkDeployment.java | 17 + .../operator/crd/FlinkDeploymentList.java | 17 + .../kubernetes/operator/crd/spec/CancelMode.java | 17 + .../operator/crd/spec/FlinkDeploymentSpec.java | 17 + .../operator/crd/spec/JobManagerSpec.java | 17 + .../kubernetes/operator/crd/spec/JobSpec.java | 17 + .../kubernetes/operator/crd/spec/JobState.java | 17 + .../kubernetes/operator/crd/spec/Resource.java | 17 + .../operator/crd/spec/TaskManagerSpec.java | 17 + .../kubernetes/operator/crd/spec/UpgradeMode.java | 17 + .../operator/crd/status/FlinkDeploymentStatus.java | 17 + .../kubernetes/operator/crd/status/JobStatus.java | 17 + .../kubernetes/operator/utils/FlinkUtils.java | 17 + .../kubernetes/operator/utils/KubernetesUtils.java | 17 + src/main/resources/log4j2.properties | 18 + 29 files changed, 567 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 89287a1..b4da7d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,3 +1,21 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + FROM openjdk:11-jre COPY target/flink-operator-1.0-SNAPSHOT.jar / diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml index bddf360..ded236d 100644 --- a/deploy/flink-operator.yaml +++ b/deploy/flink-operator.yaml @@ -1,3 +1,21 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + apiVersion: apps/v1 kind: Deployment metadata: diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 11640ca..52d0399 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -1,3 +1,21 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this
[flink-kubernetes-operator] 01/23: Initial project setup
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 7c555d2b4c627b1da3f0cdb33ef18e3070fa18b8 Author: Gyula Fora AuthorDate: Thu Jan 20 12:41:26 2022 +0100 Initial project setup --- .gitignore | 36 Dockerfile | 5 + LICENSE| 201 + README.md | 47 - deploy/flink-operator.yaml | 175 ++ examples/cr.yaml | 21 +++ pom.xml| 148 +++ .../operator/KubernetesOperatorEntrypoint.java | 42 + .../flink/kubernetes/operator/Utils/Constants.java | 11 ++ .../kubernetes/operator/Utils/FlinkUtils.java | 99 ++ .../kubernetes/operator/Utils/KubernetesUtils.java | 23 +++ .../controller/FlinkApplicationController.java | 169 + .../kubernetes/operator/crd/FlinkApplication.java | 17 ++ .../operator/crd/FlinkApplicationList.java | 6 + .../operator/crd/spec/FlinkApplicationSpec.java| 30 +++ .../kubernetes/operator/crd/spec/Resource.java | 12 ++ .../crd/status/FlinkApplicationStatus.java | 10 + .../kubernetes/operator/crd/status/JobStatus.java | 15 ++ src/main/resources/log4j2.properties | 8 + 19 files changed, 1074 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore new file mode 100644 index 000..ceb6113 --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +### Maven template +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties + +.idea diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000..89287a1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre + +COPY target/flink-operator-1.0-SNAPSHOT.jar / + +CMD ["java", "-jar", "/flink-operator-1.0-SNAPSHOT.jar"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 +http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work
[flink-kubernetes-operator] branch main updated (0737641 -> 5824296)
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git. from 0737641 Seed commit new 7c555d2 Initial project setup new f3d12d6 CRD alignment + first working flink deployment new 23408e4 Basic job lifecycle handling + Spotless new e48935c Session cluster + local debugging support new 0fe31c5 Extract Observer and Reconciler logic from controller new 26ca859 Adding Ingress support new 7d640f5 Enable RAT plugin + add license headers new 8054752 Support initialSavepointPath + do not trigger upgrade on certain spec changes new 5c6c128 Docker build improvements new bfdd412 Adding basic integration test new 5bf7973 Support pod template merging new 2ed39cc Minor Readme updates new 5ea10e3 Introduce FlinkService for cluster interactions new a8cb381 Adding JobStatusObserverTest new 411afb3 adding helm chart new 45547d0 Make project modular + validating webhook prototype new da6ae36 Move CRD to flink.apache.org group new 5fae353 Add job reconciler test new b84af25 moving to single ingress new b6f369d provide operator log configuration via configmap new 0b48ce0 Minor cleanups and fixes new ce0ba97 Rework tests to avoid using mockito new 5824296 Fix webhook helm chart The 23 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .gitignore | 39 ++ Dockerfile | 50 ++ LICENSE| 201 README.md | 69 ++- docker-entrypoint.sh | 40 ++ examples/basic-checkpoint-ha.yaml | 60 +++ examples/basic-ingress.yaml| 44 ++ examples/basic-session.yaml| 38 ++ examples/basic.yaml| 42 ++ examples/pod-template.yaml | 73 +++ flink-kubernetes-operator/pom.xml | 150 ++ .../flink/kubernetes/operator/FlinkOperator.java | 66 +++ .../controller/FlinkDeploymentController.java | 142 ++ .../kubernetes/operator/crd/FlinkDeployment.java | 38 ++ .../operator/crd/FlinkDeploymentList.java | 23 + .../kubernetes/operator/crd/spec/CancelMode.java | 28 + .../operator/crd/spec/FlinkDeploymentSpec.java | 44 ++ .../operator/crd/spec/JobManagerSpec.java | 33 ++ .../kubernetes/operator/crd/spec/JobSpec.java | 41 ++ .../kubernetes/operator/crd/spec/JobState.java | 28 + .../kubernetes/operator/crd/spec/Resource.java | 32 ++ .../operator/crd/spec/TaskManagerSpec.java | 33 ++ .../kubernetes/operator/crd/spec/UpgradeMode.java | 30 ++ .../operator/crd/status/FlinkDeploymentStatus.java | 33 ++ .../kubernetes/operator/crd/status/JobStatus.java | 36 ++ .../operator/observer/JobStatusObserver.java | 121 + .../operator/reconciler/JobReconciler.java | 170 +++ .../operator/reconciler/SessionReconciler.java | 78 +++ .../kubernetes/operator/service/FlinkService.java | 155 ++ .../kubernetes/operator/utils/FlinkUtils.java | 229 + .../kubernetes/operator/utils/IngressUtils.java| 112 .../src/main/resources/log4j2.properties | 26 + .../kubernetes/operator/FlinkOperatorITCase.java | 164 ++ .../flink/kubernetes/operator/TestUtils.java | 90 .../kubernetes/operator/TestingFlinkService.java | 100 .../operator/observer/JobStatusObserverTest.java | 74 +++ .../operator/reconciler/JobReconcilerTest.java | 87 .../kubernetes/operator/utils/FlinkUtilsTest.java | 65 +++ flink-kubernetes-webhook/pom.xml | 95 .../operator/admission/AdmissionHandler.java | 124 + .../admission/FlinkDeploymentValidator.java| 53 ++ .../operator/admission/FlinkOperatorWebhook.java | 152 ++ .../admissioncontroller/AdmissionController.java | 53 ++ .../AdmissionControllerException.java | 44 ++ .../admissioncontroller/AdmissionUtils.java| 66 +++ .../admissioncontroller/NotAllowedException.java | 90 .../admission/admissioncontroller/Operation.java | 26 + .../admissioncontroller/RequestHandler.java| 27 + .../admissioncontroller/clone/Cloner.java | 30 ++ .../clone/ObjectMapperCloner.java | 41 ++ .../mutation/DefaultRequestMutator.java| 63 +++ .../admissioncontroller/mutation/Mutator.java | 29 ++ .../validation/DefaultRequestValidator.java| 59 +++
[flink-kubernetes-operator] 06/23: Adding Ingress support
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 26ca85954dec35094331fea67eb79b6380154709 Author: Matyas Orhidi AuthorDate: Wed Feb 2 15:39:58 2022 +0100 Adding Ingress support --- examples/basic-ingress.yaml| 28 +++ .../controller/reconciler/JobReconciler.java | 2 + .../controller/reconciler/SessionReconciler.java | 2 + .../operator/crd/spec/FlinkDeploymentSpec.java | 1 + .../kubernetes/operator/utils/FlinkUtils.java | 6 ++ .../kubernetes/operator/utils/KubernetesUtils.java | 97 ++ 6 files changed, 136 insertions(+) diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml new file mode 100644 index 000..1a19c15 --- /dev/null +++ b/examples/basic-ingress.yaml @@ -0,0 +1,28 @@ +apiVersion: flink.io/v1alpha1 +kind: FlinkDeployment +metadata: + namespace: default + name: basic-example +spec: + image: flink:1.14.3 + flinkVersion: 1.14.3 + ingressDomain: flink.k8s.io + flinkConfiguration: +#rest.address: basic-example.flink.k8s.io +#rest.port: "80" +taskmanager.numberOfTaskSlots: "2" +kubernetes.jobmanager.service-account: flink-operator + jobManager: +replicas: 1 +resource: + memory: "2048m" + cpu: 1 + taskManager: +taskSlots: 2 +resource: + memory: "2048m" + cpu: 1 + job: +jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar +parallelism: 2 + diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java index 6c9ea2b..baa97f9 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java @@ -14,6 +14,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.kubernetes.operator.utils.KubernetesUtils; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import io.fabric8.kubernetes.client.KubernetesClient; @@ -47,6 +48,7 @@ public class JobReconciler { } try { deployFlinkJob(flinkApp, effectiveConfig, Optional.empty()); +KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient); return true; } catch (Exception e) { LOG.error("Error while deploying " + flinkApp.getMetadata().getName()); diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java index 4984f08..a17a28e 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java @@ -8,6 +8,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.kubernetes.operator.utils.KubernetesUtils; import io.fabric8.kubernetes.client.KubernetesClient; import org.slf4j.Logger; @@ -33,6 +34,7 @@ public class SessionReconciler { flinkApp.setStatus(new FlinkDeploymentStatus()); try { deployFlinkSession(flinkApp, effectiveConfig); +KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient); return true; } catch (Exception e) { LOG.error("Error while deploying " + flinkApp.getMetadata().getName()); diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java index 5a562a3..5851f1c 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java @@ -13,6 +13,7 @@ public class FlinkDeploymentSpec { private String image; private String imagePullPolicy; private String flinkVersion; +private String ingressDomain; private Map flinkConfiguration; private Pod podTemplate; private JobManagerSpec jobManager; diff --git
[flink-kubernetes-operator] 04/23: Session cluster + local debugging support
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit e48935c27e51b1450511a74a88d6f128e8617457 Author: Matyas Orhidi AuthorDate: Tue Feb 1 10:35:05 2022 +0100 Session cluster + local debugging support --- README.md | 47 deploy/flink-operator.yaml | 1 - examples/basic-session.yaml| 21 + examples/pod-template.yaml | 12 ++- .../controller/FlinkDeploymentController.java | 89 +++--- .../kubernetes/operator/crd/FlinkDeployment.java | 2 + .../flink/kubernetes/operator/utils/Constants.java | 6 -- .../kubernetes/operator/utils/FlinkUtils.java | 21 - 8 files changed, 148 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index b0b58f9..19873e2 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # flink-kubernetes-operator -Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created and IPR. +Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created an IPR. Check [FLIP-212](https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator) further info. ## How to Build ``` @@ -7,9 +7,9 @@ mvn clean install ``` ## How to Run -* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply: +* Make Sure that `FlinkDeployment` Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply: ``` -k apply -f target/classes/META-INF/fabric8/flinkapplications.flink.io-v1.yml +k apply -f target/classes/META-INF/fabric8/flindeployments.flink.io-v1.yml ``` * (Optional) Build Docker Image ``` @@ -20,28 +20,47 @@ docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest kubectl apply -f deploy/rbac.yaml kubectl apply -f deploy/flink-operator.yaml ``` -* Create a new Flink application -The flink-operator will watch the CRD resources and submit a new Flink application once the CR it applied. +* Create a new Flink deployment +The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR it applied. ``` -kubectl apply -f deploy/cr.yaml +kubectl apply -f deploy/basic.yaml ``` -* Delete a Flink application +* Delete a Flink deployment ``` -kubectl delete -f deploy/cr.yaml +kubectl delete -f deploy/basic.yaml OR -kubectl delete flinkapp {app_name} +kubectl delete flinkdep {dep_name} ``` -* Get/List Flink applications -Get all the Flink applications running in the K8s cluster +* Get/List Flink deployments +Get all the Flink deployments running in the K8s cluster ``` -kubectl get flinkapp +kubectl get flinkdep ``` -Describe a specific Flink application to show the status(including job status, savepoint, ect.) +Describe a specific Flink deployment to show the status(including job status, savepoint, ect.) ``` -kubectl describe flinkapp {app_name} +kubectl describe flinkdep {dep_name} ``` +## How to Debug +You can run or debug the `FlinkOperator` from your preferred IDE. The operator itself is accessing the deployed Flink clusters through the REST interface. When running locally the `rest.port` and `rest.address` Flink configuration parameters must be modified to a locally accessible value. + +When using `minikube tunnel` the rest service is exposed on `localhost:8081` +``` +> minikube tunnel + +> kubectl get services +NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE +basic-session-exampleClusterIP None 6123/TCP,6124/TCP 14h +basic-session-example-rest LoadBalancer 10.96.36.250 127.0.0.1 8081:30572/TCP 14h +``` +The operator pics up the default log and flink configurations from `/opt/flink/conf`. You can put the rest configuration parameters here: +``` +cat /opt/flink/conf/flink-conf.yaml +rest.port: 8081 +rest.address: localhost +``` + diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml index c2698c7..bddf360 100644 --- a/deploy/flink-operator.yaml +++ b/deploy/flink-operator.yaml @@ -43,7 +43,6 @@ metadata: app: flink data: flink-conf.yaml: |+ -jobmanager.rpc.address: flink-jobmanager taskmanager.numberOfTaskSlots: 2 blob.server.port: 6124 jobmanager.rpc.port: 6123 diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml new file mode 100644 index 000..b332391 --- /dev/null +++ b/examples/basic-session.yaml @@ -0,0 +1,21 @@ +apiVersion: flink.io/v1alpha1 +kind: FlinkDeployment +metadata: + namespace: default + name: basic-session-example +spec: + image: flink:1.14.3 + flinkVersion: 1.14.3 +
[flink-kubernetes-operator] 09/23: Docker build improvements
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 5c6c128dcb58323e1a73c0eb188a8099e7d97d48 Author: Jaganathan Asokan AuthorDate: Wed Feb 2 15:06:18 2022 -0500 Docker build improvements --- Dockerfile | 12 +++- pom.xml| 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index b4da7d8..c834e97 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,9 +15,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Build +FROM maven:3.8.4-openjdk-11 AS build +WORKDIR /app +COPY src ./src +COPY tools ./tools +COPY pom.xml . + +RUN mvn -f ./pom.xml clean install + +# stage FROM openjdk:11-jre -COPY target/flink-operator-1.0-SNAPSHOT.jar / +COPY --from=build /app/target/flink-operator-1.0-SNAPSHOT.jar / CMD ["java", "-jar", "/flink-operator-1.0-SNAPSHOT.jar"] diff --git a/pom.xml b/pom.xml index a174ef8..5b3caa3 100644 --- a/pom.xml +++ b/pom.xml @@ -282,7 +282,7 @@ under the License. **/target/** apache-maven-3.2.5/** - **/.idea/** + **/.idea/**
[flink-kubernetes-operator] 03/23: Basic job lifecycle handling + Spotless
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 23408e451a35ab3aec42d85eb46c96f3264c7566 Author: Gyula Fora AuthorDate: Tue Feb 1 10:00:02 2022 +0100 Basic job lifecycle handling + Spotless --- deploy/rbac.yaml | 1 + pom.xml| 63 +++ .../flink/kubernetes/operator/FlinkOperator.java | 17 +- .../flink/kubernetes/operator/Utils/Constants.java | 11 - .../kubernetes/operator/Utils/FlinkUtils.java | 128 - .../kubernetes/operator/Utils/KubernetesUtils.java | 23 - .../controller/FlinkDeploymentController.java | 338 ++--- .../kubernetes/operator/crd/FlinkDeployment.java | 10 +- .../operator/crd/FlinkDeploymentList.java | 4 +- .../kubernetes/operator/crd/spec/CancelMode.java | 1 + .../operator/crd/spec/FlinkDeploymentSpec.java | 2 +- .../operator/crd/spec/JobManagerSpec.java | 2 +- .../kubernetes/operator/crd/spec/JobSpec.java | 6 +- .../kubernetes/operator/crd/spec/JobState.java | 11 + .../kubernetes/operator/crd/spec/Resource.java | 5 +- .../operator/crd/spec/TaskManagerSpec.java | 2 +- .../spec/{RestoreMode.java => UpgradeMode.java}| 7 +- .../operator/crd/status/FlinkDeploymentStatus.java | 8 +- .../kubernetes/operator/crd/status/JobStatus.java | 4 +- .../flink/kubernetes/operator/utils/Constants.java | 6 + .../kubernetes/operator/utils/FlinkUtils.java | 152 ++ tools/maven/checkstyle.xml | 562 + tools/maven/suppressions.xml | 26 + 23 files changed, 1137 insertions(+), 252 deletions(-) diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 062ecab..11640ca 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -29,6 +29,7 @@ rules: - events - configmaps - secrets + - nodes verbs: - "*" - apiGroups: diff --git a/pom.xml b/pom.xml index 647868a..0d15470 100644 --- a/pom.xml +++ b/pom.xml @@ -23,6 +23,8 @@ 1.7.15 2.17.0 + +2.4.2 @@ -137,11 +139,72 @@ + org.apache.maven.plugins maven-surefire-plugin ${maven-surefire-plugin.version} + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + + com.puppycrawl.tools + checkstyle + + 8.14 + + + + + validate + validate + + check + + + + + /tools/maven/suppressions.xml + true + /tools/maven/checkstyle.xml + true + true + + + + + com.diffplug.spotless + spotless-maven-plugin + ${spotless.version} + + + + 1.7 + AOSP + + + + + org.apache.flink,org.apache.flink.shaded,,javax,java,scala,\# + + + +
[flink] branch master updated (ebfc7c8 -> 2c792d0)
This is an automated email from the ASF dual-hosted git repository. knaufk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ebfc7c8 [FLINK-25980][datastream] remove unnecessary condition add 2c792d0 [FLINK-25532] Improve Documentation of Flink Docker-Compose (#18725) No new revisions were added by this update. Summary of changes: .../resource-providers/standalone/docker.md| 485 ++--- 1 file changed, 221 insertions(+), 264 deletions(-)
[flink] branch master updated (2d51795 -> ebfc7c8)
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2d51795 [FLINK-25571] Update Elasticsearch Sink to use decomposed interfaces add ebfc7c8 [FLINK-25980][datastream] remove unnecessary condition No new revisions were added by this update. Summary of changes: .../apache/flink/streaming/api/operators/co/IntervalJoinOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch master updated (07f23e0 -> 2d51795)
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 07f23e0 [FLINK-26038][connector/pulsar] Support delay message on PulsarSink. add 2d51795 [FLINK-25571] Update Elasticsearch Sink to use decomposed interfaces No new revisions were added by this update. Summary of changes: .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8 | 1 - .../elasticsearch/sink/ElasticsearchEmitter.java | 2 +- .../elasticsearch/sink/ElasticsearchSink.java | 38 +++--- .../elasticsearch/sink/ElasticsearchWriter.java| 10 +++--- .../elasticsearch/sink/RequestIndexer.java | 4 +-- .../table/ElasticsearchDynamicSink.java| 4 +-- .../table/RowElasticsearchEmitter.java | 2 +- .../sink/ElasticsearchWriterITCase.java| 4 +-- .../connector/elasticsearch/sink/TestEmitter.java | 2 +- .../table/ElasticsearchDynamicSinkBaseITCase.java | 8 ++--- .../ElasticsearchDynamicSinkFactoryBaseTest.java | 6 ++-- .../test/Elasticsearch7SinkExample.scala | 2 +- 12 files changed, 25 insertions(+), 58 deletions(-)
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #1: [FLINK-26078] Kubernetes Operator Prototype
gyfora commented on pull request #1: URL: https://github.com/apache/flink-kubernetes-operator/pull/1#issuecomment-1041302492 @bgeng777 with the native integration, Flink controls the number of taskmanagers based on parallelism/task slots. Once we support standalone integration with reactive scaling this will make sense (and we should add it then) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #1: [FLINK-26078] Kubernetes Operator Prototype
bgeng777 commented on pull request #1: URL: https://github.com/apache/flink-kubernetes-operator/pull/1#issuecomment-1041298545 I want to check that in TaskManagerSpec, there is no field like `replicas` in JobManagerSpec. As a result, when I run the operator in a real k8s cluster and want to add a new TaskManager, I cannot directly update the `basic.yaml` and apply it. Is there any concern for removing this field for TaskManagerSpec? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] 08/09: [FLINK-26026][connector/pulsar] Create unit tests for Pulsar sink connector.
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 714dd80df2ed00b370af762e8684d500645e2b3c Author: Yufan Sheng AuthorDate: Thu Feb 10 19:18:56 2022 +0800 [FLINK-26026][connector/pulsar] Create unit tests for Pulsar sink connector. --- .../pulsar/sink/PulsarSinkBuilderTest.java | 108 ++ .../connector/pulsar/sink/PulsarSinkITCase.java| 99 + .../committer/PulsarCommittableSerializerTest.java | 53 + .../pulsar/sink/writer/PulsarWriterTest.java | 199 ++ .../sink/writer/router/KeyHashTopicRouterTest.java | 111 ++ .../writer/router/RoundRobinTopicRouterTest.java | 88 .../writer/topic/TopicMetadataListenerTest.java| 140 + .../writer/topic/TopicProducerRegisterTest.java| 91 .../pulsar/testutils/function/ControlSource.java | 228 + 9 files changed, 1117 insertions(+) diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java new file mode 100644 index 000..188e718 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION; +import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.CUSTOM; +import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.MESSAGE_KEY_HASH; +import static org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.ROUND_ROBIN; +import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Unit tests for {@link PulsarSinkBuilder}. */ +class PulsarSinkBuilderTest { + +@Test +void topicNameCouldBeSetOnlyOnce() { +PulsarSinkBuilder builder = PulsarSink.builder(); +builder.setTopics("a", "b"); + +assertThrows(IllegalStateException.class, () -> builder.setTopics("c")); +} + +@Test +void topicRoutingModeCouldNotBeCustom() { +PulsarSinkBuilder builder = PulsarSink.builder(); + +assertDoesNotThrow(() -> builder.setTopicRoutingMode(ROUND_ROBIN)); +assertDoesNotThrow(() -> builder.setTopicRoutingMode(MESSAGE_KEY_HASH)); +assertThrows(IllegalArgumentException.class, () -> builder.setTopicRoutingMode(CUSTOM)); +} + +@Test +void setConfigCouldNotOverrideExistedConfigs() { +PulsarSinkBuilder builder = PulsarSink.builder(); +builder.setConfig(PULSAR_SEND_TIMEOUT_MS, 1L); + +assertDoesNotThrow(() -> builder.setConfig(PULSAR_SEND_TIMEOUT_MS, 1L)); + +assertThrows( +IllegalArgumentException.class, +() -> builder.setConfig(PULSAR_SEND_TIMEOUT_MS, 2L)); + +Configuration configuration = new Configuration(); +configuration.set(PULSAR_SEND_TIMEOUT_MS, 3L); +assertThrows(IllegalArgumentException.class, () -> builder.setConfig(configuration)); + +Properties properties = new Properties(); +properties.put(PULSAR_SEND_TIMEOUT_MS.key(), 4L); +assertThrows(IllegalArgumentException.class, () -> builder.setProperties(properties)); +} + +@Test +void serializationSchemaIsRequired() { +
[flink] 07/09: [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test tools based on PulsarStandalone.
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b6be14da65fedf01e82dc83a58e791709ce8ce57 Author: Yufan Sheng AuthorDate: Wed Feb 9 15:09:50 2022 +0800 [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test tools based on PulsarStandalone. 1. Drop some unused fields in test classes. 2. Fix the checkstyle issues for source test. 3. Fix violations for Pulsar connector according to the flink-architecture-tests. 4. Create a standalone Pulsar for test. 5. Add new methods to PulsarRuntimeOperator. 6. Fix the bug in PulsarContainerRuntime, support running tests in E2E environment. 7. Create PulsarContainerTestEnvironment for supporting E2E tests. 8. Add a lot of comments for Pulsar testing tools. 9. Drop mocked Pulsar service, use standalone Pulsar instead. --- flink-connectors/flink-connector-pulsar/pom.xml| 16 - .../source/enumerator/cursor/StopCursor.java | 2 - .../source/enumerator/topic/TopicPartition.java| 4 +- .../split/PulsarUnorderedPartitionSplitReader.java | 16 +- .../common/schema/PulsarSchemaUtilsTest.java | 6 +- .../pulsar/source/PulsarSourceITCase.java | 2 +- .../subscriber/PulsarSubscriberTest.java | 10 +- .../reader/source/PulsarSourceReaderTestBase.java | 2 +- .../pulsar/testutils/PulsarTestContext.java| 4 - .../pulsar/testutils/PulsarTestSuiteBase.java | 2 +- .../connector/pulsar/testutils/SampleData.java | 96 - .../cases/MultipleTopicConsumingContext.java | 1 - .../cases/MultipleTopicTemplateContext.java| 1 - .../cases/SingleTopicConsumingContext.java | 1 - .../pulsar/testutils/runtime/PulsarRuntime.java| 36 +- .../testutils/runtime/PulsarRuntimeOperator.java | 414 ++--- .../runtime/container/PulsarContainerRuntime.java | 61 ++- .../runtime/embedded/PulsarEmbeddedRuntime.java| 284 ++ .../runtime/mock/BlankBrokerInterceptor.java | 61 --- .../runtime/mock/MockBookKeeperClientFactory.java | 74 .../testutils/runtime/mock/MockPulsarService.java | 87 - .../runtime/mock/MockZooKeeperClientFactory.java | 73 .../runtime/mock/NonClosableMockBookKeeper.java| 55 --- .../testutils/runtime/mock/PulsarMockRuntime.java | 160 .../mock/SameThreadOrderedSafeExecutor.java| 56 --- .../test/resources/containers/txnStandalone.conf | 100 - .../util/flink/container/FlinkContainers.java | 2 +- .../util/pulsar/PulsarSourceOrderedE2ECase.java| 7 +- .../util/pulsar/PulsarSourceUnorderedE2ECase.java | 7 +- .../pulsar/cases/ExclusiveSubscriptionContext.java | 14 - .../pulsar/cases/FailoverSubscriptionContext.java | 14 - .../pulsar/cases/KeySharedSubscriptionContext.java | 7 +- .../pulsar/cases/SharedSubscriptionContext.java| 7 +- .../common/PulsarContainerTestEnvironment.java | 31 ++ 34 files changed, 867 insertions(+), 846 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml index 45047eb..fc7b68c 100644 --- a/flink-connectors/flink-connector-pulsar/pom.xml +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -120,22 +120,6 @@ under the License. org.apache.pulsar - testmocks - ${pulsar.version} - test - - - org.testng - testng - - - org.powermock - powermock-module-testng - - - - - org.apache.pulsar pulsar-broker ${pulsar.version} test diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java index b85944f..aaec143 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.pulsar.source.enumerator.cursor; -import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import
[flink] 05/09: [FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization.
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0e72bfede70a00146f466b3e7491fc0f83eb6c41 Author: Yufan Sheng AuthorDate: Tue Feb 15 22:21:50 2022 +0800 [FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization. --- .../pulsar/sink/writer/message/PulsarMessage.java | 111 ++ .../sink/writer/message/PulsarMessageBuilder.java | 127 .../writer/serializer/PulsarSchemaWrapper.java | 59 ++ .../serializer/PulsarSerializationSchema.java | 129 + .../PulsarSerializationSchemaWrapper.java | 59 ++ 5 files changed, 485 insertions(+) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java new file mode 100644 index 000..0c45763 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.message; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** + * The message instance would be used for {@link TypedMessageBuilder}. We create this class because + * the Pulsar lacks such kind of POJO class. + */ +@PublicEvolving +public class PulsarMessage { + +@Nullable private final byte[] orderingKey; +@Nullable private final String key; +private final long eventTime; +private final Schema schema; +@Nullable private final T value; +@Nullable private final Map properties; +@Nullable private final Long sequenceId; +@Nullable private final List replicationClusters; +private final boolean disableReplication; + +/** Package private for building this class only in {@link PulsarMessageBuilder}. */ +PulsarMessage( +@Nullable byte[] orderingKey, +@Nullable String key, +long eventTime, +Schema schema, +@Nullable T value, +@Nullable Map properties, +@Nullable Long sequenceId, +@Nullable List replicationClusters, +boolean disableReplication) { +this.orderingKey = orderingKey; +this.key = key; +this.eventTime = eventTime; +this.schema = schema; +this.value = value; +this.properties = properties; +this.sequenceId = sequenceId; +this.replicationClusters = replicationClusters; +this.disableReplication = disableReplication; +} + +@Nullable +public byte[] getOrderingKey() { +return orderingKey; +} + +@Nullable +public String getKey() { +return key; +} + +public long getEventTime() { +return eventTime; +} + +public Schema getSchema() { +return schema; +} + +@Nullable +public T getValue() { +return value; +} + +@Nullable +public Map getProperties() { +return properties; +} + +@Nullable +public Long getSequenceId() { +return sequenceId; +} + +@Nullable +public List getReplicationClusters() { +return replicationClusters; +} + +public boolean isDisableReplication() { +return disableReplication; +} +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java new file mode 100644 index 000..9330d09 --- /dev/null +++
[flink] 06/09: [FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink.
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 136add5d0c9c5b9b2869a9ee194f78449065b18e Author: Yufan Sheng AuthorDate: Tue Feb 15 22:22:19 2022 +0800 [FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink. --- .../common/utils/PulsarTransactionUtils.java | 68 .../flink/connector/pulsar/sink/PulsarSink.java| 136 .../connector/pulsar/sink/PulsarSinkBuilder.java | 354 + .../connector/pulsar/sink/PulsarSinkOptions.java | 14 +- .../pulsar/sink/committer/PulsarCommittable.java | 71 + .../committer/PulsarCommittableSerializer.java | 65 .../pulsar/sink/committer/PulsarCommitter.java | 174 ++ .../pulsar/sink/config/SinkConfiguration.java | 17 +- .../connector/pulsar/sink/writer/PulsarWriter.java | 264 +++ .../sink/writer/context/PulsarSinkContext.java | 46 +++ .../sink/writer/context/PulsarSinkContextImpl.java | 61 .../sink/writer/router/KeyHashTopicRouter.java | 71 + .../pulsar/sink/writer/router/MessageKeyHash.java | 85 + .../sink/writer/router/RoundRobinTopicRouter.java | 63 .../pulsar/sink/writer/router/TopicRouter.java | 64 .../sink/writer/router/TopicRoutingMode.java | 87 + .../sink/writer/topic/TopicMetadataListener.java | 173 ++ .../sink/writer/topic/TopicProducerRegister.java | 202 18 files changed, 2011 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java new file mode 100644 index 000..a48b4d4 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; +import static org.apache.flink.util.ExceptionUtils.findThrowable; + +/** A suit of workarounds for the Pulsar Transaction. */ +@Internal +public final class PulsarTransactionUtils { + +private PulsarTransactionUtils() { +// No public constructor +} + +/** Create transaction with given timeout millis. */ +public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) { +try { +CompletableFuture future = +sneakyClient(pulsarClient::newTransaction) +.withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS) +.build(); + +return future.get(); +} catch (InterruptedException e) { +Thread.currentThread().interrupt(); +throw new IllegalStateException(e); +} catch (ExecutionException e) { +throw new FlinkRuntimeException(e); +} +} + +/** + * This is a bug in original {@link TransactionCoordinatorClientException#unwrap(Throwable)} + * method. Pulsar wraps the {@link ExecutionException} which hides the real execution exception. + */ +public static TransactionCoordinatorClientException unwrap( +TransactionCoordinatorClientException e) { +return findThrowable(e.getCause(), TransactionCoordinatorClientException.class).orElse(e); +} +} diff --git
[flink] 04/09: [FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData.
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9bc8b0f37bec419bcdc4b8cdee3abf5320df5399 Author: Yufan Sheng AuthorDate: Wed Feb 9 14:56:54 2022 +0800 [FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData. --- .../connector/pulsar/sink/PulsarSinkOptions.java | 259 + .../pulsar/sink/config/PulsarSinkConfigUtils.java | 112 + .../pulsar/sink/config/SinkConfiguration.java | 147 3 files changed, 518 insertions(+) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java new file mode 100644 index 000..0e16830 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.ConfigGroup; +import org.apache.flink.annotation.docs.ConfigGroups; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.common.config.PulsarOptions; + +import org.apache.pulsar.client.api.CompressionType; + +import java.time.Duration; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.configuration.description.LinkElement.link; +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; +import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; + +/** + * Configurations for PulsarSink. All the options list here could be configured in {@code + * PulsarSinkBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required + * for pulsar source. + * + * @see PulsarOptions for shared configure options. + */ +@PublicEvolving +@ConfigGroups( +groups = { +@ConfigGroup(name = "PulsarSink", keyPrefix = SINK_CONFIG_PREFIX), +@ConfigGroup(name = "PulsarProducer", keyPrefix = PRODUCER_CONFIG_PREFIX) +}) +public final class PulsarSinkOptions { + +// Pulsar sink connector config prefix. +public static final String SINK_CONFIG_PREFIX = "pulsar.sink."; +// Pulsar producer API config prefix. +public static final String PRODUCER_CONFIG_PREFIX = "pulsar.producer."; + +private PulsarSinkOptions() { +// This is a constant class +} + + /// +// +// The configuration for pulsar sink part. +// All the configuration listed below should have the pulsar.sink prefix. +// + /// + +public static final ConfigOption PULSAR_WRITE_DELIVERY_GUARANTEE = +ConfigOptions.key(SINK_CONFIG_PREFIX + "deliveryGuarantee") +.enumType(DeliveryGuarantee.class) +.defaultValue(DeliveryGuarantee.NONE) +.withDescription("Optional delivery guarantee when committing."); + +public static final ConfigOption PULSAR_WRITE_TRANSACTION_TIMEOUT = +
[flink] 01/09: [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1602e4b7d26cf52cea993c410769b7b15a672aff Author: Yufan Sheng AuthorDate: Wed Feb 9 13:00:53 2022 +0800 [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1 1. Bump the pulsar-client-all version in pom file. 2. Exclude useless dependencies for pulsar-client-all. 3. Bump the Pulsar docker version. 4. Change the dependencies to pass the tests. 5. Drop PulsarTransactionUtils and fix compile issues in tests. 6. Add bouncycastle to Pulsar e2e tests. --- flink-connectors/flink-connector-pulsar/pom.xml| 74 +++-- .../common/utils/PulsarTransactionUtils.java | 118 - .../split/PulsarUnorderedPartitionSplitReader.java | 3 +- .../PulsarDeserializationSchemaTest.java | 2 +- .../src/main/resources/META-INF/NOTICE | 16 +-- .../flink-end-to-end-tests-pulsar/pom.xml | 43 +++- .../FlinkContainerWithPulsarEnvironment.java | 5 + .../org/apache/flink/util/DockerImageVersions.java | 2 +- 8 files changed, 124 insertions(+), 139 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml index 87b6ba0..45047eb 100644 --- a/flink-connectors/flink-connector-pulsar/pom.xml +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -36,12 +36,14 @@ under the License. jar - 2.8.0 + 2.9.1 0.6.1 - 3.11 - 1.33.0 + 3.11 + 3.6.3 + 4.1.72.Final + 1.33.0 @@ -138,12 +140,22 @@ under the License. ${pulsar.version} test + org.apache.commons commons-lang3 - ${commons-lang3.version} + ${pulsar-commons-lang3.version} + test + + + + + + org.apache.zookeeper + zookeeper + ${pulsar-zookeeper.version} test @@ -156,9 +168,41 @@ under the License. ${pulsar.version} + com.sun.activation + javax.activation + + + jakarta.activation + jakarta.activation-api + + + jakarta.ws.rs + jakarta.ws.rs-api + + + jakarta.xml.bind + jakarta.xml.bind-api + + + javax.validation + validation-api + + + javax.xml.bind + jaxb-api + + + net.jcip + jcip-annotations + + org.apache.pulsar pulsar-package-core + + com.beust + jcommander + @@ -171,13 +215,23 @@ under the License. - + + io.grpc grpc-bom - ${grpc.version} + ${pulsar-grpc.version} + pom + import + + + + + io.netty + netty-bom + ${pulsar-netty.version} pom import @@ -200,7 +254,9 @@ under the License.
[flink] 09/09: [FLINK-26038][connector/pulsar] Support delay message on PulsarSink.
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 07f23e0d3383941ceb475cdd753a59d07100bdf5 Author: Yufan Sheng AuthorDate: Fri Feb 11 12:19:42 2022 +0800 [FLINK-26038][connector/pulsar] Support delay message on PulsarSink. --- .../flink/connector/pulsar/sink/PulsarSink.java| 13 - .../connector/pulsar/sink/PulsarSinkBuilder.java | 20 ++- .../connector/pulsar/sink/writer/PulsarWriter.java | 10 .../sink/writer/delayer/FixedMessageDelayer.java | 43 +++ .../pulsar/sink/writer/delayer/MessageDelayer.java | 62 ++ .../pulsar/sink/PulsarSinkBuilderTest.java | 1 - .../pulsar/sink/writer/PulsarWriterTest.java | 5 +- 7 files changed, 149 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java index 811d5b5..4c6c4a9 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java @@ -28,6 +28,7 @@ import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSeriali import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter; +import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer; import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter; import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter; import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; @@ -82,6 +83,7 @@ public class PulsarSink implements TwoPhaseCommittingSink serializationSchema; private final TopicMetadataListener metadataListener; +private final MessageDelayer messageDelayer; private final TopicRouter topicRouter; PulsarSink( @@ -89,10 +91,12 @@ public class PulsarSink implements TwoPhaseCommittingSink serializationSchema, TopicMetadataListener metadataListener, TopicRoutingMode topicRoutingMode, -TopicRouter topicRouter) { +TopicRouter topicRouter, +MessageDelayer messageDelayer) { this.sinkConfiguration = checkNotNull(sinkConfiguration); this.serializationSchema = checkNotNull(serializationSchema); this.metadataListener = checkNotNull(metadataListener); +this.messageDelayer = checkNotNull(messageDelayer); checkNotNull(topicRoutingMode); // Create topic router supplier. @@ -119,7 +123,12 @@ public class PulsarSink implements TwoPhaseCommittingSink createWriter(InitContext initContext) { return new PulsarWriter<>( -sinkConfiguration, serializationSchema, metadataListener, topicRouter, initContext); +sinkConfiguration, +serializationSchema, +metadataListener, +topicRouter, +messageDelayer, +initContext); } @Internal diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java index a0352f5..1668e3d 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder; import org.apache.flink.connector.pulsar.common.config.PulsarOptions; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer; import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSchemaWrapper; @@ -101,6 +102,7 @@ public class PulsarSinkBuilder { private TopicMetadataListener metadataListener; private TopicRoutingMode topicRoutingMode; private TopicRouter topicRouter; +private MessageDelayer messageDelayer; // private builder constructor. PulsarSinkBuilder() { @@ -231,6 +233,17 @@ public class PulsarSinkBuilder { } /** + * Set a message delayer for enable Pulsar
[flink] 03/09: [FLINK-26021][connector/pulsar] Add the ability to merge the partitioned Pulsar topics.
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit a195f729ba4d1ffe657d82888b26ab9db9121ab8 Author: Yufan Sheng AuthorDate: Wed Feb 9 14:47:28 2022 +0800 [FLINK-26021][connector/pulsar] Add the ability to merge the partitioned Pulsar topics. --- .../pulsar/source/PulsarSourceBuilder.java | 4 +- .../source/enumerator/topic/TopicNameUtils.java| 45 ++ .../enumerator/topic/TopicNameUtilsTest.java | 16 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 0959b1b..b1f6250 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -29,6 +29,7 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; @@ -195,7 +196,8 @@ public final class PulsarSourceBuilder { */ public PulsarSourceBuilder setTopics(List topics) { ensureSubscriberIsNull("topics"); -this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics); +List distinctTopics = TopicNameUtils.distinctTopics(topics); +this.subscriber = PulsarSubscriber.getTopicListSubscriber(distinctTopics); return this; } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java index 446622c..b5d814a 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java @@ -20,8 +20,17 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic; import org.apache.flink.annotation.Internal; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + import org.apache.pulsar.common.naming.TopicName; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import static org.apache.flink.util.Preconditions.checkArgument; /** util for topic name. */ @@ -42,4 +51,40 @@ public final class TopicNameUtils { checkArgument(partitionId >= 0, "Illegal partition id %s", partitionId); return TopicName.get(topic).getPartition(partitionId).toString(); } + +public static boolean isPartitioned(String topic) { +return TopicName.get(topic).isPartitioned(); +} + +/** Merge the same topics into one topics. */ +public static List distinctTopics(List topics) { +Set fullTopics = new HashSet<>(); +Map> partitionedTopics = new HashMap<>(); + +for (String topic : topics) { +TopicName topicName = TopicName.get(topic); +String partitionedTopicName = topicName.getPartitionedTopicName(); + +if (!topicName.isPartitioned()) { +fullTopics.add(partitionedTopicName); +partitionedTopics.remove(partitionedTopicName); +} else if (!fullTopics.contains(partitionedTopicName)) { +List partitionIds = +partitionedTopics.computeIfAbsent( +partitionedTopicName, k -> new ArrayList<>()); +partitionIds.add(topicName.getPartitionIndex()); +} +} + +ImmutableList.Builder builder = ImmutableList.builder().addAll(fullTopics); + +for (Map.Entry> topicSet : partitionedTopics.entrySet()) { +String topicName = topicSet.getKey(); +for (Integer partitionId : topicSet.getValue()) { +builder.add(topicNameWithPartition(topicName, partitionId)); +
[flink] branch master updated (4f20772 -> 07f23e0)
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 4f20772 [FLINK-26148][runtime] Change the format of adaptive batch scheduler config option to jobmanager.adaptive-batch-scheduler.XXX new 1602e4b [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1 new 36de46d [FLINK-26020][connector/pulsar] Unified Pulsar Connector config model for Pulsar source and sink. new a195f72 [FLINK-26021][connector/pulsar] Add the ability to merge the partitioned Pulsar topics. new 9bc8b0f [FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData. new 0e72bfed [FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization. new 136add5 [FLINK-26022][connector/pulsar] Implement at-least-once and exactly-once Pulsar Sink. new b6be14d [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test tools based on PulsarStandalone. new 714dd80 [FLINK-26026][connector/pulsar] Create unit tests for Pulsar sink connector. new 07f23e0 [FLINK-26038][connector/pulsar] Support delay message on PulsarSink. The 9 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../generated/pulsar_client_configuration.html | 8 +- .../generated/pulsar_source_configuration.html | 2 +- .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8 | 11 - flink-connectors/flink-connector-pulsar/pom.xml| 90 +++-- ...arConfigUtils.java => PulsarClientFactory.java} | 209 --- .../pulsar/common/config/PulsarConfigBuilder.java | 143 +++ .../common/config/PulsarConfigValidator.java | 105 ++ .../pulsar/common/config/PulsarConfiguration.java | 104 ++ .../pulsar/common/config/PulsarOptions.java| 18 +- .../common/utils/PulsarTransactionUtils.java | 108 ++ .../flink/connector/pulsar/sink/PulsarSink.java| 145 .../connector/pulsar/sink/PulsarSinkBuilder.java | 372 ++ .../connector/pulsar/sink/PulsarSinkOptions.java | 269 + .../pulsar/sink/committer/PulsarCommittable.java | 71 .../committer/PulsarCommittableSerializer.java | 65 .../pulsar/sink/committer/PulsarCommitter.java | 174 + .../pulsar/sink/config/PulsarSinkConfigUtils.java | 112 ++ .../pulsar/sink/config/SinkConfiguration.java | 160 .../connector/pulsar/sink/writer/PulsarWriter.java | 274 ++ .../sink/writer/context/PulsarSinkContext.java | 46 +++ .../sink/writer/context/PulsarSinkContextImpl.java | 61 +++ .../writer/delayer/FixedMessageDelayer.java} | 25 +- .../pulsar/sink/writer/delayer/MessageDelayer.java | 62 +++ .../pulsar/sink/writer/message/PulsarMessage.java | 111 ++ .../sink/writer/message/PulsarMessageBuilder.java | 127 +++ .../sink/writer/router/KeyHashTopicRouter.java | 71 .../pulsar/sink/writer/router/MessageKeyHash.java | 85 + .../sink/writer/router/RoundRobinTopicRouter.java | 63 .../pulsar/sink/writer/router/TopicRouter.java | 64 .../sink/writer/router/TopicRoutingMode.java | 87 + .../writer/serializer/PulsarSchemaWrapper.java | 59 +++ .../serializer/PulsarSerializationSchema.java | 129 +++ .../PulsarSerializationSchemaWrapper.java | 59 +++ .../sink/writer/topic/TopicMetadataListener.java | 173 + .../sink/writer/topic/TopicProducerRegister.java | 202 ++ .../connector/pulsar/source/PulsarSource.java | 27 +- .../pulsar/source/PulsarSourceBuilder.java | 122 +++--- .../pulsar/source/PulsarSourceOptions.java | 12 +- .../pulsar/source/config/CursorVerification.java | 23 +- .../source/config/PulsarSourceConfigUtils.java | 138 +++ .../pulsar/source/config/SourceConfiguration.java | 190 ++ .../source/enumerator/PulsarSourceEnumerator.java | 18 +- .../source/enumerator/SplitsAssignmentState.java | 2 +- .../cursor/stop/LatestMessageStopCursor.java | 1 + .../source/enumerator/topic/TopicNameUtils.java| 45 +++ .../source/enumerator/topic/TopicPartition.java| 4 +- .../enumerator/topic/range/RangeGenerator.java | 8 + .../source/reader/PulsarSourceReaderFactory.java | 19 +- .../deserializer/PulsarDeserializationSchema.java | 9 + .../PulsarDeserializationSchemaWrapper.java| 4 +- .../reader/deserializer/PulsarSchemaWrapper.java | 12 +- .../reader/source/PulsarOrderedSourceReader.java | 5 +- .../reader/source/PulsarSourceReaderBase.java | 4 +- .../reader/source/PulsarUnorderedSourceReader.java | 3 -
[flink] branch master updated (3efd4c2 -> 4f20772)
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3efd4c2 [FLINK-26117][runtime] Removes GloballyCleanableResource from JobManagerRunnerRegistry add 4f20772 [FLINK-26148][runtime] Change the format of adaptive batch scheduler config option to jobmanager.adaptive-batch-scheduler.XXX No new revisions were added by this update. Summary of changes: .../generated/all_jobmanager_section.html | 48 +++--- .../generated/expert_scheduling_section.html | 46 ++--- .../generated/job_manager_configuration.html | 48 +++--- .../flink/configuration/JobManagerOptions.java | 8 ++-- 4 files changed, 75 insertions(+), 75 deletions(-)
[flink-statefun-playground] branch release-3.2 updated: [hotfix] Update go.sum file
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git The following commit(s) were added to refs/heads/release-3.2 by this push: new 1ff4492 [hotfix] Update go.sum file 1ff4492 is described below commit 1ff449204b367e6dd0d0818ca76a5283890ce2c5 Author: Till Rohrmann AuthorDate: Wed Feb 16 09:54:23 2022 +0100 [hotfix] Update go.sum file --- go/greeter/go.sum | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/greeter/go.sum b/go/greeter/go.sum index c35cbea..b343533 100644 --- a/go/greeter/go.sum +++ b/go/greeter/go.sum @@ -1,5 +1,7 @@ github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 h1:uE56xfgn4c/ytXppcW/NQUNxtPM8NpvkbU/VFMuaXN4= github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0 h1:OfLhhWnnOfBUvzbQuhE7hCKJdlBW41nV3CfCF/q7UJs= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
[flink] branch master updated (b64a838 -> 3efd4c2)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b64a838 [FLINK-26107][runtime] OperatorCoordinator uses failure handler of the current state add 6cdb97c4 [hotfix][runtime] Adds & fixes JavaDoc add 5b06297 [FLINK-26117][runtime] Introduces ofLocalResource in DispatcherResourceCleanerFactory add 3efd4c2 [FLINK-26117][runtime] Removes GloballyCleanableResource from JobManagerRunnerRegistry No new revisions were added by this update. Summary of changes: .../DefaultJobManagerRunnerRegistry.java | 9 -- .../dispatcher/JobManagerRunnerRegistry.java | 4 +- .../OnMainThreadJobManagerRunnerRegistry.java | 6 -- .../dispatcher/cleanup/DefaultResourceCleaner.java | 14 .../cleanup/DispatcherResourceCleanerFactory.java | 17 +++- .../cleanup/GloballyCleanableResource.java | 8 +- .../cleanup/LocallyCleanableResource.java | 13 +-- .../dispatcher/DispatcherCleanupITCase.java| 2 +- .../TestingJobManagerRunnerRegistry.java | 34 ++-- .../DispatcherResourceCleanerFactoryTest.java | 98 ++ 10 files changed, 94 insertions(+), 111 deletions(-)
[flink] branch master updated (540d876 -> b64a838)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 540d876 [FLINK-26017][runtime] Adds log message to when a job is marked as dirty add b64a838 [FLINK-26107][runtime] OperatorCoordinator uses failure handler of the current state No new revisions were added by this update. Summary of changes: .../coordination/OperatorCoordinatorHolder.java| 17 +-- .../DefaultOperatorCoordinatorHandler.java | 7 +- ...catorFactory.java => GlobalFailureHandler.java} | 15 ++- .../flink/runtime/scheduler/SchedulerNG.java | 4 +- .../scheduler/adaptive/CreatingExecutionGraph.java | 37 +- .../flink/runtime/scheduler/adaptive/State.java| 10 +- .../OperatorCoordinatorHolderTest.java | 4 +- .../adaptive/CreatingExecutionGraphTest.java | 132 + .../TestingOperatorCoordinatorHandler.java | 4 +- 9 files changed, 166 insertions(+), 64 deletions(-) copy flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/{ExecutionSlotAllocatorFactory.java => GlobalFailureHandler.java} (65%)
[flink] branch master updated (70ee694 -> 540d876)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 70ee694 [FLINK-26174][kinesis] Add Internal annotation to KinesisDataStreamsSink#restore add 5575c9a [FLINK-25973][runtime] Renamed ArchivedExecutionGraph.createFromInitializingJob into createSparseArchivedExecutionGraph add f1e7d12 [FLINK-25978][runtime] Removes obsolete cleanup tests from DispatcherTest add 540d876 [FLINK-26017][runtime] Adds log message to when a job is marked as dirty No new revisions were added by this update. Summary of changes: .../flink/runtime/dispatcher/Dispatcher.java | 5 +- .../cleanup/CheckpointResourcesCleanupRunner.java | 2 +- .../executiongraph/ArchivedExecutionGraph.java | 6 +- .../DefaultJobMasterServiceProcessFactory.java | 2 +- .../scheduler/adaptive/AdaptiveScheduler.java | 2 +- .../dispatcher/DispatcherResourceCleanupTest.java | 2 +- .../flink/runtime/dispatcher/DispatcherTest.java | 85 +- .../executiongraph/ArchivedExecutionGraphTest.java | 4 +- .../DefaultJobMasterServiceProcessTest.java| 2 +- .../JobMasterServiceLeadershipRunnerTest.java | 2 +- .../runtime/jobmaster/TestingJobManagerRunner.java | 2 +- .../TestingJobMasterServiceProcessFactory.java | 2 +- .../TestingJobMasterServiceProcessFactoryOld.java | 2 +- .../runtime/scheduler/ExecutionGraphInfoTest.java | 2 +- .../runtime/scheduler/adaptive/CreatedTest.java| 2 +- .../adaptive/CreatingExecutionGraphTest.java | 2 +- 16 files changed, 23 insertions(+), 101 deletions(-)
[flink-statefun-playground] branch dev updated: [hotfix] Update README.md
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git The following commit(s) were added to refs/heads/dev by this push: new f4b4d13 [hotfix] Update README.md f4b4d13 is described below commit f4b4d13f90cef6f5da416e7ec6c511a40a64566e Author: Till Rohrmann AuthorDate: Wed Feb 16 09:37:50 2022 +0100 [hotfix] Update README.md --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e5c9173..9455be5 100644 --- a/README.md +++ b/README.md @@ -4,13 +4,15 @@ This repository contains tutorials and examples for [Stateful Functions](https:/ ## Tutorials and Examples -The repository contains tutorials and examples for all SDK that Stateful Functions supports: +The repository contains tutorials and examples for all SDKs that Stateful Functions supports: - [`Java SDK`](java) - [`Go SDK`](go) - [`JavaScript SDK`](javascript) - [`Python SDK`](python) +Each tutorial or example will have it's own `README` that explains in detail what is being covered and how to build and run the code by yourself. + Moreover, it contains examples for [how to deploy Stateful Functions](deployments) on various platforms. ## Code of Conduct
[flink-statefun-playground] branch release-3.2 updated: [hotfix] Update README.md
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git The following commit(s) were added to refs/heads/release-3.2 by this push: new fba363f [hotfix] Update README.md fba363f is described below commit fba363f366ddd654146818fb0101c643cb130508 Author: Till Rohrmann AuthorDate: Wed Feb 16 09:37:50 2022 +0100 [hotfix] Update README.md --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e5c9173..9455be5 100644 --- a/README.md +++ b/README.md @@ -4,13 +4,15 @@ This repository contains tutorials and examples for [Stateful Functions](https:/ ## Tutorials and Examples -The repository contains tutorials and examples for all SDK that Stateful Functions supports: +The repository contains tutorials and examples for all SDKs that Stateful Functions supports: - [`Java SDK`](java) - [`Go SDK`](go) - [`JavaScript SDK`](javascript) - [`Python SDK`](python) +Each tutorial or example will have it's own `README` that explains in detail what is being covered and how to build and run the code by yourself. + Moreover, it contains examples for [how to deploy Stateful Functions](deployments) on various platforms. ## Code of Conduct
[flink-statefun-playground] branch release-3.2 updated: [hotfix] Update dev/README.md to contain an introduction
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git The following commit(s) were added to refs/heads/release-3.2 by this push: new 4042bae [hotfix] Update dev/README.md to contain an introduction 4042bae is described below commit 4042bae649d59693e93617e7798a6b565832bf58 Author: Till Rohrmann AuthorDate: Wed Feb 16 09:34:44 2022 +0100 [hotfix] Update dev/README.md to contain an introduction --- README.md | 21 + 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 6459b93..e5c9173 100644 --- a/README.md +++ b/README.md @@ -1 +1,22 @@ # Stateful Functions Playground + +This repository contains tutorials and examples for [Stateful Functions](https://statefun.io). The material here is a great starting point if you've just started with getting to know the project or looking for specific examples of common usage patterns. + +## Tutorials and Examples + +The repository contains tutorials and examples for all SDK that Stateful Functions supports: + +- [`Java SDK`](java) +- [`Go SDK`](go) +- [`JavaScript SDK`](javascript) +- [`Python SDK`](python) + +Moreover, it contains examples for [how to deploy Stateful Functions](deployments) on various platforms. + +## Code of Conduct + +Apache Flink, Stateful Functions, and all its associated repositories follow the [Code of Conduct of the Apache Software Foundation](https://www.apache.org/foundation/policies/conduct). + +## License + +The code in this repository is licensed under the [Apache Software License 2.0](LICENSE).
[flink-statefun-playground] branch dev updated: [hotfix] Update dev/README.md to contain an introduction
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git The following commit(s) were added to refs/heads/dev by this push: new 586230a [hotfix] Update dev/README.md to contain an introduction 586230a is described below commit 586230ac37b567475e2143621dbf765ae8b866e8 Author: Till Rohrmann AuthorDate: Wed Feb 16 09:34:44 2022 +0100 [hotfix] Update dev/README.md to contain an introduction --- README.md | 21 + 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 6459b93..e5c9173 100644 --- a/README.md +++ b/README.md @@ -1 +1,22 @@ # Stateful Functions Playground + +This repository contains tutorials and examples for [Stateful Functions](https://statefun.io). The material here is a great starting point if you've just started with getting to know the project or looking for specific examples of common usage patterns. + +## Tutorials and Examples + +The repository contains tutorials and examples for all SDK that Stateful Functions supports: + +- [`Java SDK`](java) +- [`Go SDK`](go) +- [`JavaScript SDK`](javascript) +- [`Python SDK`](python) + +Moreover, it contains examples for [how to deploy Stateful Functions](deployments) on various platforms. + +## Code of Conduct + +Apache Flink, Stateful Functions, and all its associated repositories follow the [Code of Conduct of the Apache Software Foundation](https://www.apache.org/foundation/policies/conduct). + +## License + +The code in this repository is licensed under the [Apache Software License 2.0](LICENSE).
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #1: [FLINK-26078] Kubernetes Operator Prototype
gyfora commented on pull request #1: URL: https://github.com/apache/flink-kubernetes-operator/pull/1#issuecomment-1041240904 @asardaes At the moment we allow users to provide a configmap containing the default Flink and logging configuration for log4j and this can be overriden from the deployment spec. We haven't focused too much effort on making this more flexible yet but we should definitely do soon. I will open a jira ticket for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #1: [FLINK-26078] Kubernetes Operator Prototype
gyfora commented on pull request #1: URL: https://github.com/apache/flink-kubernetes-operator/pull/1#issuecomment-1041236694 @FuyaoLi2017 managing multiple jobs especially on a cluster that is intended to be at least partially controlled by user (session cluster) seems to be extremely difficult. Since the user submits the job, it is already difficult to identify a job as multiple jobs can have the same names etc. I feel that this is beyond what the operator should do at this point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #1: [FLINK-26078] Kubernetes Operator Prototype
gyfora commented on a change in pull request #1: URL: https://github.com/apache/flink-kubernetes-operator/pull/1#discussion_r807662376 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.crd.status; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Status of an individual job within the Flink deployment. */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class JobStatus { +private String jobName; +private String jobId; +private String state; Review comment: Opened a jira ticket for this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated: [FLINK-26174][kinesis] Add Internal annotation to KinesisDataStreamsSink#restore
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 70ee694 [FLINK-26174][kinesis] Add Internal annotation to KinesisDataStreamsSink#restore 70ee694 is described below commit 70ee694c62d6762de4a909988c6399bfede28c62 Author: Fabian Paul AuthorDate: Wed Feb 16 09:06:03 2022 +0100 [FLINK-26174][kinesis] Add Internal annotation to KinesisDataStreamsSink#restore This closes #18795. --- .../org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java index 6252bfe..d8288f7 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java +++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java @@ -137,6 +137,7 @@ public class KinesisDataStreamsSink extends AsyncSinkBase> restoreWriter( InitContext context,
[flink-statefun-playground] 02/06: [FLINK-26158] Update java/connected-components example to use playground ingress/egress
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git commit 7f68b7a299f9d68a5966059220ab152e158a069e Author: Till Rohrmann AuthorDate: Tue Feb 15 12:41:34 2022 +0100 [FLINK-26158] Update java/connected-components example to use playground ingress/egress --- java/connected-components/README.md| 30 ++- java/connected-components/docker-compose.yml | 45 + java/connected-components/module.yaml | 22 +-- .../connectedcomponents/ConnectedComponentsFn.java | 204 +++-- .../connectedcomponents/types/EgressRecord.java| 28 +++ .../java/connectedcomponents/types/Types.java | 29 +-- .../java/connectedcomponents/types/Vertex.java | 23 ++- .../types/VertexComponentChange.java | 52 +++--- java/connected-components/vertices.txt | 12 -- 9 files changed, 224 insertions(+), 221 deletions(-) diff --git a/java/connected-components/README.md b/java/connected-components/README.md index 17cfa82..97b3c0c 100644 --- a/java/connected-components/README.md +++ b/java/connected-components/README.md @@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an - Functions service that runs your functions and expose them through an HTTP endpoint. - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as well as function state storage in a consistent and fault-tolerant manner. -- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well, - and you can also extend to connect with other systems. To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory) algorithm on top of Stateful Functions. The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id. @@ -21,7 +19,6 @@ Changes of the component id of a vertex are being output via an egress. - `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds our functions service, hosting the `ConnectedComponentsFn` behind a HTTP endpoint. Check out the source code under `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service. -- `vertices.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress. - `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This configures a few things for a StateFun application, such as the service endpoints of the application's functions, as well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use. @@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute: $ docker-compose build ``` -This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can +This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can take a few minutes as it also needs to build the function's Java project. Afterward the build completes, start running all the services: @@ -51,12 +48,33 @@ $ docker-compose up ## Play around! -You can take a look at what messages are being sent to the Kafka egress: +The connected components applications allows you to do the following actions: + +* Add a new vertex to the graph via sending a `Vertex` message to the `vertex` function + +In order to send messages to the Stateful Functions application you can run: + +``` +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "1", "neighbours": ["2", "3"]}' localhost:8090/connected-components.fns/vertex/1 +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "2", "neighbours": ["1", "4"]}' localhost:8090/connected-components.fns/vertex/2 +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "3", "neighbours": ["1"]}' localhost:8090/connected-components.fns/vertex/3 +$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "4", "neighbours": ["2"]}' localhost:8090/connected-components.fns/vertex/4 +``` + +You can take a look at what messages are being sent to the Playground egress: ``` -$ docker-compose exec kafka rpk topic consume
[flink-statefun-playground] 06/06: [FLINK-26158] Update python/greeter example to use playground ingress/egress
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git commit 3d2a1e4a8865c39530ec2ac2b3a2bb2c83336d11 Author: Till Rohrmann AuthorDate: Tue Feb 15 15:27:55 2022 +0100 [FLINK-26158] Update python/greeter example to use playground ingress/egress This closes #26. --- python/greeter/README.md | 37 +-- python/greeter/docker-compose.yml | 46 +++ python/greeter/functions.py | 13 +++ python/greeter/input-example.json | 2 -- python/greeter/module.yaml| 21 +- 5 files changed, 43 insertions(+), 76 deletions(-) diff --git a/python/greeter/README.md b/python/greeter/README.md index 0a5aad8..14b6682 100644 --- a/python/greeter/README.md +++ b/python/greeter/README.md @@ -5,9 +5,7 @@ This is a simple example of a stateful functions application implemented in `Pyt In this example, we imagine a service that computes personalized greetings. Our service, consist out of the following components: -* `kafka ingress` - This component forwards messages produced to the `names` kafka topic, -to the `person` stateful function. Messages produced to this topic has the following -schema `{ "name" : "bob"}`. +* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function. * `person` - This function is triggered by the ingress defined above. This function keeps track of the number of visits, and triggers the next functions: @@ -15,7 +13,7 @@ This function keeps track of the number of visits, and triggers the next functio * `greeter` - This function, computes a personalized greeting, based on the name and the number of visits of that user. The output of that computation is forward to a Kafka egress defined below. -* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic. +* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded. ![Flow](arch.png "Flow") @@ -23,25 +21,40 @@ of visits of that user. The output of that computation is forward to a Kafka egr ## Running the example ``` -docker-compose build -docker-compose up +$ docker-compose build +$ docker-compose up ``` -To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal: +## Play around! + +The greeter application allows you to do the following actions: + +* Create a greeting for a user via sending a `GreetRequest` message to the `person` function + +In order to send messages to the Stateful Functions application you can run: ``` -docker-compose exec kafka rpk topic consume greetings +$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob ``` -Try adding few more input lines to [input-example.json](input-example.json), and restart -the producer service. +To consume the customized greeting, as they appear in the `greetings` playground topic, run in a separate terminal: ``` -docker-compose restart producer +$ curl -X GET localhost:8091/greetings ``` +### Messages + +The messages are expected to be encoded as JSON. + +* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function + +## What's next? + Feeling curious? add the following print to the `person` function at [functions.py](functions.py): -```print(f"Hello there {context.address.id}!", flush=True)```. +``` +print(f"Hello there {context.address.id}!", flush=True) +``` Then, rebuild and restart only the `functions` service. diff --git a/python/greeter/docker-compose.yml b/python/greeter/docker-compose.yml index 8e4e8a3..2350ca2 100644 --- a/python/greeter/docker-compose.yml +++ b/python/greeter/docker-compose.yml @@ -34,52 +34,12 @@ services: ### statefun: -image: apache/flink-statefun-playground:3.2.0 +image: apache/flink-statefun-playground:3.2.0-1.0 ports: - "8081:8081" + - "8090:8090" + - "8091:8091" depends_on: - - kafka - functions volumes: - ./module.yaml:/module.yaml - - ### - #Kafka for ingress and egress - ### - - kafka: -image: docker.vectorized.io/vectorized/redpanda:v21.8.1 -command: - - redpanda start - - --smp 1 - - --memory 512M - - --overprovisioned - - --set redpanda.default_topic_replications=1 - - --set redpanda.auto_create_topics_enabled=true - - --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092 - - --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092 -
[flink-statefun-playground] 03/06: [FLINK-26158] Update java/shopping-cart to use playground ingress/egress
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git commit 2815d553d200246282b65256d313d8e564fc76ff Author: Till Rohrmann AuthorDate: Tue Feb 15 14:48:24 2022 +0100 [FLINK-26158] Update java/shopping-cart to use playground ingress/egress --- java/shopping-cart/README.md | 49 ++-- java/shopping-cart/docker-compose.yml | 28 ++-- java/shopping-cart/module.yaml | 52 +++--- java/shopping-cart/playthrough/scenario_1.sh | 37 --- java/shopping-cart/playthrough/utils.sh| 14 -- .../playground/java/shoppingcart/Identifiers.java | 3 +- .../playground/java/shoppingcart/Messages.java | 31 + .../java/shoppingcart/UserShoppingCartFn.java | 10 ++--- 8 files changed, 82 insertions(+), 142 deletions(-) diff --git a/java/shopping-cart/README.md b/java/shopping-cart/README.md index c1877ed..71608ce 100644 --- a/java/shopping-cart/README.md +++ b/java/shopping-cart/README.md @@ -13,7 +13,6 @@ If you are new to stateful functions, we recommend you to first look at a more s configures a few things for a StateFun application, such as the service endpoints of the application's functions, as well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/io-module/overview/) which the application will use. - `docker-compose.yml`: Docker Compose file to spin up everything. -- `playthrough`: utilities for automatically playing through the interactions scenarios. ## Prerequisites @@ -26,8 +25,6 @@ This example works with Docker Compose, and runs a few services that build up an - Functions service that runs your functions and expose them through an HTTP endpoint. - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as well as function state storage in a consistent and fault-tolerant manner. -- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well, - and you can also extend to connect with other systems. To build the example, execute: @@ -36,7 +33,7 @@ cd java/shopping-cart docker-compose build ``` -This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can +This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can take a few minutes as it also needs to build the function's Java project. Afterward the build completes, start running all the services: @@ -47,24 +44,50 @@ docker-compose up ## Play around! -The `playground` folder contains scenario(s) and utilities which allow you to easily execute a set of steps that emulate interactions with the stateful functions. +The shopping cart examples allows you to do the following actions: + +* Stock items up via sending a `RestockItem` message to the `stock` function +* Add items to a cart via sending a `AddToCart` message to the `user-shopping-cart` function +* Checkout the cart via sending a `Checkout` message to the `user-shopping-cart` function +* Clear the cart via sending a `ClearCart` message to the `user-shopping-cart` function + +### Example scenario + +The example scenario adds a socks item to the stock. -In order to run a scenario, execute: ``` -cd java/shopping-cart/playthrough -./scenario_1.sh +$ curl -X PUT -H "Content-Type: application/vnd.com.example/RestockItem" -d '{"itemId": "socks", "quantity": 50}' localhost:8090/com.example/stock/socks ``` -It will send a series of messages, results of which you can observe in the logs of the `shopping-cart-functions` component: +Then we add this item to a user cart and check it out. + ``` -docker-compose logs -f shopping-cart-functions +$ curl -X PUT -H "Content-Type: application/vnd.com.example/AddToCart" -d '{"userId": "1", "quantity": 3, "itemId": "socks"}' localhost:8090/com.example/user-shopping-cart/1 +$ curl -X PUT -H "Content-Type: application/vnd.com.example/Checkout" -d '{"userId": "1"}' localhost:8090/com.example/user-shopping-cart/1 +``` + +The receipt can then be observed by reading from the egress. + +``` +$ curl -X GET localhost:8091/receipts ``` -Note: `Caller: Optional.empty` in the logs corresponds to the messages that came via an ingress rather than from another stateful function. -To see the results produced to the egress: +The scenario will send a series of messages, results of which you can observe in the logs of the `shopping-cart-functions` component: ``` -docker-compose exec kafka rpk topic consume receipts' +docker-compose logs -f shopping-cart-functions ``` +Note: `Caller: Optional.empty` in the logs corresponds to the messages that came via an
[flink-statefun-playground] branch release-3.2 updated (c5335e8 -> 3d2a1e4)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git. from c5335e8 [hotfix] Apply spotless on statefun-playground-entrypoint new fdb0e78 [FLINK-26158] Update java/greeter example to use playground ingress/egress new 7f68b7a [FLINK-26158] Update java/connected-components example to use playground ingress/egress new 2815d55 [FLINK-26158] Update java/shopping-cart to use playground ingress/egress new 56e897f [FLINK-26158] Update go/greeter to use playground ingress/egress new 2c8820d [FLINK-26158] Update javascript/greeter example to use playground ingress/egress new 3d2a1e4 [FLINK-26158] Update python/greeter example to use playground ingress/egress The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: go/greeter/README.md | 37 ++-- go/greeter/docker-compose.yml | 46 + go/greeter/greeter.go | 22 ++- go/greeter/input-example.json | 2 - go/greeter/module.yaml | 25 +-- java/connected-components/README.md| 30 ++- java/connected-components/docker-compose.yml | 45 + java/connected-components/module.yaml | 22 +-- .../connectedcomponents/ConnectedComponentsFn.java | 204 +++-- .../connectedcomponents/types/EgressRecord.java| 28 +++ .../java/connectedcomponents/types/Types.java | 29 +-- .../java/connectedcomponents/types/Vertex.java | 23 ++- .../types/VertexComponentChange.java | 52 +++--- java/connected-components/vertices.txt | 12 -- java/greeter/README.md | 27 ++- java/greeter/docker-compose.yml| 45 + java/greeter/module.yaml | 22 +-- .../playground/java/greeter/GreetingsFn.java | 16 +- .../java/greeter/types/EgressRecord.java | 28 +++ .../playground/java/greeter/types/Types.java | 8 +- java/greeter/user-logins.txt | 74 java/shopping-cart/README.md | 49 +++-- java/shopping-cart/docker-compose.yml | 28 +-- java/shopping-cart/module.yaml | 52 +- java/shopping-cart/playthrough/scenario_1.sh | 37 java/shopping-cart/playthrough/utils.sh| 14 -- .../playground/java/shoppingcart/Identifiers.java | 3 +- .../playground/java/shoppingcart/Messages.java | 31 .../java/shoppingcart/UserShoppingCartFn.java | 10 +- javascript/greeter/README.md | 37 ++-- javascript/greeter/docker-compose.yml | 46 + javascript/greeter/functions.js| 23 +-- javascript/greeter/input-example.json | 2 - javascript/greeter/module.yaml | 21 +-- python/greeter/README.md | 37 ++-- python/greeter/docker-compose.yml | 46 + python/greeter/functions.py| 13 +- python/greeter/input-example.json | 2 - python/greeter/module.yaml | 21 +-- 39 files changed, 519 insertions(+), 750 deletions(-) delete mode 100644 go/greeter/input-example.json create mode 100644 java/connected-components/src/main/java/org/apache/flink/statefun/playground/java/connectedcomponents/types/EgressRecord.java delete mode 100644 java/connected-components/vertices.txt create mode 100644 java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/EgressRecord.java delete mode 100644 java/greeter/user-logins.txt delete mode 100755 java/shopping-cart/playthrough/scenario_1.sh delete mode 100644 java/shopping-cart/playthrough/utils.sh delete mode 100644 javascript/greeter/input-example.json delete mode 100644 python/greeter/input-example.json
[flink-statefun-playground] 01/06: [FLINK-26158] Update java/greeter example to use playground ingress/egress
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git commit fdb0e787c7b6e7d547fa279e076cc992836500fd Author: Till Rohrmann AuthorDate: Sat Feb 12 17:48:17 2022 +0100 [FLINK-26158] Update java/greeter example to use playground ingress/egress --- java/greeter/README.md | 27 ++-- java/greeter/docker-compose.yml| 45 + java/greeter/module.yaml | 22 ++- .../playground/java/greeter/GreetingsFn.java | 16 ++--- .../java/greeter/types/EgressRecord.java | 28 .../playground/java/greeter/types/Types.java | 8 ++- java/greeter/user-logins.txt | 74 -- 7 files changed, 73 insertions(+), 147 deletions(-) diff --git a/java/greeter/README.md b/java/greeter/README.md index 739b5f5..07e09bd 100644 --- a/java/greeter/README.md +++ b/java/greeter/README.md @@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an - Functions service that runs your functions and expose them through an HTTP endpoint. - StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as well as function state storage in a consistent and fault-tolerant manner. -- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well, - and you can also extend to connect with other systems. To motivate this example, we'll implement a simple user greeter application, which has two functions - a `UserFn` that expects `UserLogin` JSON events from an ingress and keeps in state storage information about users, and a `GreetingsFn` @@ -21,7 +19,6 @@ that accepts user information to generate personalized greeting messages that ar - `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds our functions service, hosting the `UserFn` and `UserLogin` behind a HTTP endpoint. Check out the source code under `src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service. -- `user-logins.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress. - `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This configures a few things for a StateFun application, such as the service endpoints of the application's functions, as well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/io-module/overview/) which the application will use. @@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute: $ docker-compose build ``` -This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can +This pulls all the necessary Statefun Docker image, and also builds the functions service image. This can take a few minutes as it also needs to build the function's Java project. Afterward the build completes, start running all the services: @@ -51,12 +48,30 @@ $ docker-compose up ## Play around! -You can take a look at what messages are being sent to the Kafka egress: +The greeter application allows you to do the following actions: + +* Create a greeting for a user via sending a `UserLogin` message to the `user` function + +In order to send messages to the Stateful Functions application you can run: + +``` +$ curl -X PUT -H "Content-Type: application/vnd.greeter.types/UserLogin" -d '{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}' localhost:8090/greeter.fns/user/1 +``` + +You can take a look at what messages are being sent to the Playground egress: ``` -$ docker-compose exec kafka rpk topic consume greetings +$ curl -X GET localhost:8091/greetings ``` +### Messages + +The messages are expected to be encoded as JSON. + +* `UserLogin`: `{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}`, `user_id` is the id of the `user` function + +## What's next? + You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the functions. Some ideas you can try out: - Add some more state to be persisted by the `UserFn`. For example, let it additionally keep track of the user's previous login location. diff --git a/java/greeter/docker-compose.yml b/java/greeter/docker-compose.yml index cc2e1b1..b682f37 100644 --- a/java/greeter/docker-compose.yml +++ b/java/greeter/docker-compose.yml @@ -35,51 +35,12 @@ services: ### statefun: -