[jira] [Commented] (SAMZA-1482) Restart or fail Samza jobs in YARN when input topic partition changes
[ https://issues.apache.org/jira/browse/SAMZA-1482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257775#comment-16257775 ]

ASF GitHub Bot commented on SAMZA-1482:
---

GitHub user nickpan47 opened a pull request:

    https://github.com/apache/samza/pull/363

    SAMZA-1482: add config documentation for auto-restart/fail behavior on partition count changes

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickpan47/samza partition-change-docsite

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/363.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #363

commit 7c9f326c86a3eb3ca160d81ec85a3c111f557c86
Author: Yi Pan (Data Infrastructure)
Date:   2017-11-17T23:44:20Z

    SAMZA-1482: add config documentation for auto-restart/fail behavior on partition count changes

> Restart or fail Samza jobs in YARN when input topic partition changes
>
>                 Key: SAMZA-1482
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1482
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Yi Pan (Data Infrastructure)
>            Assignee: Yi Pan (Data Infrastructure)
>             Fix For: 0.14.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, after a Samza job starts, it consumes only the fixed set of
> input topic partitions that existed at start-up time. When input topic
> partitions are expanded, we often lose the messages sent to the new
> partitions until we restart the job.
> SAMZA-882 added an input stream partition count monitor inside the
> JobCoordinator. This ticket uses the monitor's metrics to trigger the
> following actions in YARN:
> # for stateless jobs, shut down the JobCoordinator with an UNDEFINED status
> code so that YARN restarts the whole job
> # for stateful jobs, shut down the JobCoordinator with a FAILED status code
> so that YARN stops the whole job

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
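The restart-or-fail rule in SAMZA-1482 can be illustrated with a small decision function. This is a minimal sketch, not Samza's JobCoordinator code: `PartitionChangeHandler`, `onPartitionCounts`, the `FinalStatus` enum and the `hasLocalStores` flag (used here as a stand-in for "stateful job") are all hypothetical names. Only the outcomes (UNDEFINED for stateless jobs so YARN restarts them, FAILED for stateful jobs so YARN stops them) come from the ticket.

```java
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical sketch of the SAMZA-1482 decision, not Samza's actual code:
 * compare each input stream's partition count at start-up with the latest
 * count reported by a partition count monitor, and pick how the
 * JobCoordinator should shut down.
 */
class PartitionChangeHandler {

  /** Illustrative outcome; UNDEFINED and FAILED mirror the YARN statuses named in the ticket. */
  enum FinalStatus { NONE, UNDEFINED, FAILED }

  /**
   * Returns NONE if every stream present at start-up still reports the same
   * partition count; otherwise UNDEFINED for a stateless job (YARN restarts
   * it) or FAILED for a stateful job (YARN stops it).
   * "Stateful" is approximated by the assumed hasLocalStores flag.
   */
  static FinalStatus onPartitionCounts(Map<String, Integer> startupCounts,
                                       Map<String, Integer> currentCounts,
                                       boolean hasLocalStores) {
    for (Map.Entry<String, Integer> entry : startupCounts.entrySet()) {
      Integer current = currentCounts.get(entry.getKey());
      // Streams missing from the latest report are ignored in this sketch.
      if (current != null && !current.equals(entry.getValue())) {
        return hasLocalStores ? FinalStatus.FAILED : FinalStatus.UNDEFINED;
      }
    }
    return FinalStatus.NONE;
  }

  public static void main(String[] args) {
    Map<String, Integer> startup = Collections.singletonMap("PageViewEvent", 8);
    Map<String, Integer> expanded = Collections.singletonMap("PageViewEvent", 16);
    System.out.println(onPartitionCounts(startup, startup, false));   // NONE
    System.out.println(onPartitionCounts(startup, expanded, false));  // UNDEFINED
    System.out.println(onPartitionCounts(startup, expanded, true));   // FAILED
  }
}
```

Failing a stateful job rather than restarting it reflects the ticket's split: a restart would remap input partitions to tasks while local state is still laid out for the old mapping, so stopping the job leaves that decision to an operator.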
[jira] [Closed] (SAMZA-1479) Proposal for Kafka checkpoint manager improvements
[ https://issues.apache.org/jira/browse/SAMZA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadish closed SAMZA-1479.
---

> Proposal for Kafka checkpoint manager improvements
>
>                 Key: SAMZA-1479
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1479
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jagadish
>            Assignee: Jagadish
>            Priority: Trivial
>
> This proposal makes the following improvements to KafkaCheckpointManager
> for better testability:
> * Rewrite `KafkaCheckpointLogKey` into two classes: an immutable class and
> a SerDe
> * Remove the dependency on static setters in `KafkaCheckpointLogKey`
> * Change the lifecycle of components in KafkaCheckpointManager
> -- It's safe to start producers and consumers during `start`, as opposed to
> lazily loading them during writes and reads.
> -- Initialize systemProducer and systemConsumer during construction
> * Simplify the logic for ignoring checkpoint validations
> * Rewrite checkpointManager#readLog() to use a simpler API
> * Remove unnecessary complexity left over from the migration from 0.8
> * Remove unnecessary locking in startup and shutdown
> * Remove dependencies on SimpleConsumer configs like bufferSize, fetchSize,
> socketTimeout
> * Refactor KafkaCheckpointManagerFactory and remove the static
> getCheckpointSystemNameAndFactory
> * Bug fix: register the taskName correctly (instead of using a dummy string
> for the taskName)
> Testing improvements:
> * Add unit tests to verify more checkpoint scenarios
> * Consolidate unit tests into utils for creating producer, consumer and admin
> instances
> * Convert/consolidate most long-running integration tests into unit tests
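The first item in the proposal, splitting the checkpoint log key into an immutable value class and a separate SerDe, can be sketched as below. This is an illustration under stated assumptions, not the code from the commit: the class names `CheckpointLogKey` and `CheckpointLogKeySerde`, the three fields, and the length-prefixed `writeUTF` encoding are all hypothetical choices made to keep the example self-contained; the real key's fields and wire format are not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Immutable key: all fields final, set once in the constructor, no static setters (hypothetical field set). */
final class CheckpointLogKey {
  private final String type;
  private final String taskName;
  private final String grouperFactoryClassName;

  CheckpointLogKey(String type, String taskName, String grouperFactoryClassName) {
    this.type = Objects.requireNonNull(type, "type");
    this.taskName = Objects.requireNonNull(taskName, "taskName");
    this.grouperFactoryClassName = Objects.requireNonNull(grouperFactoryClassName, "grouperFactoryClassName");
  }

  String getType() { return type; }
  String getTaskName() { return taskName; }
  String getGrouperFactoryClassName() { return grouperFactoryClassName; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof CheckpointLogKey)) return false;
    CheckpointLogKey other = (CheckpointLogKey) o;
    return type.equals(other.type)
        && taskName.equals(other.taskName)
        && grouperFactoryClassName.equals(other.grouperFactoryClassName);
  }

  @Override
  public int hashCode() { return Objects.hash(type, taskName, grouperFactoryClassName); }
}

/** Separate SerDe: all encoding logic lives here, so the key class stays a plain value. */
final class CheckpointLogKeySerde {
  byte[] toBytes(CheckpointLogKey key) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      // writeUTF length-prefixes each field, so no delimiter escaping is needed.
      out.writeUTF(key.getType());
      out.writeUTF(key.getTaskName());
      out.writeUTF(key.getGrouperFactoryClassName());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  CheckpointLogKey fromBytes(byte[] bytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      // Java evaluates constructor arguments left to right, matching the write order.
      return new CheckpointLogKey(in.readUTF(), in.readUTF(), in.readUTF());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the key has value semantics (`equals`/`hashCode`) and no global state, a unit test can round-trip it through the SerDe without any Kafka setup, which is the testability gain the proposal is after.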
[jira] [Commented] (SAMZA-1479) Proposal for Kafka checkpoint manager improvements
[ https://issues.apache.org/jira/browse/SAMZA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257698#comment-16257698 ]

ASF GitHub Bot commented on SAMZA-1479:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/348
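The lifecycle change in the SAMZA-1479 proposal (create the system producer and consumer during construction, start them in `start`, and stop creating them lazily on the first write or read) can be sketched roughly as follows. The interfaces `CheckpointProducer` and `CheckpointConsumer`, the `InMemoryLog` fake, and `CheckpointManagerSketch` are hypothetical stand-ins, not Samza's real system producer/consumer APIs or the refactored KafkaCheckpointManager.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a system producer that writes to the checkpoint log. */
interface CheckpointProducer {
  void start();
  void send(String checkpoint);
  void stop();
}

/** Hypothetical stand-in for a system consumer that reads the checkpoint log. */
interface CheckpointConsumer {
  void start();
  List<String> poll();
  void stop();
}

/** In-memory log acting as both producer and consumer; rejects use before start(). */
class InMemoryLog implements CheckpointProducer, CheckpointConsumer {
  private final List<String> messages = new ArrayList<>();
  private boolean running = false;

  public void start() { running = true; }
  public void stop() { running = false; }

  public void send(String checkpoint) {
    if (!running) throw new IllegalStateException("producer not started");
    messages.add(checkpoint);
  }

  public List<String> poll() {
    if (!running) throw new IllegalStateException("consumer not started");
    return new ArrayList<>(messages);
  }
}

/** Eager lifecycle: clients are injected at construction and started in start(). */
class CheckpointManagerSketch {
  private final CheckpointProducer producer;
  private final CheckpointConsumer consumer;
  private boolean started = false;

  CheckpointManagerSketch(CheckpointProducer producer, CheckpointConsumer consumer) {
    // Assigned once here instead of being created lazily on the first read or write.
    this.producer = producer;
    this.consumer = consumer;
  }

  void start() {
    producer.start();
    consumer.start();
    started = true;
  }

  void writeCheckpoint(String checkpoint) {
    ensureStarted();
    producer.send(checkpoint);
  }

  List<String> readLog() {
    ensureStarted();
    return consumer.poll();
  }

  void stop() {
    producer.stop();
    consumer.stop();
    started = false;
  }

  private void ensureStarted() {
    if (!started) throw new IllegalStateException("start() must be called before reads or writes");
  }
}
```

Injecting the clients through the constructor lets a test pass in fakes like `InMemoryLog`, and failing fast on a write before `start()` surfaces ordering mistakes instead of hiding them behind lazy initialization.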
samza git commit: SAMZA-1479; Refactor KafkaCheckpointManager, KafkaCheckpointLogKey and their tests
Repository: samza
Updated Branches:
  refs/heads/master 9fa8beed7 -> edce6b76d


SAMZA-1479; Refactor KafkaCheckpointManager, KafkaCheckpointLogKey and their tests

Notable changes:
* Rewrite `KafkaCheckpointLogKey` into two classes: an immutable class and a SerDe
* Remove the dependency on static setters in `KafkaCheckpointLogKey`
* Change the lifecycle of components in KafkaCheckpointManager
  - It's safe to start producers and consumers during `start`, as opposed to lazily loading them during writes and reads.
  - Initialize systemProducer and systemConsumer during construction
* Simplify the logic for ignoring checkpoint validations
* Rewrite checkpointManager#readLog() to use a simpler API
* Remove unnecessary complexity left over from the migration from 0.8
* Remove unnecessary locking in startup and shutdown
* Remove dependencies on SimpleConsumer configs like bufferSize, fetchSize, socketTimeout
* Refactor KafkaCheckpointManagerFactory and remove the static getCheckpointSystemNameAndFactory
* Bug fix: register the taskName correctly (instead of using a dummy string for the taskName)
* Add unit tests to verify more checkpoint scenarios
* Consolidate unit tests into utils for creating producer, consumer and admin instances
* Convert/consolidate most long-running integration tests into unit tests

Author: Jagadish
Reviewers: Prateek Maheshwari

Closes #348 from vjagadish1989/kafka-checkpointmanager-refactor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/edce6b76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/edce6b76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/edce6b76

Branch: refs/heads/master
Commit: edce6b76dbf3768884ea5fdadac748df7dec031b
Parents: 9fa8bee
Author: Jagadish
Authored: Fri Nov 17 14:41:05 2017 -0800
Committer: Jagadish
Committed: Fri Nov 17 14:41:05 2017 -0800

--
 .../samza/serializers/TestCheckpointSerde.scala |   8 +
 .../checkpoint/kafka/KafkaCheckpointLogKey.java | 110 ++
 .../kafka/KafkaCheckpointLogKeySerde.java       |  68
 .../samza/system/kafka/KafkaStreamSpec.java     |   4 +
 .../kafka/KafkaCheckpointLogKey.scala           | 171
 .../kafka/KafkaCheckpointManager.scala          | 385 +-
 .../kafka/KafkaCheckpointManagerFactory.scala   |  81 +---
 .../kafka/KafkaSystemConsumerMetrics.scala      |   2 -
 .../kafka/TestKafkaCheckpointLogKeySerde.java   |  53 +++
 .../kafka/TestKafkaCheckpointManagerJava.java   | 247
 .../kafka/TeskKafkaCheckpointLogKey.scala       |  61 ---
 .../kafka/TestKafkaCheckpointManager.scala      | 388 ++-
 12 files changed, 822 insertions(+), 756 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
--
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 029bbef..c2060e0 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -29,6 +29,7 @@ import org.junit.Assert._
 import org.junit.Test

 import scala.collection.JavaConverters._
+import scala.collection.mutable

 class TestCheckpointSerde {
   @Test
@@ -57,4 +58,11 @@ class TestCheckpointSerde {
     assertNotSame(mapping, backToMap)
   }

+  @Test
+  def testNullCheckpointSerde: Unit = {
+    val checkpointBytes = null.asInstanceOf[Array[Byte]]
+    val checkpointSerde = new CheckpointSerde
+    val checkpoint = checkpointSerde.fromBytes(checkpointBytes)
+    assertNull(checkpoint)
+  }
 }


http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
--
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
new file mode 100644
index 000..05114f9
--- /dev/null
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may