[jira] [Commented] (SAMZA-1482) Restart or fail Samza jobs in YARN when input topic partition changes

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257775#comment-16257775
 ] 

ASF GitHub Bot commented on SAMZA-1482:
---

GitHub user nickpan47 opened a pull request:

https://github.com/apache/samza/pull/363

SAMZA-1482: add config documentation for auto-restart/fail behavior on partition count changes

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nickpan47/samza partition-change-docsite

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/363.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #363


commit 7c9f326c86a3eb3ca160d81ec85a3c111f557c86
Author: Yi Pan (Data Infrastructure) 
Date:   2017-11-17T23:44:20Z

SAMZA-1482: add config documentation for auto-restart/fail behavior on 
partition count changes




> Restart or fail Samza jobs in YARN when input topic partition changes
> -
>
> Key: SAMZA-1482
> URL: https://issues.apache.org/jira/browse/SAMZA-1482
> Project: Samza
>  Issue Type: Bug
>Reporter: Yi Pan (Data Infrastructure)
>Assignee: Yi Pan (Data Infrastructure)
> Fix For: 0.14.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, once a Samza job has started, it consumes only the fixed set of 
> input topic partitions that existed at start-up. When the input topic's 
> partitions are expanded, messages sent to the new partitions are often lost 
> until the job is restarted.
> SAMZA-882 added an input stream partition count monitor inside the 
> JobCoordinator. This ticket aims to use this monitor's metrics to trigger 
> the following actions in YARN:
> # for stateless jobs, shut down the JobCoordinator with the UNDEFINED status 
> code so that YARN restarts the whole job
> # for stateful jobs, shut down the JobCoordinator with the FAILED status 
> code so that YARN stops the whole job
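
The two actions listed above amount to a small decision rule, sketched here in Java. This is only an illustration under stated assumptions: ShutdownStatus, PartitionChangePolicy and statusFor are hypothetical names invented for this sketch, not Samza or YARN APIs, and the partition counts are assumed to come from the monitor added in SAMZA-882.

```java
import java.util.Optional;

// Hypothetical stand-in for the status code the JobCoordinator exits with.
enum ShutdownStatus { UNDEFINED, FAILED }

// Hypothetical helper: maps a partition count change to a shutdown status.
final class PartitionChangePolicy {
    private PartitionChangePolicy() {}

    // Returns empty when the partition count is unchanged. Otherwise returns
    // UNDEFINED for stateless jobs (so YARN restarts the job) or FAILED for
    // stateful jobs (so YARN stops the job), as described in the ticket.
    static Optional<ShutdownStatus> statusFor(int initialCount, int currentCount, boolean stateful) {
        if (currentCount == initialCount) {
            return Optional.empty();
        }
        return Optional.of(stateful ? ShutdownStatus.FAILED : ShutdownStatus.UNDEFINED);
    }
}
```

For example, statusFor(4, 8, false) yields UNDEFINED, while statusFor(4, 8, true) yields FAILED.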



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (SAMZA-1479) Proposal for Kafka checkpoint manager improvements

2017-11-17 Thread Jagadish (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadish closed SAMZA-1479.
---

> Proposal for Kafka checkpoint manager improvements
> --
>
> Key: SAMZA-1479
> URL: https://issues.apache.org/jira/browse/SAMZA-1479
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>Assignee: Jagadish
>Priority: Trivial
>
> This proposal adds the following improvements to KafkaCheckpointManager for 
> better testability:
> *  Rewrite `KafkaCheckpointLogKey` into two classes - an immutable class, and 
> a SerDe:
> * Remove dependency on static setters in the `KafkaCheckpointLogKey`
> * Change lifecycle of components in KafkaCheckpointManager 
>   -- It's safe to start producers and consumers during `start` as opposed to 
> lazy loading them during writes, and reads.
>   -- Initialize systemProducer and systemConsumer during construction
> * Simplify logic for ignoring checkpoint validations
> * Re-write checkpointManager#readLog() to use a simpler API.  
> * Remove unnecessary complexity after the migration from 0.8
> * Remove unnecessary locking in startup, and shut-down
> * Remove dependencies on SimpleConsumer configs like bufferSize, fetchSize, 
> socketTimeout
> * Refactor KafkaCheckpointManagerFactory and remove static 
> getCheckpointSystemNameAndFactory
> * Bug-fix : Register the taskName correctly (instead of using a dummy string 
> for the taskName)
> Testing improvements:
> * Add unit tests to verify more checkpoint scenarios
> * Consolidate unit tests into utils for creating producer, consumer and admin 
> instances
> * Convert/consolidate most long-running integration tests into unit tests
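
The first bullet above (splitting the key into an immutable class and a SerDe) could look roughly like the following Java sketch. It is not the code from the pull request: CheckpointKey, CheckpointKeySerde, the two fields and the newline-delimited encoding are all assumptions chosen to keep the example short, and the sketch assumes the type field never contains a newline.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical immutable key: all fields are final and set once in the constructor.
final class CheckpointKey {
    private final String type;
    private final String taskName;

    CheckpointKey(String type, String taskName) {
        this.type = Objects.requireNonNull(type, "type");
        this.taskName = Objects.requireNonNull(taskName, "taskName");
    }

    String getType() { return type; }

    String getTaskName() { return taskName; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CheckpointKey)) return false;
        CheckpointKey other = (CheckpointKey) o;
        return type.equals(other.type) && taskName.equals(other.taskName);
    }

    @Override
    public int hashCode() { return Objects.hash(type, taskName); }
}

// Hypothetical serde: holds the wire format so the key class stays a plain value.
final class CheckpointKeySerde {
    private static final char SEPARATOR = '\n'; // assumed delimiter, not the real format

    byte[] toBytes(CheckpointKey key) {
        String encoded = key.getType() + SEPARATOR + key.getTaskName();
        return encoded.getBytes(StandardCharsets.UTF_8);
    }

    CheckpointKey fromBytes(byte[] bytes) {
        String encoded = new String(bytes, StandardCharsets.UTF_8);
        int split = encoded.indexOf(SEPARATOR);
        if (split < 0) {
            throw new IllegalArgumentException("Malformed checkpoint key");
        }
        return new CheckpointKey(encoded.substring(0, split), encoded.substring(split + 1));
    }
}
```

Keeping serialization out of the key class means a test can construct and compare keys without touching any static state, which matches the testability goal stated in the proposal.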





[jira] [Commented] (SAMZA-1479) Proposal for Kafka checkpoint manager improvements

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257698#comment-16257698
 ] 

ASF GitHub Bot commented on SAMZA-1479:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/348







samza git commit: SAMZA-1479; Refactor KafkaCheckpointManager, KafkaCheckpointLogKey and their tests

2017-11-17 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 9fa8beed7 -> edce6b76d


SAMZA-1479; Refactor KafkaCheckpointManager, KafkaCheckpointLogKey and their 
tests

Notable changes:
* Rewrite `KafkaCheckpointLogKey` into two classes - an immutable class, and a 
SerDe
* Remove dependency on static setters in the `KafkaCheckpointLogKey`
* Change lifecycle of components in KafkaCheckpointManager
  - It's safe to start producers and consumers during `start` as opposed to 
lazy loading them during writes, and reads.
  - Initialize systemProducer and systemConsumer during construction
* Simplify logic for ignoring checkpoint validations
* Re-write checkpointManager#readLog() to use a simpler API.
* Remove unnecessary complexity after the migration from 0.8
* Remove unnecessary locking in startup, and shut-down
* Remove dependencies on SimpleConsumer configs like bufferSize, fetchSize, 
socketTimeout
* Refactor KafkaCheckpointManagerFactory and remove static 
getCheckpointSystemNameAndFactory
* Bug-fix : Register the taskName correctly (instead of using a dummy string 
for the taskName)

* Add unit tests to verify more checkpoint scenarios
* Consolidate unit tests into utils for creating producer, consumer and admin 
instances
* Convert/consolidate most long-running integration tests into unit tests
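
The lifecycle change in the notes above (initialize components during construction, start them in `start` rather than lazily on first read or write) can be sketched in Java as follows. The names Startable, CountingComponent and EagerCheckpointManager are hypothetical and exist only for this illustration; the sketch also assumes start and stop are called from a single thread, so it uses no locking.

```java
// Hypothetical minimal lifecycle interface for a producer or consumer.
interface Startable {
    void start();
    void stop();
}

// Test double that counts lifecycle calls.
final class CountingComponent implements Startable {
    int starts;
    int stops;

    @Override
    public void start() { starts++; }

    @Override
    public void stop() { stops++; }
}

// Hypothetical manager: dependencies are fixed at construction time and
// started eagerly in start(), with no lazy initialization on reads or writes.
final class EagerCheckpointManager {
    private final Startable producer;
    private final Startable consumer;
    private boolean started;

    EagerCheckpointManager(Startable producer, Startable consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    void start() {
        if (started) {
            return; // repeated start() calls are ignored
        }
        producer.start();
        consumer.start();
        started = true;
    }

    void stop() {
        if (!started) {
            return;
        }
        consumer.stop();
        producer.stop();
        started = false;
    }

    boolean isStarted() { return started; }
}
```

Because the components are injected through the constructor, a unit test can pass in counting fakes and check the lifecycle without a running Kafka cluster.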

Author: Jagadish 

Reviewers: Prateek Maheshwari 

Closes #348 from vjagadish1989/kafka-checkpointmanager-refactor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/edce6b76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/edce6b76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/edce6b76

Branch: refs/heads/master
Commit: edce6b76dbf3768884ea5fdadac748df7dec031b
Parents: 9fa8bee
Author: Jagadish 
Authored: Fri Nov 17 14:41:05 2017 -0800
Committer: Jagadish 
Committed: Fri Nov 17 14:41:05 2017 -0800

--
 .../samza/serializers/TestCheckpointSerde.scala |   8 +
 .../checkpoint/kafka/KafkaCheckpointLogKey.java | 110 ++
 .../kafka/KafkaCheckpointLogKeySerde.java   |  68 
 .../samza/system/kafka/KafkaStreamSpec.java |   4 +
 .../kafka/KafkaCheckpointLogKey.scala   | 171 
 .../kafka/KafkaCheckpointManager.scala  | 385 +-
 .../kafka/KafkaCheckpointManagerFactory.scala   |  81 +---
 .../kafka/KafkaSystemConsumerMetrics.scala  |   2 -
 .../kafka/TestKafkaCheckpointLogKeySerde.java   |  53 +++
 .../kafka/TestKafkaCheckpointManagerJava.java   | 247 
 .../kafka/TeskKafkaCheckpointLogKey.scala   |  61 ---
 .../kafka/TestKafkaCheckpointManager.scala  | 388 ++-
 12 files changed, 822 insertions(+), 756 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
--
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 029bbef..c2060e0 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -29,6 +29,7 @@ import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class TestCheckpointSerde {
   @Test
@@ -57,4 +58,11 @@ class TestCheckpointSerde {
     assertNotSame(mapping, backToMap)
   }
 
+  @Test
+  def testNullCheckpointSerde: Unit = {
+    val checkpointBytes = null.asInstanceOf[Array[Byte]]
+    val checkpointSerde = new CheckpointSerde
+    val checkpoint = checkpointSerde.fromBytes(checkpointBytes)
+    assertNull(checkpoint)
+  }
 }
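
The test added in the hunk above expects `fromBytes` to return null for a null payload. One way a serde could satisfy such a contract is sketched below in Java. NullTolerantStringSerde is a hypothetical class written for illustration; it does not show how CheckpointSerde itself is implemented.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical serde that maps a null payload to a null value instead of throwing.
final class NullTolerantStringSerde {
    String fromBytes(byte[] bytes) {
        if (bytes == null) {
            return null; // nothing to decode
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    byte[] toBytes(String value) {
        if (value == null) {
            return null;
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```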

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
--
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
new file mode 100644
index 000..05114f9
--- /dev/null
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may