[jira] [Commented] (SAMZA-1649) Improve host-aware allocation to account for strict locality

2018-04-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436697#comment-16436697 ]

ASF GitHub Bot commented on SAMZA-1649:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/471


> Improve host-aware allocation to account for strict locality
> 
>
> Key: SAMZA-1649
> URL: https://issues.apache.org/jira/browse/SAMZA-1649
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


samza git commit: SAMZA-1649: Improve host-aware allocation to account for strict locality

2018-04-12 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 96c333469 -> b6219f181


SAMZA-1649: Improve host-aware allocation to account for strict locality

I am currently working on a doc describing the behavior of the CapacityScheduler,
and further testing is needed on an actual cluster, but here is a summary of why
we should set relax-locality = false:
 - Node-local requests are honored only when relax-locality = false.
 - With relax-locality = true, the scheduler favors interactivity over data
locality for requests that ask for few resources relative to the size of the
cluster.

In addition to the above change, this PR also modifies the allocator algorithm
to fall back to "ANY_HOST" requests so that we make progress when the preferred
node is unavailable.
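The allocation change can be summarized as a small decision function. The sketch below is a minimal, hypothetical model rather than Samza's code: `AllocationSketch`, `Decision`, `decide()` and `relaxLocality()` are invented names, the preferred-host branch is assumed (it lies outside the diff hunk), and `relaxLocality()` is only a plausible mapping of the relax-locality rule, assuming an ANY_HOST request (which has no location constraint) leaves relaxation on.

```java
// Hypothetical model of the host-aware allocation decision in SAMZA-1649.
// AllocationSketch, Decision, decide() and relaxLocality() are illustrative
// names, not Samza APIs.
final class AllocationSketch {

  static final String ANY_HOST = "ANY_HOST";

  enum Decision {
    RUN_ON_PREFERRED_HOST, // assumed: an allocated resource on the preferred host is free
    RUN_ON_ANY_HOST,       // request expired and an allocated resource is free elsewhere
    REREQUEST_ON_ANY_HOST, // request expired and nothing is free: cancel and ask for ANY_HOST
    WAIT                   // request not yet expired: keep waiting for the preferred host
  }

  private AllocationSketch() {
  }

  static Decision decide(boolean resourceOnPreferredHost, boolean expired, boolean resourceOnAnyHost) {
    if (resourceOnPreferredHost) {
      return Decision.RUN_ON_PREFERRED_HOST;
    }
    if (expired) {
      // New behavior: with no free resource, fall back to an ANY_HOST request
      // instead of waiting indefinitely for an unavailable node.
      return resourceOnAnyHost ? Decision.RUN_ON_ANY_HOST : Decision.REREQUEST_ON_ANY_HOST;
    }
    return Decision.WAIT;
  }

  // Assumed mapping: host-specific requests disable locality relaxation so the
  // scheduler honors node-local placement; ANY_HOST requests have nothing to relax.
  static boolean relaxLocality(String preferredHost) {
    return ANY_HOST.equals(preferredHost);
  }
}
```

The key difference from the old loop is the `REREQUEST_ON_ANY_HOST` outcome: previously an expired request with no free resource simply kept waiting.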

Author: Jagadish 

Reviewers: Prateek Maheshwari 

Closes #471 from vjagadish/relax-locality-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b6219f18
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b6219f18
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b6219f18

Branch: refs/heads/master
Commit: b6219f181ff54e79aedcf6f1b7fff264925a8af7
Parents: 96c3334
Author: Jagadish 
Authored: Thu Apr 12 19:23:44 2018 -0700
Committer: Jagadish 
Committed: Thu Apr 12 19:23:44 2018 -0700

--
 .../HostAwareContainerAllocator.java|  17 ++-
 .../clustermanager/ResourceRequestState.java|  18 +++
 .../MockClusterResourceManager.java |  39 --
 .../TestHostAwareContainerAllocator.java| 124 +--
 .../job/yarn/YarnClusterResourceManager.java|   2 +-
 5 files changed, 169 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 66e2246..fe462e7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -78,13 +78,18 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
         boolean expired = requestExpired(request);
         boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
 
-        if (expired && resourceAvailableOnAnyHost) {
-          log.info("Request expired. running on ANY_HOST");
-          runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+        if (expired) {
+          if (resourceAvailableOnAnyHost) {
+            log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(), request.getPreferredHost());
+            runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+          } else {
+            log.info("Request for container: {} on {} has expired. Requesting additional resources on ANY_HOST.", request.getContainerID(), request.getPreferredHost());
+            resourceRequestState.cancelResourceRequest(request);
+            requestResource(containerID, ResourceRequestState.ANY_HOST);
+          }
         } else {
-          log.info("Either the request timestamp {} is greater than resource request timeout {}ms or we couldn't "
-              + "find any free allocated resources in the buffer. Breaking out of loop.",
-              request.getRequestTimestampMs(), requestTimeout);
+          log.info("Request for container: {} on {} has not yet expired. Request creation time: {}. Request timeout: {}",
+              new Object[]{request.getContainerID(), request.getPreferredHost(), request.getRequestTimestampMs(), requestTimeout});
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index fe2067c..51caa39 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -102,6 +102,24 @@ public class ResourceRequestState {
   }
 
   /**
+   * Cancels a {@link SamzaResourceRequest} previously submitted to the {@link ClusterResourceManager}
+   *
+   * @param request {@link SamzaResourceRequest} to cancel
+   */
+  

[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description: 
*Problem:*

StreamProcessor skips all events from zookeeper server after zkClient session 
expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever a zkClient session
expiration occurs.

Any event from the zookeeper server is dropped if the in-memory generationId is not
equal to the generationId cached in the zkWatcher.

After a zkClient session expiration in a StreamProcessor, this results in
zombie StreamProcessors and stalls message processing.

In the worst case, if this happens to the leader StreamProcessor, the whole
processor group is stalled.
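To make the failure mode concrete, here is a hypothetical, single-threaded model. `GenerationSketch`, `Generation` and `CachingWatcher` are invented names standing in for the in-memory zkUtils.generationId and a dataChange/childChange handler; they are not Samza classes. The watcher captures the generation once, in its constructor, so after the first session expiration it compares every new event against a value that can never match again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the SAMZA-1655 failure mode.
// Generation stands in for the in-memory zkUtils.generationId; CachingWatcher
// stands in for a dataChange/childChange handler. Neither is a Samza class.
final class GenerationSketch {

  private GenerationSketch() {
  }

  static final class Generation {
    private int value = 0;

    int get() {
      return value;
    }

    // Called whenever the zkClient session expires.
    void onSessionExpired() {
      value++;
    }
  }

  static final class CachingWatcher {
    private final Generation generation;
    private final int cachedGeneration; // captured once, never refreshed
    final List<String> processed = new ArrayList<>();

    CachingWatcher(Generation generation) {
      this.generation = generation;
      this.cachedGeneration = generation.get();
    }

    // Returns true if the event was handled, false if it was dropped.
    boolean onEvent(String event) {
      if (generation.get() != cachedGeneration) {
        return false; // stale watcher: every event after an expiration lands here
      }
      processed.add(event);
      return true;
    }
  }
}
```

An event delivered before the expiration is handled; every event delivered afterwards, including legitimate ones from the new session, is dropped.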

*Purpose of generationId:*

GenerationId was originally added to skip older, stale events that remain buffered
in the worker queue after a session expiration, to guard against the following
scenario:

1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor rejoins the group as a follower after reconnecting
to a different zkServer in the ensemble.
3. The former leader might have zkEvents buffered in its worker queue that were
delivered to it while it was the leader. If it acts upon these events, it
will cause global state corruption.
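One way to keep the protection described above without dropping events from the new session is to stamp each buffered event with the generation that was current when it was delivered, and to compare that stamp at processing time. This is a hypothetical sketch of that idea, not necessarily how Samza resolves SAMZA-1655; `StaleEventFilterSketch` and its methods are invented names, and in a real system the ZooKeeper callback and the worker run on different threads, which this single-threaded sketch does not synchronize.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: each buffered event carries the generation that was
// current when it was delivered; the worker skips events whose stamp no longer
// matches, i.e. events buffered before a session expiration.
// Single-threaded for illustration; no synchronization is shown.
final class StaleEventFilterSketch {

  static final class StampedEvent {
    final String name;
    final int generation;

    StampedEvent(String name, int generation) {
      this.name = name;
      this.generation = generation;
    }
  }

  private int currentGeneration = 0;
  private final Deque<StampedEvent> workerQueue = new ArrayDeque<>();
  final List<String> processed = new ArrayList<>();

  // ZooKeeper delivers an event: buffer it with the current generation.
  void deliver(String eventName) {
    workerQueue.addLast(new StampedEvent(eventName, currentGeneration));
  }

  // The zkClient session expired.
  void onSessionExpired() {
    currentGeneration++;
  }

  // The worker drains the queue, discarding events from an older generation.
  void drain() {
    while (!workerQueue.isEmpty()) {
      StampedEvent event = workerQueue.pollFirst();
      if (event.generation == currentGeneration) {
        processed.add(event.name);
      }
    }
  }
}
```

In the scenario above, an event buffered while the processor was the leader is discarded after the expiration, while an event delivered after the reconnect is still processed.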

 

  was:
*Problem:*

StreamProcessor skips all events from zookeeper server after zkClient session 
expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever a zkClient session
expiration occurs.

Any event from the zookeeper server is dropped if the in-memory generationId is not
equal to the generationId cached in the zkWatcher.

After a zkClient session expiration in a StreamProcessor, this results in
zombie StreamProcessors and stalls message processing.

In the worst case, if this happens to the leader StreamProcessor, the whole
processor group is stalled.

*Purpose of generationId:*

GenerationId was originally added to skip older, stale events that remain buffered
in the worker queue after a session expiration, to guard against the following
scenario:

1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor rejoins the group as a follower after the session
is re-established.
3. The former leader might have zkEvents buffered in its worker queue that were
delivered to it while it was the leader. If it acts upon these events, it
will cause global state corruption.

 


> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> -
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> *Problem:*
> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
> instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache the generationId passed through their constructor.
> This in-memory generationId is incremented whenever a zkClient session
> expiration occurs.
> Any event from the zookeeper server is dropped if the in-memory generationId is
> not equal to the generationId cached in the zkWatcher.
> After a zkClient session expiration in a StreamProcessor, this results in
> zombie StreamProcessors and stalls message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole
> processor group is stalled.
> *Purpose of generationId:*
> GenerationId was originally added to skip older, stale events that remain
> buffered in the worker queue after a session expiration, to guard against the
> following scenario:
> 1. Session expiration happens to the leader stream processor of the group.
> 2. The leader stream processor rejoins the group as a follower after
> reconnecting to a different zkServer in the ensemble.
> 3. The former leader might have zkEvents buffered in its worker queue that were
> delivered to it while it was the leader. If it acts upon these events, it
> will cause global state corruption.
>  





[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description: 
*Problem:*

StreamProcessor skips all events from zookeeper server after zkClient session 
expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever a zkClient session
expiration occurs.

Any event from the zookeeper server is dropped if the in-memory generationId is not
equal to the generationId cached in the zkWatcher.

After a zkClient session expiration in a StreamProcessor, this results in
zombie StreamProcessors and stalls message processing.

In the worst case, if this happens to the leader StreamProcessor, the whole
processor group is stalled.

*Purpose of generationId:*

GenerationId was originally added to skip older, stale events that remain buffered
in the worker queue after a session expiration, to guard against the following
scenario:

1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor rejoins the group as a follower after the session
is re-established.
3. The former leader might have zkEvents buffered in its worker queue that were
delivered to it while it was the leader. If it acts upon these events, it
will cause global state corruption.

 

  was:
*Problem:*

StreamProcessor skips all events from zookeeper server after zkClient session 
expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever a zkClient session
expiration occurs.

Any event from the zookeeper server is dropped if the in-memory generationId is not
equal to the generationId cached in the zkWatcher.

After a zkClient session expiration in a StreamProcessor, this results in
zombie StreamProcessors and stalls message processing.

In the worst case, if this happens to the leader StreamProcessor, the whole
processor group is stalled.

*Purpose of generationId:*

GenerationId was originally added to skip older, stale events that remain buffered
in the worker queue after a session expiration and to guard against the following
scenario:

1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor rejoins the group as a follower after the session
is re-established.
3. The former leader might have zkEvents buffered in its worker queue that were
delivered to it while it was the leader. If it acts upon these events, it
will cause global state corruption.

 


> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> -
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> *Problem:*
> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
> instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache the generationId passed through their constructor.
> This in-memory generationId is incremented whenever a zkClient session
> expiration occurs.
> Any event from the zookeeper server is dropped if the in-memory generationId is
> not equal to the generationId cached in the zkWatcher.
> After a zkClient session expiration in a StreamProcessor, this results in
> zombie StreamProcessors and stalls message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole
> processor group is stalled.
> *Purpose of generationId:*
> GenerationId was originally added to skip older, stale events that remain
> buffered in the worker queue after a session expiration, to guard against the
> following scenario:
> 1. Session expiration happens to the leader stream processor of the group.
> 2. The leader stream processor rejoins the group as a follower after the
> session is re-established.
> 3. The former leader might have zkEvents buffered in its worker queue that were
> delivered to it while it was the leader. If it acts upon these events, it
> will cause global state corruption.
>  





[jira] [Commented] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window

2018-04-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436496#comment-16436496 ]

ASF GitHub Bot commented on SAMZA-1650:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/472


> Window high level API: Trigger is not fired at the end of trigger interval 
> for tumbling window
> --
>
> Key: SAMZA-1650
> URL: https://issues.apache.org/jira/browse/SAMZA-1650
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Priority: Major
>
> [~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in
> JobNode, which returns early without computing the timer interval when no
> joinTtlIntervals are set, thereby discarding windowTimerIntervals even when
> they are set.
>  
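A self-contained sketch of the corrected computation illustrates the effect. It mirrors the fix in the SAMZA-1650 commit but uses invented names (`TriggerIntervalSketch`, `computeTriggerInterval`) and a local `gcd` in place of Samza's `MathUtils.gcd`. For a job with tumbling windows of 10 s and 15 s and no joins, the old code returned -1 because `joinTtlIntervals` was empty, so no timer was scheduled; the fixed logic returns gcd(10000, 15000) = 5000 ms.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, self-contained version of the fixed trigger-interval logic:
// merge join TTLs and window timer intervals, return -1 only when both are
// empty, otherwise return the gcd of all candidate intervals.
final class TriggerIntervalSketch {

  private TriggerIntervalSketch() {
  }

  // Euclid's algorithm; gcd(0, v) == v, so it can seed a fold over a list.
  static long gcd(long a, long b) {
    while (b != 0) {
      long t = a % b;
      a = b;
      b = t;
    }
    return a;
  }

  static long gcd(List<Long> values) {
    long result = 0;
    for (long v : values) {
      result = gcd(result, v);
    }
    return result;
  }

  static long computeTriggerInterval(List<Long> joinTtlIntervals, List<Long> windowTimerIntervals) {
    List<Long> candidates = new ArrayList<>(joinTtlIntervals);
    candidates.addAll(windowTimerIntervals);
    if (candidates.isEmpty()) {
      return -1; // neither joins nor windows need a timer
    }
    return gcd(candidates);
  }
}
```

Using the gcd means a single periodic timer fires at a multiple-compatible cadence for every window and join TTL in the job.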





samza git commit: SAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window

2018-04-12 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 76de840c7 -> 96c333469


SAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling 
window

Author: Aditya Toomula 

Reviewers: Jagadish 

Closes #472 from atoomula/window


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/96c33346
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/96c33346
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/96c33346

Branch: refs/heads/master
Commit: 96c3334693f7d18d087dc5262dddfc7f315d37f1
Parents: 76de840
Author: Aditya Toomula 
Authored: Thu Apr 12 16:50:30 2018 -0700
Committer: Jagadish 
Committed: Thu Apr 12 16:50:30 2018 -0700

--
 .../main/java/org/apache/samza/execution/JobNode.java| 11 +--
 1 file changed, 5 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/96c33346/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index c0b4ee5..bc85d00 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -334,17 +334,16 @@ public class JobNode {
         .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
         .collect(Collectors.toList());
 
-    if (joinTtlIntervals.isEmpty()) {
-      return -1;
-    }
-
     // Combine both the above lists
     List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
     candidateTimerIntervals.addAll(windowTimerIntervals);
 
+    if (candidateTimerIntervals.isEmpty()) {
+      return -1;
+    }
+
     // Compute the gcd of the resultant list
-    long timerInterval = MathUtils.gcd(candidateTimerIntervals);
-    return timerInterval;
+    return MathUtils.gcd(candidateTimerIntervals);
   }
 
   /**



[jira] [Commented] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window

2018-04-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436494#comment-16436494 ]

ASF GitHub Bot commented on SAMZA-1650:
---

GitHub user atoomula opened a pull request:

https://github.com/apache/samza/pull/472

SAMZA-1650: Fix for firing trigger at the end of trigger interval for 
tumbling window



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/atoomula/samza window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/472.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #472


commit 82937f851cb1e1b5a2e2048582a60cba3791efba
Author: Aditya Toomula 
Date:   2018-04-12T21:47:19Z

Window high level API: Trigger is not fired at the end of trigger interval 
for tumbling window




> Window high level API: Trigger is not fired at the end of trigger interval 
> for tumbling window
> --
>
> Key: SAMZA-1650
> URL: https://issues.apache.org/jira/browse/SAMZA-1650
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Priority: Major
>
> [~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in
> JobNode, which returns early without computing the timer interval when no
> joinTtlIntervals are set, thereby discarding windowTimerIntervals even when
> they are set.
>  





[jira] [Commented] (SAMZA-1584) Improve logging in StreamProcessor.

2018-04-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436339#comment-16436339 ]

ASF GitHub Bot commented on SAMZA-1584:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/441


> Improve logging in StreamProcessor.
> ---
>
> Key: SAMZA-1584
> URL: https://issues.apache.org/jira/browse/SAMZA-1584
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> In the existing StreamProcessor implementation, we don't have sufficient logging
> in its lifecycle methods and callback handlers. This makes production
> debugging tedious. This JIRA is created for bookkeeping purposes, to track
> the work required to improve it.





[jira] [Resolved] (SAMZA-1584) Improve logging in StreamProcessor.

2018-04-12 Thread Prateek Maheshwari (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prateek Maheshwari resolved SAMZA-1584.
---
Resolution: Fixed

> Improve logging in StreamProcessor.
> ---
>
> Key: SAMZA-1584
> URL: https://issues.apache.org/jira/browse/SAMZA-1584
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> In the existing StreamProcessor implementation, we don't have sufficient logging
> in its lifecycle methods and callback handlers. This makes production
> debugging tedious. This JIRA is created for bookkeeping purposes, to track
> the work required to improve it.





samza git commit: SAMZA-1584: Improve logging in StreamProcessor.

2018-04-12 Thread pmaheshwari
Repository: samza
Updated Branches:
  refs/heads/master 3895a9070 -> 76de840c7


SAMZA-1584: Improve logging in StreamProcessor.

Add the processorId to log lines wherever necessary (since we support running
multiple stream applications in a single JVM) and improve logging in general in
StreamProcessor.
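Besides the log messages, this commit's diff imports `ThreadFactory` and adds a `CONTAINER_THREAD_NAME_FORMAT` constant ("Samza StreamProcessor Container Thread-%d"), but the excerpt is truncated before showing how they are used. A minimal sketch, assuming the format string is fed to a `ThreadFactory` with an incrementing counter; `NamedThreadFactorySketch` is a hypothetical name, not a Samza class.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a ThreadFactory that names each new thread by applying
// String.format to a pattern such as "Samza StreamProcessor Container Thread-%d".
// How StreamProcessor actually wires its constant is not shown in the diff.
final class NamedThreadFactorySketch implements ThreadFactory {
  private final String nameFormat;
  private final AtomicInteger counter = new AtomicInteger(); // starts at 0

  NamedThreadFactorySketch(String nameFormat) {
    this.nameFormat = nameFormat;
  }

  @Override
  public Thread newThread(Runnable runnable) {
    // getAndIncrement() is atomic, so concurrent callers never reuse a number.
    return new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
  }
}
```

Passing such a factory to `Executors.newSingleThreadExecutor(ThreadFactory)` gives the container thread a recognizable name, which helps tell threads apart in thread dumps and logs when several processors share a JVM.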

Author: Shanthoosh Venkataraman 
Author: Shanthoosh Venkataraman 

Reviewers: Prateek Maheshwari 

Closes #441 from shanthoosh/SAMZA-1584


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76de840c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76de840c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76de840c

Branch: refs/heads/master
Commit: 76de840c734fe2b7987af22d1ba6133437c25a5e
Parents: 3895a90
Author: Shanthoosh Venkataraman 
Authored: Thu Apr 12 14:34:45 2018 -0700
Committer: Prateek Maheshwari 
Committed: Thu Apr 12 14:34:45 2018 -0700

--
 .../apache/samza/processor/StreamProcessor.java | 54 ++--
 1 file changed, 27 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/76de840c/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index b548200..8dacc6c 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.annotation.InterfaceStability;
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
 @InterfaceStability.Evolving
 public class StreamProcessor {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);
+  private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza StreamProcessor Container Thread-%d";
 
   private final JobCoordinator jobCoordinator;
   private final StreamProcessorLifecycleListener processorListener;
@@ -71,7 +73,7 @@ public class StreamProcessor {
 
   // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
   // stopped due to re-balancing
-  /* package private */volatile CountDownLatch jcContainerShutdownLatch;
+  volatile CountDownLatch jcContainerShutdownLatch;
   private volatile boolean processorOnStartCalled = false;
 
   @VisibleForTesting
@@ -179,11 +181,12 @@ public class StreamProcessor {
     boolean containerShutdownInvoked = false;
     if (container != null) {
       try {
-        LOGGER.info("Shutting down container " + container.toString() + " from StreamProcessor");
+        LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
         container.shutdown();
+        LOGGER.info("Waiting {} milliseconds for the container: {} to shutdown.", taskShutdownMs, container);
         containerShutdownInvoked = true;
-      } catch (IllegalContainerStateException icse) {
-        LOGGER.info("Container was not running", icse);
+      } catch (Exception exception) {
+        LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
       }
     }
 
@@ -191,7 +194,6 @@ public class StreamProcessor {
       LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
       jobCoordinator.stop();
     }
-
   }
 
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
@@ -199,7 +201,7 @@ public class StreamProcessor {
         processorId,
         jobModel,
         config,
-        Util.javaMapAsScalaMap(customMetricsReporter),
+        Util.javaMapAsScalaMap(customMetricsReporter),
         taskFactory);
   }
 
@@ -213,32 +215,30 @@ public class StreamProcessor {
           if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
             boolean shutdownComplete = false;
             try {
-              LOGGER.info("Shutting down container in onJobModelExpired for processor:" + processorId);
+              LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
               container.pause();
               shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
-              LOGGER.info("ShutdownComplete=" + shutdownComplete);
+              LOGGER.inf
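The `jcContainerShutdownLatch` comment in this diff describes a handshake between the JobCoordinator thread and the container thread. A hypothetical, self-contained sketch of that pattern, assuming the container thread counts the latch down once it has stopped and the coordinator waits at most `taskShutdownMs`. `ShutdownLatchSketch` and `awaitContainerShutdown` are invented names; the only real API relied on is `CountDownLatch.await(long, TimeUnit)`, which returns `false` when the timeout elapses first, the same signal the `shutdownComplete` flag captures.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the JobCoordinator/container shutdown handshake.
final class ShutdownLatchSketch {

  private ShutdownLatchSketch() {
  }

  // Returns true if the simulated container signalled shutdown within taskShutdownMs.
  static boolean awaitContainerShutdown(long containerStopDelayMs, long taskShutdownMs) {
    CountDownLatch shutdownLatch = new CountDownLatch(1);
    Thread containerThread = new Thread(() -> {
      try {
        Thread.sleep(containerStopDelayMs); // simulates the container finishing its work
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      shutdownLatch.countDown(); // signal: the container has stopped
    }, "container-thread");
    containerThread.setDaemon(true); // do not keep the JVM alive if we stop waiting
    containerThread.start();
    try {
      // Coordinator side: bounded wait, false if the timeout elapses first.
      return shutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

The bounded wait keeps a stuck container from blocking the JobCoordinator indefinitely during a rebalance; the caller can inspect the boolean and decide how to proceed.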

[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description: 
*Problem:*

StreamProcessor skips all events from zookeeper server after zkClient session 
expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever a zkClient session
expiration occurs.

Any event from the zookeeper server is dropped if the in-memory generationId is not
equal to the generationId cached in the zkWatcher.

After a zkClient session expiration in a StreamProcessor, this results in
zombie StreamProcessors and stalls message processing.

In the worst case, if this happens to the leader StreamProcessor, the whole
processor group is stalled.

*Purpose of generationId:*

GenerationId was originally added to skip older, stale events that remain buffered
in the worker queue after a session expiration and to guard against the following
scenario:

1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor rejoins the group as a follower after the session
is re-established.
3. The former leader might have zkEvents buffered in its worker queue that were
delivered to it while it was the leader. If it acts upon these events, it
will cause global state corruption.

 

  was:
*Problem:*

StreamProcessor skips all events from zookeeper server after zkClient session 
expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever a zkClient session
expiration occurs.

Any event from the zookeeper server is dropped if the in-memory generationId is not
equal to the generationId cached in the zkWatcher.

After a zkClient session expiration in a StreamProcessor, this results in
zombie StreamProcessors and stalls message processing.

In the worst case, if this happens to the leader StreamProcessor, the whole
processor group is stalled.

*Purpose of generationId:*

GenerationId was originally added to skip older, stale events that remain buffered
in the worker queue after a session expiration and to guard against the following
scenario:

1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor rejoins the group as a follower after the session
is re-established.
3. The former leader might have zkEvents buffered in its worker queue that were
delivered to it while it was the leader. If it acts upon these events, it
will cause global state corruption.

 


> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> -
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> *Problem:*
> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are
> instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache the generationId passed through their constructor.
> This in-memory generationId is incremented whenever a zkClient session
> expiration occurs.
> Any event from the zookeeper server is dropped if the in-memory generationId is
> not equal to the generationId cached in the zkWatcher.
> After a zkClient session expiration in a StreamProcessor, this results in
> zombie StreamProcessors and stalls message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole
> processor group is stalled.
> *Purpose of generationId:*
> GenerationId was originally added to skip older, stale events that remain
> buffered in the worker queue after a session expiration and to guard against
> the following scenario:
> 1. Session expiration happens to the leader stream processor of the group.
> 2. The leader stream processor rejoins the group as a follower after the
> session is re-established.
> 3. The former leader might have zkEvents buffered in its worker queue that were
> delivered to it while it was the leader. If it acts upon these events, it
> will cause global state corruption.
>  





[jira] [Commented] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)

[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436310#comment-16436310 ]

Shanthoosh Venkataraman commented on SAMZA-1655:


[~xinyu] [~jagadish1...@gmail.com]

Please review it and share your thoughts.

> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> -
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> *Problem:*
> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are 
> instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache the generationId passed through their constructor.
> This in-memory generationId is incremented whenever the zkClient session 
> expires.
> Any event from the ZooKeeper server is dropped if the in-memory generationId is 
> not equal to the generationId cached in the zkWatcher.
> Because each zkWatcher keeps the generationId it was constructed with, every event 
> is dropped after a zkClient session expiration: the StreamProcessor becomes a 
> zombie and message processing stalls.
> In the worst case, if this happens to the leader StreamProcessor, the whole 
> processor group stalls.
> *Purpose of generationId:*
> GenerationId was originally added to skip stale events that are buffered 
> in the worker queue after a session expiration, guarding against the 
> following scenario:
>  1. A session expiration happens to the leader stream processor of the group.
>  2. After the session reconnects, the former leader rejoins the group as a 
> follower.
>  3. The former leader might have zkEvents buffered in its worker queue that were 
> delivered to it while it was the leader. If it acts upon these events, it 
> will corrupt global state.
>  





[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description: 
*Problem:*

StreamProcessor skips all events from zookeeper server after zkClient session 
expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are 
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever the zkClient session 
expires.

Any event from the ZooKeeper server is dropped if the in-memory generationId is not 
equal to the generationId cached in the zkWatcher.

Because each zkWatcher keeps the generationId it was constructed with, every event 
is dropped after a zkClient session expiration: the StreamProcessor becomes a 
zombie and message processing stalls.

In the worst case, if this happens to the leader StreamProcessor, the whole 
processor group stalls.

*Purpose of generationId:*

GenerationId was originally added to skip stale events that are buffered 
in the worker queue after a session expiration, guarding against the 
following scenario:
 1. A session expiration happens to the leader stream processor of the group.
 2. After the session reconnects, the former leader rejoins the group as a 
follower.
 3. The former leader might have zkEvents buffered in its worker queue that were 
delivered to it while it was the leader. If it acts upon these events, it 
will corrupt global state.

 

  was:
*Problem:* StreamProcessor skips all events from zookeeper server after 
zkClient session expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are 
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever the zkClient session 
expires.

Any event from the ZooKeeper server is dropped if the in-memory generationId is not 
equal to the generationId cached in the zkWatcher.

Because each zkWatcher keeps the generationId it was constructed with, every event 
is dropped after a zkClient session expiration: the StreamProcessor becomes a 
zombie and message processing stalls.

In the worst case, if this happens to the leader StreamProcessor, the whole 
processor group stalls.

*Purpose of generationId:*

GenerationId was originally added to skip stale events that are buffered 
in the worker queue after a session expiration, guarding against the 
following scenario:
 1. A session expiration happens to the leader stream processor of the group.
 2. After the session reconnects, the former leader rejoins the group as a 
follower.
 3. The former leader might have zkEvents buffered in its worker queue that were 
delivered to it while it was the leader. If it acts upon these events, it 
will corrupt global state.

 







[jira] [Commented] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436304#comment-16436304
 ] 

Shanthoosh Venkataraman commented on SAMZA-1655:


*Fix:*
A. Route all watcher events from the ZooKeeper server to the 
ScheduleAfterDebounce worker queue and execute them on the worker thread. 
After this change, all events received before a session expiration will be buffered 
in the ScheduleAfterDebounce worker queue. Upon session expiration, the stale 
buffered events in the queue will be deleted. When a processor's session 
expires on one ZooKeeper server and it reconnects to another server in 
the ensemble, it recreates its ephemeral processor node (because the 
processor's previous ephemeral node is gone). This situation is 
equivalent to a new processor joining the processor group, so clearing the 
stale buffered ZooKeeper events in the ScheduleAfterDebounce worker queue is 
necessary and should not cause corruption issues.
B. Remove the generationId-based check before zkWatcher event handling, along with the 
associated code.
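Fix A can be illustrated with a minimal, single-threaded sketch. It assumes a plain FIFO queue stands in for the ScheduleAfterDebounce worker queue; `SerializedEventQueue`, `submit`, `onSessionExpired`, and `runPending` are hypothetical names, not Samza's API, and a real implementation would drain the queue on a dedicated worker thread. The point it shows: once every watcher callback goes through one queue that is cleared on expiration, stale events are discarded without any generationId comparison, and events from the new session run normally.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of fix A: watcher callbacks are enqueued on a single
// worker queue instead of being handled directly, and the queue is cleared
// when the ZooKeeper session expires. Not Samza's ScheduleAfterDebounce code.
class SerializedEventQueue {
  private final Deque<Runnable> pending = new ArrayDeque<>();

  // Called from watcher callbacks: defer handling to the worker.
  synchronized void submit(Runnable action) {
    pending.addLast(action);
  }

  // Called on session expiration: events buffered from the old session are dropped.
  synchronized void onSessionExpired() {
    pending.clear();
  }

  // Worker step: run everything queued so far, in arrival order.
  void runPending() {
    while (true) {
      Runnable next;
      synchronized (this) {
        next = pending.pollFirst();
      }
      if (next == null) {
        return;
      }
      next.run();
    }
  }
}
```

For example, an event submitted before `onSessionExpired()` never runs, while one submitted afterward runs on the next `runPending()` call.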

> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> -
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> *Problem:* StreamProcessor skips all events from zookeeper server after 
> zkClient session expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are 
> instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache the generationId passed through their constructor.
> This in-memory generationId is incremented whenever the zkClient session 
> expires.
> Any event from the ZooKeeper server is dropped if the in-memory generationId is 
> not equal to the generationId cached in the zkWatcher.
> Because each zkWatcher keeps the generationId it was constructed with, every event 
> is dropped after a zkClient session expiration: the StreamProcessor becomes a 
> zombie and message processing stalls.
> In the worst case, if this happens to the leader StreamProcessor, the whole 
> processor group stalls.
> Purpose of generationId:
> GenerationId was originally added to skip stale events that are buffered 
> in the worker queue after a session expiration, guarding against the 
> following scenario:
> 1. A session expiration happens to the leader stream processor of the group.
> 2. After the session reconnects, the former leader rejoins the group as a 
> follower.
> 3. The former leader might have zkEvents buffered in its worker queue that were 
> delivered to it while it was the leader. If it acts upon these events, it 
> will corrupt global state.
>  





[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description: 
*Problem:* StreamProcessor skips all events from zookeeper server after 
zkClient session expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are 
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever the zkClient session 
expires.

Any event from the ZooKeeper server is dropped if the in-memory generationId is not 
equal to the generationId cached in the zkWatcher.

Because each zkWatcher keeps the generationId it was constructed with, every event 
is dropped after a zkClient session expiration: the StreamProcessor becomes a 
zombie and message processing stalls.

In the worst case, if this happens to the leader StreamProcessor, the whole 
processor group stalls.

*Purpose of generationId:*

GenerationId was originally added to skip stale events that are buffered 
in the worker queue after a session expiration, guarding against the 
following scenario:
 1. A session expiration happens to the leader stream processor of the group.
 2. After the session reconnects, the former leader rejoins the group as a 
follower.
 3. The former leader might have zkEvents buffered in its worker queue that were 
delivered to it while it was the leader. If it acts upon these events, it 
will corrupt global state.

 

  was:
*Problem:* StreamProcessor skips all events from zookeeper server after 
zkClient session expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are 
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever the zkClient session 
expires.

Any event from the ZooKeeper server is dropped if the in-memory generationId is not 
equal to the generationId cached in the zkWatcher.

Because each zkWatcher keeps the generationId it was constructed with, every event 
is dropped after a zkClient session expiration: the StreamProcessor becomes a 
zombie and message processing stalls.

In the worst case, if this happens to the leader StreamProcessor, the whole 
processor group stalls.

Purpose of generationId:

GenerationId was originally added to skip stale events that are buffered 
in the worker queue after a session expiration, guarding against the 
following scenario:
1. A session expiration happens to the leader stream processor of the group.
2. After the session reconnects, the former leader rejoins the group as a 
follower.
3. The former leader might have zkEvents buffered in its worker queue that were 
delivered to it while it was the leader. If it acts upon these events, it 
will corrupt global state.

 







[jira] [Created] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

2018-04-12 Thread Shanthoosh Venkataraman (JIRA)
Shanthoosh Venkataraman created SAMZA-1655:
--

 Summary: StreamProcessor skips all events from zookeeper server after zkClient session expiration.
 Key: SAMZA-1655
 URL: https://issues.apache.org/jira/browse/SAMZA-1655
 Project: Samza
  Issue Type: Bug
Reporter: Shanthoosh Venkataraman
Assignee: Shanthoosh Venkataraman


*Problem:* StreamProcessor skips all events from zookeeper server after 
zkClient session expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are 
instantiated with an in-memory generationId (zkUtils.generationId).

All the zkWatchers cache the generationId passed through their constructor.

This in-memory generationId is incremented whenever the zkClient session 
expires.

Any event from the ZooKeeper server is dropped if the in-memory generationId is not 
equal to the generationId cached in the zkWatcher.

Because each zkWatcher keeps the generationId it was constructed with, every event 
is dropped after a zkClient session expiration: the StreamProcessor becomes a 
zombie and message processing stalls.

In the worst case, if this happens to the leader StreamProcessor, the whole 
processor group stalls.

Purpose of generationId:

GenerationId was originally added to skip stale events that are buffered 
in the worker queue after a session expiration, guarding against the 
following scenario:
1. A session expiration happens to the leader stream processor of the group.
2. After the session reconnects, the former leader rejoins the group as a 
follower.
3. The former leader might have zkEvents buffered in its worker queue that were 
delivered to it while it was the leader. If it acts upon these events, it 
will corrupt global state.
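The failure mode described above can be reproduced with a small, simplified model. This is a sketch under stated assumptions: `GenerationIdDemo`, `Watcher`, `onEvent`, and `onSessionExpired` are hypothetical names standing in for zkUtils.generationId and Samza's watcher classes. A watcher caches the generation it was constructed with, so once the shared counter is incremented on session expiration, every later event fails the comparison and is silently dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the generationId check; not Samza's
// actual ZkUtils or watcher implementation.
class GenerationIdDemo {
  // Stands in for the in-memory zkUtils.generationId.
  static final AtomicInteger currentGeneration = new AtomicInteger(0);

  static class Watcher {
    private final int cachedGeneration; // captured once, at construction time
    final List<String> handled = new ArrayList<>();

    Watcher(int generation) {
      this.cachedGeneration = generation;
    }

    // Handles the event only if the cached generation still matches the current one.
    void onEvent(String event) {
      if (cachedGeneration != currentGeneration.get()) {
        return; // dropped: this watcher is never re-created, so it stays stale
      }
      handled.add(event);
    }
  }

  // Simulates a zkClient session expiration.
  static void onSessionExpired() {
    currentGeneration.incrementAndGet();
  }
}
```

A watcher created before the expiration handles its first event, then drops every event delivered after `onSessionExpired()`, which is the zombie behavior the issue describes.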

 





samza git commit: minor fix on eventhubs size limit for event body and partition key

2018-04-12 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master bbcf14eba -> 3895a9070


minor fix on eventhubs size limit for event body and partition key

Author: Hai Lu 

Reviewers: Jagadish, Srinivasulu Punuru

Closes #470 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3895a907
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3895a907
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3895a907

Branch: refs/heads/master
Commit: 3895a90706a8b5db3e622a0bb7b581852b60f7fa
Parents: bbcf14e
Author: Hai Lu 
Authored: Thu Apr 12 13:31:39 2018 -0700
Committer: Jagadish 
Committed: Thu Apr 12 13:31:39 2018 -0700

--
 .../documentation/versioned/azure/eventhubs.md  |  4 
 .../samza/system/eventhub/EventHubConfig.java   |  6 --
 .../eventhub/producer/EventHubSystemProducer.java   | 16 +---
 3 files changed, 21 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/docs/learn/documentation/versioned/azure/eventhubs.md
--
diff --git a/docs/learn/documentation/versioned/azure/eventhubs.md b/docs/learn/documentation/versioned/azure/eventhubs.md
index 7d76be3..ba7f760 100644
--- a/docs/learn/documentation/versioned/azure/eventhubs.md
+++ b/docs/learn/documentation/versioned/azure/eventhubs.md
@@ -58,6 +58,10 @@ collector.send(envelope);
 
 Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the `message` in the envelope. Additionally, the `key` and the `produce timestamp` are set as properties in the EventData before sending it to EventHubs.
 
+ Size limit of partition key:
+
+Note that EventHubs has a limit on the length of partition key (128 characters). In [EventHubSystemProducer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java) we truncate the partition key if the size of the key exceeds the limit.
+
 ### Advanced configuration:
 
 # Producer partitioning: 
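The partition-key truncation described in this doc change can be sketched as follows. This is a minimal illustration, assuming the 128-character limit stated above; `PartitionKeys` and `truncate` are hypothetical names, and the actual EventHubSystemProducer change (whose hunk is cut off in this archive) may differ, for example by reading the limit from `ClientConstants`.

```java
// Hypothetical helper mirroring the documented behavior: keys longer than the
// EventHubs partition-key limit are truncated before sending. Not the actual
// EventHubSystemProducer code.
final class PartitionKeys {
  static final int MAX_PARTITION_KEY_LENGTH = 128; // limit stated in the doc

  private PartitionKeys() {}

  static String truncate(String key) {
    if (key == null || key.length() <= MAX_PARTITION_KEY_LENGTH) {
      return key;
    }
    // substring counts UTF-16 code units, so a key containing characters outside
    // the Basic Multilingual Plane could be cut in the middle of a surrogate pair.
    return key.substring(0, MAX_PARTITION_KEY_LENGTH);
  }
}
```

One consequence of truncation: distinct keys that share the same first 128 characters become identical after truncation, so they map to the same partition.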

http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
--
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 6639dd8..e40b3c2 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -72,8 +72,10 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = "systems.%s.eventhubs.receive.queue.size";
   public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100;
 
-  // By default we will skip messages larger than 1MB.
-  private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024;
+  // By default we want to skip messages larger than 1MB. Also allow some buffer (24KB) to account for the overhead of
+  // metadata and key. So the default max message size will be 1000 KB (instead of precisely 1MB)
+  private static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024;
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - MESSAGE_HEADER_OVERHEAD;
 
   private final Map physcialToId = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
--
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 3639bbc..55a2ae0 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -23,6 +23,7 @@ import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
 import com.microsoft.azure.eventhubs.impl.EventDataImpl;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -211

[jira] [Created] (SAMZA-1654) calculate the exact size of the AMQP message for skipping large message

2018-04-12 Thread Hai (JIRA)
Hai created SAMZA-1654:
--

 Summary: calculate the exact size of the AMQP message for skipping large message
 Key: SAMZA-1654
 URL: https://issues.apache.org/jira/browse/SAMZA-1654
 Project: Samza
  Issue Type: Improvement
Reporter: Hai
Assignee: Hai


Today we calculate the event size based only on the body size, which is not 
accurate. We are waiting for the EventHubs client to expose an API that returns 
the actual EventData size. See more info below:

 

https://github.com/Azure/azure-event-hubs-java/issues/305
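The body-only check this issue describes can be sketched as below. The 24 KB overhead and the 1 MB ceiling are taken from the EventHubConfig change in the "minor fix on eventhubs size limit" commit (1024 * 1024 - 24 * 1024 = 1,024,000 bytes); `MessageSizeCheck` and `shouldSkip` are hypothetical names, not the actual producer code. Because only the body is measured, the key, properties, and AMQP framing are covered only by the fixed overhead allowance, which is why the estimate is approximate.

```java
// Hypothetical sketch of a body-only size check; constants mirror the
// EventHubConfig change, the class and method names are illustrative.
final class MessageSizeCheck {
  static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024;                            // 24,576 bytes
  static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - MESSAGE_HEADER_OVERHEAD; // 1,024,000 bytes

  private MessageSizeCheck() {}

  // Measures the body only; key, properties, and AMQP framing are not counted.
  static boolean shouldSkip(byte[] body, int maxMessageSize) {
    return body != null && body.length > maxMessageSize;
  }
}
```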





[jira] [Created] (SAMZA-1653) Support waitForFinish() in remote and add duration to both

2018-04-12 Thread Xinyu Liu (JIRA)
Xinyu Liu created SAMZA-1653:


 Summary: Support waitForFinish() in remote and add duration to both
 Key: SAMZA-1653
 URL: https://issues.apache.org/jira/browse/SAMZA-1653
 Project: Samza
  Issue Type: Bug
Reporter: Xinyu Liu
Assignee: Xinyu Liu


# We need a consistent API for waitForFinish() in the remote runner.
# We need another variant, waitForFinish(duration).
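The two requested variants can be sketched with a `CountDownLatch`. This is a minimal sketch, assuming the runner can signal completion through a single hook; `FinishableRunner` and `markFinished()` are hypothetical names, not Samza's runner API. The timed variant returns whether the application finished within the timeout, which is what `CountDownLatch.await(long, TimeUnit)` reports.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of waitForFinish() and waitForFinish(Duration);
// not Samza's actual runner implementation.
class FinishableRunner {
  private final CountDownLatch finished = new CountDownLatch(1);

  // Called by the runner once the application has finished (successfully or not).
  void markFinished() {
    finished.countDown();
  }

  // Blocks until the application finishes.
  void waitForFinish() throws InterruptedException {
    finished.await();
  }

  // Blocks for at most the given timeout; returns true if the application
  // finished within it, false if the timeout elapsed first.
  boolean waitForFinish(Duration timeout) throws InterruptedException {
    return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
  }
}
```

Using one latch for both variants keeps the local and remote semantics identical: the only difference between them is whether the caller bounds the wait.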





[jira] [Commented] (SAMZA-1649) Improve host-aware allocation to account for strict locality

2018-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436176#comment-16436176
 ] 

ASF GitHub Bot commented on SAMZA-1649:
---

GitHub user vjagadish opened a pull request:

https://github.com/apache/samza/pull/471

SAMZA-1649: Improve host-aware allocation to account for strict locality

Working on a doc for the behavior of the CapacityScheduler, and further 
testing is needed on an actual cluster, but here's a summary: 
 - Node-local requests are honored only when relax-locality = false.
 - With relax-locality = true, the scheduler favors interactivity over 
data locality for requests that ask for few resources relative to the size of 
the cluster. 

In addition to the above change, this PR also modifies the allocator 
algorithm to fall back to "ANY_HOST" requests so that we make progress when the 
preferred node is unavailable.
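The allocation policy summarized above can be modeled in a few lines. This is a hypothetical, simplified sketch: `LocalityFallback` and `chooseHost` are illustrative names, and the real allocator works with cluster-manager requests and expiry timers rather than a boolean flag. In YARN, strict locality corresponds to passing `relaxLocality = false` to an `AMRMClient.ContainerRequest`; the sketch only models the decision of when to accept a non-preferred host.

```java
import java.util.Set;

// Hypothetical model: a container is first requested on its preferred host with
// strict locality; if that request expires unsatisfied, the allocator falls back
// to accepting any offered host so the job still makes progress.
class LocalityFallback {
  // Returns the host to run on, or null to keep waiting for the preferred host.
  static String chooseHost(String preferredHost, Set<String> offeredHosts, boolean requestExpired) {
    if (offeredHosts.contains(preferredHost)) {
      return preferredHost; // strict locality satisfied
    }
    if (requestExpired && !offeredHosts.isEmpty()) {
      return offeredHosts.iterator().next(); // ANY_HOST fallback
    }
    return null; // preferred-host request still pending
  }
}
```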


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vjagadish1989/samza relax-locality-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/471.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #471


commit a117356b59065a50f9bbffacacfb799d3ff2dc9f
Author: Jagadish 
Date:   2018-04-11T21:33:52Z

Initial patch for relax-locality

commit 002cb87d02fef3fee73439daee36074402675484
Author: Jagadish 
Date:   2018-04-12T00:22:05Z

initial stab at an unit test

commit 7ce9e9457e97280a8b09ad6684d22079fe527f12
Author: Jagadish 
Date:   2018-04-12T07:10:20Z

Add more assertions for ensuring containers get released

commit 2e51d378053b1f4df1e0892cbcdeaeae02f78d81
Author: Jagadish 
Date:   2018-04-12T15:37:21Z

Improve documentation and refactor the testExpiredContainers

commit 5ceafc5a50ab3874286889c68df4660b06237472
Author: Jagadish 
Date:   2018-04-12T16:15:04Z

Minor clean-ups: remove deprecated usages

commit f5b9d566879f34f813e65da0a2e0152d4470b8d4
Author: Jagadish 
Date:   2018-04-12T16:27:31Z

Remove additional debug logging. Simulate a cluster manager timeout




> Improve host-aware allocation to account for strict locality
> 
>
> Key: SAMZA-1649
> URL: https://issues.apache.org/jira/browse/SAMZA-1649
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>Priority: Major
>






[jira] [Created] (SAMZA-1652) Window high level API: End Of Stream message does not fire Window trigger

2018-04-12 Thread Aditya (JIRA)
Aditya created SAMZA-1652:
-

 Summary: Window high level API: End Of Stream message does not fire Window trigger
 Key: SAMZA-1652
 URL: https://issues.apache.org/jira/browse/SAMZA-1652
 Project: Samza
  Issue Type: Bug
Reporter: Aditya


StreamOperatorTask has code to handle end-of-stream (EOS), but the EOS message itself is never passed 
to StreamOperatorTask because it does not implement EndOfStreamListenerTask. 
[~jagadish1...@gmail.com] is looking into it.
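The dispatch problem can be illustrated with a simplified model. The interfaces below mimic, but are not, Samza's StreamTask and EndOfStreamListenerTask, and `EosDispatcher` is a hypothetical name. The runtime delivers EOS only to tasks that implement the listener interface, so a task that contains EOS-handling code without declaring that interface never receives the signal, and any window trigger tied to EOS never fires.

```java
// Hypothetical, simplified model of EOS dispatch; not Samza's actual classes.
interface SimpleTask {
  void process(String message);
}

interface SimpleEndOfStreamListener {
  void onEndOfStream();
}

final class EosDispatcher {
  private EosDispatcher() {}

  static void deliverEndOfStream(SimpleTask task) {
    if (task instanceof SimpleEndOfStreamListener) {
      ((SimpleEndOfStreamListener) task).onEndOfStream();
    }
    // Otherwise EOS is silently ignored, even if the task has its own EOS-handling method.
  }
}
```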





[jira] [Created] (SAMZA-1651) Samza-sql: Implement GROUP BY SQL operator

2018-04-12 Thread Aditya (JIRA)
Aditya created SAMZA-1651:
-

 Summary: Samza-sql: Implement GROUP BY SQL operator
 Key: SAMZA-1651
 URL: https://issues.apache.org/jira/browse/SAMZA-1651
 Project: Samza
  Issue Type: Bug
Reporter: Aditya








[jira] [Created] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window

2018-04-12 Thread Aditya (JIRA)
Aditya created SAMZA-1650:
-

 Summary: Window high level API: Trigger is not fired at the end of trigger interval for tumbling window
 Key: SAMZA-1650
 URL: https://issues.apache.org/jira/browse/SAMZA-1650
 Project: Samza
  Issue Type: Bug
Reporter: Aditya


[~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in 
JobNode, which does not compute the time interval if joinTtlInterval is not 
set, and hence discards windowTimerIntervals even when they are set.
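A corrected computation can be sketched as follows, assuming the trigger interval is the greatest common divisor of all window timer intervals and the join TTL (when present), so that one timer period divides every configured interval. `TriggerIntervals` and `computeTriggerInterval` here are hypothetical stand-ins, not JobNode's code. The bug described above corresponds to bailing out when no join TTL is set instead of still folding in the window intervals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: combine every candidate interval, then take their GCD.
final class TriggerIntervals {
  private TriggerIntervals() {}

  static long gcd(long a, long b) {
    while (b != 0) {
      long t = a % b;
      a = b;
      b = t;
    }
    return a;
  }

  // Returns -1 when there are no timer intervals at all.
  static long computeTriggerInterval(List<Long> windowIntervalsMs, Long joinTtlMs) {
    List<Long> all = new ArrayList<>(windowIntervalsMs);
    if (joinTtlMs != null) {
      all.add(joinTtlMs); // the join TTL is optional; window intervals still count without it
    }
    long result = 0;
    for (long interval : all) {
      result = gcd(result, interval); // gcd(0, x) == x seeds the fold
    }
    return result == 0 ? -1 : result;
  }
}
```

For example, tumbling windows of 10 s and 15 s with no join give a 5 s trigger interval.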

 





[jira] [Created] (SAMZA-1649) Improve host-aware allocation to account for strict locality

2018-04-12 Thread Jagadish (JIRA)
Jagadish created SAMZA-1649:
---

 Summary: Improve host-aware allocation to account for strict locality
 Key: SAMZA-1649
 URL: https://issues.apache.org/jira/browse/SAMZA-1649
 Project: Samza
  Issue Type: Bug
Reporter: Jagadish








[jira] [Created] (SAMZA-1648) Collection Stream

2018-04-12 Thread Sanil Jain (JIRA)
Sanil Jain created SAMZA-1648:
-

 Summary: Collection Stream 
 Key: SAMZA-1648
 URL: https://issues.apache.org/jira/browse/SAMZA-1648
 Project: Samza
  Issue Type: New Feature
  Components: test
Reporter: Sanil Jain
Assignee: Sanil Jain
 Fix For: 0.15.0


Create the downstream wiring with the in-memory system for a stream backed by a collection. This API 
should provide endpoints to initialize a stream from an input collection into the in-memory system, and 
a utility to access the state of this stream at any given time.

Link to SEP:
[Samza Integration Test Framework|https://cwiki.apache.org/confluence/display/SAMZA/SEP-12%3A+Continuous+Integration+Test+Framework]
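The proposed utility can be pictured with a small sketch. It is hypothetical throughout: `CollectionStream`, `of`, `append`, and `snapshot` are illustrative names, the real API was to be defined by SEP-12, and this model keeps messages in a plain list rather than wiring them into Samza's in-memory system. It shows the two capabilities the issue asks for: initializing a stream from an input collection and reading its state at any time.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a collection-backed test stream; not Samza's API.
final class CollectionStream<T> {
  private final String streamName;
  private final List<T> messages;

  private CollectionStream(String streamName, Collection<T> initial) {
    this.streamName = streamName;
    this.messages = new ArrayList<>(initial);
  }

  // Initializes a stream from an input collection.
  static <T> CollectionStream<T> of(String streamName, Collection<T> initial) {
    return new CollectionStream<>(streamName, initial);
  }

  String getStreamName() {
    return streamName;
  }

  // Appends a message, as a job writing to this stream would.
  synchronized void append(T message) {
    messages.add(message);
  }

  // Returns an immutable snapshot of the stream's current contents.
  synchronized List<T> snapshot() {
    return Collections.unmodifiableList(new ArrayList<>(messages));
  }
}
```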



