[jira] [Commented] (SAMZA-1649) Improve host-aware allocation to account for strict locality
[ https://issues.apache.org/jira/browse/SAMZA-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436697#comment-16436697 ]

ASF GitHub Bot commented on SAMZA-1649:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/471

> Improve host-aware allocation to account for strict locality
>
> Key: SAMZA-1649
> URL: https://issues.apache.org/jira/browse/SAMZA-1649
> Project: Samza
> Issue Type: Bug
> Reporter: Jagadish
> Priority: Major
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
samza git commit: SAMZA-1649: Improve host-aware allocation to account for strict locality
Repository: samza
Updated Branches:
  refs/heads/master 96c333469 -> b6219f181


SAMZA-1649: Improve host-aware allocation to account for strict locality

Currently working on a doc for the behavior of the CapacityScheduler, and further testing is needed on an actual cluster, but here's a summary of why we should set relax-locality = false:
- Node-local requests are honored only when relax-locality = false.
- With relax-locality = true, the scheduler favors interactivity over data locality for requests that ask for few resources relative to the size of the cluster.

In addition to the above change, this PR also modifies the allocator algorithm to fall back to "ANY_HOST" requests so that we make progress when the node is unavailable.

Author: Jagadish
Reviewers: Prateek Maheshwari

Closes #471 from vjagadish/relax-locality-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b6219f18
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b6219f18
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b6219f18

Branch: refs/heads/master
Commit: b6219f181ff54e79aedcf6f1b7fff264925a8af7
Parents: 96c3334
Author: Jagadish
Authored: Thu Apr 12 19:23:44 2018 -0700
Committer: Jagadish
Committed: Thu Apr 12 19:23:44 2018 -0700

--
 .../HostAwareContainerAllocator.java         |  17 ++-
 .../clustermanager/ResourceRequestState.java |  18 +++
 .../MockClusterResourceManager.java          |  39 --
 .../TestHostAwareContainerAllocator.java     | 124 +--
 .../job/yarn/YarnClusterResourceManager.java |   2 +-
 5 files changed, 169 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 66e2246..fe462e7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -78,13 +78,18 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
         boolean expired = requestExpired(request);
         boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);

-        if (expired && resourceAvailableOnAnyHost) {
-          log.info("Request expired. running on ANY_HOST");
-          runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+        if (expired) {
+          if (resourceAvailableOnAnyHost) {
+            log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(), request.getPreferredHost());
+            runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+          } else {
+            log.info("Request for container: {} on {} has expired. Requesting additional resources on ANY_HOST.", request.getContainerID(), request.getPreferredHost());
+            resourceRequestState.cancelResourceRequest(request);
+            requestResource(containerID, ResourceRequestState.ANY_HOST);
+          }
         } else {
-          log.info("Either the request timestamp {} is greater than resource request timeout {}ms or we couldn't "
-              + "find any free allocated resources in the buffer. Breaking out of loop.",
-              request.getRequestTimestampMs(), requestTimeout);
+          log.info("Request for container: {} on {} has not yet expired. Request creation time: {}. Request timeout: {}",
+              new Object[]{request.getContainerID(), request.getPreferredHost(), request.getRequestTimestampMs(), requestTimeout});
           break;
         }
       }


http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index fe2067c..51caa39 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -102,6 +102,24 @@ public class ResourceRequestState {
   }

   /**
+   * Cancels a {@link SamzaResourceRequest} previously submitted to the {@link ClusterResourceManager}
+   *
+   * @param request {@link SamzaResourceRequest} to cancel
+   */
+
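The expired-request branch in the diff above makes three decisions: keep waiting if the request has not expired, run on an already-allocated ANY_HOST resource if one exists, and otherwise cancel the host-specific request and ask for ANY_HOST instead. The sketch below models only that decision logic. `HostAwareFallbackSketch`, its `Request` type, the resource counter, and the "created more than `requestTimeoutMs` ago" expiry rule are all hypothetical stand-ins, not Samza classes or a copy of `requestExpired()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the expired-request branch added to
// HostAwareContainerAllocator in SAMZA-1649. These are not Samza classes.
class HostAwareFallbackSketch {
  static final String ANY_HOST = "ANY_HOST";

  // Stand-in for a pending request for a container on a preferred host.
  static final class Request {
    final String containerId;
    final String preferredHost;
    final long createdAtMs;

    Request(String containerId, String preferredHost, long createdAtMs) {
      this.containerId = containerId;
      this.preferredHost = preferredHost;
      this.createdAtMs = createdAtMs;
    }
  }

  private final long requestTimeoutMs;
  private final Deque<Request> pending = new ArrayDeque<>();
  private final List<String> actions = new ArrayList<>();
  private int freeAnyHostResources;

  HostAwareFallbackSketch(long requestTimeoutMs, int freeAnyHostResources) {
    this.requestTimeoutMs = requestTimeoutMs;
    this.freeAnyHostResources = freeAnyHostResources;
  }

  void addRequest(Request request) {
    pending.add(request);
  }

  List<String> actions() {
    return actions;
  }

  // Processes pending requests whose preferred host has no allocated resource.
  void assign(long nowMs) {
    while (!pending.isEmpty()) {
      Request request = pending.peek();
      boolean expired = nowMs - request.createdAtMs > requestTimeoutMs;
      if (!expired) {
        // Not yet expired: keep waiting for the preferred host.
        actions.add("wait " + request.containerId);
        break;
      }
      pending.poll();
      if (freeAnyHostResources > 0) {
        // Expired, and a resource on any host is already allocated: run there.
        freeAnyHostResources--;
        actions.add("run " + request.containerId + " on " + ANY_HOST);
      } else {
        // Expired, and nothing is available: cancel the host-specific request
        // and ask for a resource on any host so the job can make progress.
        actions.add("cancel " + request.containerId + " on " + request.preferredHost);
        actions.add("request " + request.containerId + " on " + ANY_HOST);
      }
    }
  }
}
```

With a 1000 ms timeout, one free ANY_HOST resource, and three requests created at 0, 0 and 1500 ms, calling `assign(2000)` runs the first request on ANY_HOST, cancels and re-requests the second on ANY_HOST, and leaves the third waiting. Before this change, the second request would have stayed queued with nothing to run on, which is the stall the "fall back to ANY_HOST" part of the commit addresses.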
[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description:
*Problem:*
StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache this generationId, which is passed through the constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
*Purpose of generationId:*
GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, so as to guard against the following scenario:
1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor joins the group as a follower after reconnecting to a different zkServer in the ensemble.
3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.

was:
*Problem:*
StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache this generationId, which is passed through the constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
*Purpose of generationId:*
GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, so as to guard against the following scenario:
1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor joins the group as a follower after the session reconnects.
3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.


> StreamProcessor skips all events from zookeeper server after zkClient session expiration.
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>
> *Problem:*
> StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache this generationId, which is passed through the constructor.
> This in-memory generationId is incremented whenever a zkClient session expiration occurs.
> Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
> After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
> *Purpose of generationId:*
> GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, so as to guard against the following scenario:
> 1. Session expiration happens to the leader stream processor of the group.
> 2. The leader stream processor joins the group as a follower after reconnecting to a different zkServer in the ensemble.
> 3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.
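The *Reason* section describes why the stall happens: a watcher captures the generationId once, at construction, while the in-memory value keeps advancing on every session expiration. Once the two diverge, the comparison meant to discard stale buffered events also discards every new event delivered to that watcher. The sketch below reproduces only that comparison. `ZkUtilsStub`, `CachingWatcher`, and the ZooKeeper paths are hypothetical stand-ins, not Samza or ZkClient classes, and it assumes the same watcher instance keeps receiving events after the session is re-established, as the issue describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the behavior described in SAMZA-1655. ZkUtilsStub and
// CachingWatcher are illustrative stand-ins, not Samza or ZkClient classes.
class GenerationIdSketch {
  // Stand-in for the in-memory generationId kept in zkUtils.
  static final class ZkUtilsStub {
    private final AtomicInteger current = new AtomicInteger();

    int generationId() {
      return current.get();
    }

    // Called on zkClient session expiration.
    void onSessionExpired() {
      current.incrementAndGet();
    }
  }

  // A watcher that caches the generationId it was constructed with.
  static final class CachingWatcher {
    private final ZkUtilsStub zkUtils;
    private final int cachedGenerationId;
    private final List<String> handledPaths = new ArrayList<>();

    CachingWatcher(ZkUtilsStub zkUtils) {
      this.zkUtils = zkUtils;
      this.cachedGenerationId = zkUtils.generationId();
    }

    // Any event is dropped once the cached and in-memory generationIds differ.
    void handleDataChange(String path) {
      if (cachedGenerationId != zkUtils.generationId()) {
        return; // Treated as stale and silently skipped.
      }
      handledPaths.add(path);
    }

    List<String> handledPaths() {
      return handledPaths;
    }
  }
}
```

In this model the first event is handled, `onSessionExpired()` bumps the generation, and every later event reaching the same watcher is dropped, whether it is a stale buffered event or a fresh one from the server. That is the "zombie StreamProcessor" condition described above.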
[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description:
*Problem:*
StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache this generationId, which is passed through the constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
*Purpose of generationId:*
GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, so as to guard against the following scenario:
1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor joins the group as a follower after the session reconnects.
3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.

was:
*Problem:*
StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache this generationId, which is passed through the constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
*Purpose of generationId:*
GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, and to guard against the following scenario:
1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor joins the group as a follower after the session reconnects.
3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.


> StreamProcessor skips all events from zookeeper server after zkClient session expiration.
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>
> *Problem:*
> StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache this generationId, which is passed through the constructor.
> This in-memory generationId is incremented whenever a zkClient session expiration occurs.
> Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
> After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
> *Purpose of generationId:*
> GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, so as to guard against the following scenario:
> 1. Session expiration happens to the leader stream processor of the group.
> 2. The leader stream processor joins the group as a follower after the session reconnects.
> 3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.
[jira] [Commented] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window
[ https://issues.apache.org/jira/browse/SAMZA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436496#comment-16436496 ]

ASF GitHub Bot commented on SAMZA-1650:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/472

> Window high level API: Trigger is not fired at the end of trigger interval for tumbling window
>
> Key: SAMZA-1650
> URL: https://issues.apache.org/jira/browse/SAMZA-1650
> Project: Samza
> Issue Type: Bug
> Reporter: Aditya
> Priority: Major
>
> [~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in JobNode, which does not compute the timer interval when no joinTtlInterval is set and hence discards windowTimerIntervals even when they are set.
samza git commit: SAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window
Repository: samza
Updated Branches:
  refs/heads/master 76de840c7 -> 96c333469


SAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window

Author: Aditya Toomula
Reviewers: Jagadish

Closes #472 from atoomula/window


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/96c33346
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/96c33346
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/96c33346

Branch: refs/heads/master
Commit: 96c3334693f7d18d087dc5262dddfc7f315d37f1
Parents: 76de840
Author: Aditya Toomula
Authored: Thu Apr 12 16:50:30 2018 -0700
Committer: Jagadish
Committed: Thu Apr 12 16:50:30 2018 -0700

--
 .../main/java/org/apache/samza/execution/JobNode.java | 11 +--
 1 file changed, 5 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/96c33346/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index c0b4ee5..bc85d00 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -334,17 +334,16 @@ public class JobNode {
         .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
         .collect(Collectors.toList());

-    if (joinTtlIntervals.isEmpty()) {
-      return -1;
-    }
-
     // Combine both the above lists
     List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
     candidateTimerIntervals.addAll(windowTimerIntervals);

+    if (candidateTimerIntervals.isEmpty()) {
+      return -1;
+    }
+
     // Compute the gcd of the resultant list
-    long timerInterval = MathUtils.gcd(candidateTimerIntervals);
-    return timerInterval;
+    return MathUtils.gcd(candidateTimerIntervals);
   }

   /**
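The JobNode fix moves the "no timers needed" check from the join-only list to the combined list, so window timer intervals are no longer thrown away when a job has no joins. The sketch below contrasts the two versions. `TriggerIntervalSketch` is a hypothetical class, and its local `gcd` helpers stand in for Samza's `MathUtils.gcd`, whose exact implementation is not shown here; intervals are assumed to be in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of computeTriggerInterval() before and after SAMZA-1650.
// gcd() is a local helper standing in for Samza's MathUtils.gcd.
class TriggerIntervalSketch {
  static long gcd(long a, long b) {
    while (b != 0) {
      long t = a % b;
      a = b;
      b = t;
    }
    return a;
  }

  // gcd(0, x) == x, so folding from 0 yields the gcd of the whole list.
  static long gcd(List<Long> values) {
    long result = 0;
    for (long v : values) {
      result = gcd(result, v);
    }
    return result;
  }

  // After the fix: return -1 only when neither joins nor windows need a timer.
  static long computeTriggerInterval(List<Long> joinTtlIntervals, List<Long> windowTimerIntervals) {
    List<Long> candidates = new ArrayList<>(joinTtlIntervals);
    candidates.addAll(windowTimerIntervals);
    if (candidates.isEmpty()) {
      return -1;
    }
    return gcd(candidates);
  }

  // Before the fix: a job with windows but no joins got -1 (no timer at all).
  static long computeTriggerIntervalBeforeFix(List<Long> joinTtlIntervals, List<Long> windowTimerIntervals) {
    if (joinTtlIntervals.isEmpty()) {
      return -1;
    }
    List<Long> candidates = new ArrayList<>(joinTtlIntervals);
    candidates.addAll(windowTimerIntervals);
    return gcd(candidates);
  }
}
```

For a job with no joins and two tumbling windows of 10 s and 15 s, the old version returns -1, so no trigger fires at the end of the interval. The fixed version returns 5000 ms, the largest tick that lands on every window boundary. Using the gcd rather than the minimum matters: a 10 s tick would miss the 15 s boundary.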
[jira] [Commented] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window
[ https://issues.apache.org/jira/browse/SAMZA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436494#comment-16436494 ]

ASF GitHub Bot commented on SAMZA-1650:
---

GitHub user atoomula opened a pull request:

    https://github.com/apache/samza/pull/472

    SAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/atoomula/samza window

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/472.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #472

commit 82937f851cb1e1b5a2e2048582a60cba3791efba
Author: Aditya Toomula
Date: 2018-04-12T21:47:19Z

    Window high level API: Trigger is not fired at the end of trigger interval for tumbling window

> Window high level API: Trigger is not fired at the end of trigger interval for tumbling window
>
> Key: SAMZA-1650
> URL: https://issues.apache.org/jira/browse/SAMZA-1650
> Project: Samza
> Issue Type: Bug
> Reporter: Aditya
> Priority: Major
>
> [~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in JobNode, which does not compute the timer interval when no joinTtlInterval is set and hence discards windowTimerIntervals even when they are set.
[jira] [Commented] (SAMZA-1584) Improve logging in StreamProcessor.
[ https://issues.apache.org/jira/browse/SAMZA-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436339#comment-16436339 ]

ASF GitHub Bot commented on SAMZA-1584:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/441

> Improve logging in StreamProcessor.
>
> Key: SAMZA-1584
> URL: https://issues.apache.org/jira/browse/SAMZA-1584
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>
> The existing StreamProcessor implementation does not have sufficient logging in its lifecycle methods and callback handlers, which makes debugging in production tedious. This JIRA was created for bookkeeping, to track the work required to improve it.
[jira] [Resolved] (SAMZA-1584) Improve logging in StreamProcessor.
[ https://issues.apache.org/jira/browse/SAMZA-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prateek Maheshwari resolved SAMZA-1584.
---
Resolution: Fixed

> Improve logging in StreamProcessor.
>
> Key: SAMZA-1584
> URL: https://issues.apache.org/jira/browse/SAMZA-1584
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>
> The existing StreamProcessor implementation does not have sufficient logging in its lifecycle methods and callback handlers, which makes debugging in production tedious. This JIRA was created for bookkeeping, to track the work required to improve it.
samza git commit: SAMZA-1584: Improve logging in StreamProcessor.
Repository: samza
Updated Branches:
  refs/heads/master 3895a9070 -> 76de840c7


SAMZA-1584: Improve logging in StreamProcessor.

Add the processorID to the log lines wherever necessary (since we support running multiple stream applications in a JVM) and improve logging in general in StreamProcessor.

Author: Shanthoosh Venkataraman
Author: Shanthoosh Venkataraman

Reviewers: Prateek Maheshwari

Closes #441 from shanthoosh/SAMZA-1584


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76de840c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76de840c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76de840c

Branch: refs/heads/master
Commit: 76de840c734fe2b7987af22d1ba6133437c25a5e
Parents: 3895a90
Author: Shanthoosh Venkataraman
Authored: Thu Apr 12 14:34:45 2018 -0700
Committer: Prateek Maheshwari
Committed: Thu Apr 12 14:34:45 2018 -0700

--
 .../apache/samza/processor/StreamProcessor.java | 54 ++--
 1 file changed, 27 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/76de840c/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index b548200..8dacc6c 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.annotation.InterfaceStability;
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
 @InterfaceStability.Evolving
 public class StreamProcessor {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);
+  private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza StreamProcessor Container Thread-%d";

   private final JobCoordinator jobCoordinator;
   private final StreamProcessorLifecycleListener processorListener;
@@ -71,7 +73,7 @@ public class StreamProcessor {

   // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
   // stopped due to re-balancing
-  /* package private */volatile CountDownLatch jcContainerShutdownLatch;
+  volatile CountDownLatch jcContainerShutdownLatch;
   private volatile boolean processorOnStartCalled = false;

   @VisibleForTesting
@@ -179,11 +181,12 @@ public class StreamProcessor {
     boolean containerShutdownInvoked = false;
     if (container != null) {
       try {
-        LOGGER.info("Shutting down container " + container.toString() + " from StreamProcessor");
+        LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
         container.shutdown();
+        LOGGER.info("Waiting {} milliseconds for the container: {} to shutdown.", taskShutdownMs, container);
         containerShutdownInvoked = true;
-      } catch (IllegalContainerStateException icse) {
-        LOGGER.info("Container was not running", icse);
+      } catch (Exception exception) {
+        LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
       }
     }

@@ -191,7 +194,6 @@ public class StreamProcessor {
       LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
       jobCoordinator.stop();
     }
-
   }

   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
@@ -199,7 +201,7 @@ public class StreamProcessor {
         processorId,
         jobModel,
         config,
-        Util.javaMapAsScalaMap(customMetricsReporter),
+        Util.javaMapAsScalaMap(customMetricsReporter),
         taskFactory);
   }

@@ -213,32 +215,30 @@ public class StreamProcessor {
           if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
             boolean shutdownComplete = false;
             try {
-              LOGGER.info("Shutting down container in onJobModelExpired for processor:" + processorId);
+              LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
               container.pause();
               shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
-              LOGGER.info("ShutdownComplete=" + shutdownComplete);
+              LOGGER.inf
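The diff adds a `ThreadFactory` import and a `CONTAINER_THREAD_NAME_FORMAT` constant, and the last hunk shows the JobCoordinator side pausing the container and then waiting on `jcContainerShutdownLatch` for at most `taskShutdownMs`. The truncated diff does not show how StreamProcessor actually wires these together, so the sketch below is an assumption: a factory that names threads with that format, plus a bounded latch wait between a "container" thread and the calling thread. `ContainerThreadSketch`, `namedThreadFactory`, and `runAndAwaitShutdown` are hypothetical names, and `Thread.sleep` stands in for real container work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: named container threads plus a timeout-bounded wait on a
// shutdown latch, mirroring the pattern visible in the StreamProcessor diff.
class ContainerThreadSketch {
  static final String CONTAINER_THREAD_NAME_FORMAT = "Samza StreamProcessor Container Thread-%d";

  // Names each new thread by filling the %d in the format with a counter.
  static ThreadFactory namedThreadFactory(String format) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, String.format(format, counter.getAndIncrement()));
      thread.setDaemon(true); // Keeps the sketch from blocking JVM exit.
      return thread;
    };
  }

  // Runs a fake container on a named thread and waits up to timeoutMs for it to
  // count down the latch. Returns true if shutdown completed in time.
  static boolean runAndAwaitShutdown(long workMs, long timeoutMs) {
    CountDownLatch shutdownLatch = new CountDownLatch(1);
    ExecutorService executor =
        Executors.newSingleThreadExecutor(namedThreadFactory(CONTAINER_THREAD_NAME_FORMAT));
    try {
      executor.submit(() -> {
        try {
          Thread.sleep(workMs); // Stand-in for the container draining its work.
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          shutdownLatch.countDown(); // Signals the waiting coordinator thread.
        }
      });
      return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

A descriptive thread name such as `Samza StreamProcessor Container Thread-0` makes thread dumps and log lines attributable to a container, which fits the commit's goal when several stream applications share a JVM. The bounded `await` means a container that does not stop in time yields `false` instead of hanging the coordinator, which is what the `shutdownComplete` flag in the diff records.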
[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1655:
---
Description:
*Problem:*
StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache this generationId, which is passed through the constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
*Purpose of generationId:*
GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, and to guard against the following scenario:
1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor joins the group as a follower after the session reconnects.
3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.

was:
*Problem:*
StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache this generationId, which is passed through the constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
*Purpose of generationId:*
GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, and to guard against the following scenario:
1. Session expiration happens to the leader stream processor of the group.
2. The leader stream processor joins the group as a follower after the session reconnects.
3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.


> StreamProcessor skips all events from zookeeper server after zkClient session expiration.
>
> Key: SAMZA-1655
> URL: https://issues.apache.org/jira/browse/SAMZA-1655
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>
> *Problem:*
> StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
> All the zkWatchers cache this generationId, which is passed through the constructor.
> This in-memory generationId is incremented whenever a zkClient session expiration occurs.
> Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in a zkWatcher.
> After a zkClient session expiration in a StreamProcessor, this will result in zombie StreamProcessors and will stall message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole processor group will be stalled.
> *Purpose of generationId:*
> GenerationId was added initially to skip older, stale events that are buffered in the worker queue after a session expiration, and to guard against the following scenario:
> 1. Session expiration happens to the leader stream processor of the group.
> 2. The leader stream processor joins the group as a follower after the session reconnects.
> 3. The leader might have zkEvents buffered in its worker queue that were delivered to it when it was the leader. If the leader acts upon these events, it will cause global state corruption.
[jira] [Commented] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436310#comment-16436310 ] Shanthoosh Venkataraman commented on SAMZA-1655: [~xinyu] [~jagadish1...@gmail.com] Please review it and share your thoughts. > StreamProcessor skips all events from zookeeper server after zkClient session > expiration. > - > > Key: SAMZA-1655 > URL: https://issues.apache.org/jira/browse/SAMZA-1655 > Project: Samza > Issue Type: Bug >Reporter: Shanthoosh Venkataraman >Assignee: Shanthoosh Venkataraman >Priority: Major > > *Problem:* > StreamProcessor skips all events from zookeeper server after zkClient session > expiration. > *Reason:* > All the zkWatchers(dataChange, childChange handler) in a StreamProcessor are > instantiated with an inmemory generationId(zkUtils.generationId). > All the zkWatchers cache this generationId passed through the constructor. > This inmemory generationId is incremented whenever the zkClient session > expiration occurs. > Any event from zookeeper server is dropped if the inmemory generationId is > not equal to the generationId cached in a zkWatcher. > After the zkClient session expiration in a StreamProcessor, this will result > in zombie StreamProcessors and will stall message processing. > In worst case, if this happens to the leader StreamProcessor then the whole > processors group will be stalled. > *Purpose of generationId:* > GenerationId was added initially to skip older stale events that are buffered > in worker queue after a session expiration phase and guard against the > following scenario: > 1. Session expiration happens to the leader stream processor of the group. > 2. Leader stream processor joins the group as a follower after the session > reconnect. > 3. Leader might have zkEvents buffered in it's worker queue which were > delivered to it when it was a leader. If the leader acts upon these events it > will cause global state corruption. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
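The failure mode described in SAMZA-1655 can be reproduced in isolation. The following is a minimal, hypothetical sketch, not Samza code: `currentGeneration` stands in for zkUtils.generationId, and `GenerationCheckingWatcher` stands in for a zkWatcher that caches the generationId in its constructor and drops events on a mismatch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Samza code: reproduces the dropped-event behavior in isolation.
public class StaleGenerationDemo {
  // Stand-in for the in-memory zkUtils.generationId; bumped on every session expiration.
  static final AtomicInteger currentGeneration = new AtomicInteger(0);

  // Stand-in for a zkWatcher (e.g. a dataChange or childChange handler).
  static class GenerationCheckingWatcher {
    private final int cachedGeneration; // captured once, in the constructor
    int handled = 0;
    int dropped = 0;

    GenerationCheckingWatcher(int generation) {
      this.cachedGeneration = generation;
    }

    void onEvent() {
      // Drop the event when the cached generation no longer matches the current one.
      if (cachedGeneration != currentGeneration.get()) {
        dropped++;
        return;
      }
      handled++;
    }
  }

  public static void main(String[] args) {
    GenerationCheckingWatcher watcher = new GenerationCheckingWatcher(currentGeneration.get());
    watcher.onEvent();                   // generations match: handled
    currentGeneration.incrementAndGet(); // simulate a zkClient session expiration
    for (int i = 0; i < 3; i++) {
      watcher.onEvent();                 // the watcher was not re-created: dropped
    }
    System.out.println("handled=" + watcher.handled + " dropped=" + watcher.dropped);
    // prints handled=1 dropped=3
  }
}
```

Once the counter moves past the cached value, no later event can pass the check, which matches the zombie behavior reported in the issue.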
[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shanthoosh Venkataraman updated SAMZA-1655:

Description:
*Problem:*
StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache the generationId passed through their constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in the zkWatcher.
Because the watchers keep the stale generationId, every event after a zkClient session expiration is dropped: the StreamProcessor becomes a zombie and message processing stalls.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group stalls.
*Purpose of generationId:*
GenerationId was originally added to skip stale events buffered in the worker queue across a session expiration, and to guard against the following scenario:
1. A session expiration happens on the leader stream processor of the group.
2. After the session reconnects, the leader rejoins the group as a follower.
3. The former leader might still have zkEvents buffered in its worker queue that were delivered to it while it was the leader. If it acts upon these events, it will corrupt global state.

was:
*Problem:* StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache the generationId passed through their constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in the zkWatcher.
Because the watchers keep the stale generationId, every event after a zkClient session expiration is dropped: the StreamProcessor becomes a zombie and message processing stalls.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group stalls.
*Purpose of generationId:*
GenerationId was originally added to skip stale events buffered in the worker queue across a session expiration, and to guard against the following scenario:
1. A session expiration happens on the leader stream processor of the group.
2. After the session reconnects, the leader rejoins the group as a follower.
3. The former leader might still have zkEvents buffered in its worker queue that were delivered to it while it was the leader. If it acts upon these events, it will corrupt global state.
[jira] [Commented] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436304#comment-16436304 ] Shanthoosh Venkataraman commented on SAMZA-1655:

*Fix:*
A. Route all the watcher events from the ZooKeeper server to the ScheduleAfterDebounce worker queue and execute them on the worker thread. After this change, all events received before a session expiration are buffered in the ScheduleAfterDebounce worker queue, and upon session expiration the stale buffered events in the queue are deleted.
When a processor's session expires on one ZooKeeper server and it reconnects to another server in the ensemble, we reconnect and recreate the processor's ephemeral node (because the previous ephemeral node of the processor is gone). This situation is equivalent to a new processor joining the processor group, so clearing the stale buffered ZooKeeper events in the ScheduleAfterDebounce worker queue is required and should not cause corruption issues.
B. Remove the generationId-based check before zkWatcherEvent handling, along with the related code.
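Part A of the proposed fix can be sketched as a single-worker FIFO queue that is cleared on session expiration. This is a simplified illustration under assumptions: `WatcherEventQueue`, `enqueue`, `onSessionExpired` and `drain` are hypothetical names, and the real ScheduleAfterDebounce class (which also debounces and schedules actions) is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of fix A: watcher callbacks are enqueued instead of being handled on the
// ZooKeeper event thread, and a single worker thread drains the queue in FIFO order.
// A real implementation must also ensure onSessionExpired() and drain() cannot interleave.
public class WatcherEventQueue {
  private final LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();

  // Called from a watcher callback (for example, a data-change or child-change handler).
  public void enqueue(Runnable handler) {
    pending.add(handler);
  }

  // Called on zkClient session expiration: everything buffered belongs to the old session.
  public void onSessionExpired() {
    pending.clear();
  }

  // Called by the worker thread; runs every handler buffered so far and returns how many ran.
  public int drain() {
    List<Runnable> batch = new ArrayList<>();
    pending.drainTo(batch);
    batch.forEach(Runnable::run);
    return batch.size();
  }

  public static void main(String[] args) {
    WatcherEventQueue queue = new WatcherEventQueue();
    List<String> handled = new ArrayList<>();
    queue.enqueue(() -> handled.add("childChange (old session)"));
    queue.onSessionExpired(); // the stale event is discarded, never acted upon
    queue.enqueue(() -> handled.add("childChange (new session)"));
    queue.drain();
    System.out.println(handled); // prints [childChange (new session)]
  }
}
```

Because stale events are discarded rather than filtered by a cached generationId, the watchers no longer need to carry one, which is what part B removes.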
[jira] [Updated] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shanthoosh Venkataraman updated SAMZA-1655:

Description:
*Problem:* StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache the generationId passed through their constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in the zkWatcher.
Because the watchers keep the stale generationId, every event after a zkClient session expiration is dropped: the StreamProcessor becomes a zombie and message processing stalls.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group stalls.
*Purpose of generationId:*
GenerationId was originally added to skip stale events buffered in the worker queue across a session expiration, and to guard against the following scenario:
1. A session expiration happens on the leader stream processor of the group.
2. After the session reconnects, the leader rejoins the group as a follower.
3. The former leader might still have zkEvents buffered in its worker queue that were delivered to it while it was the leader. If it acts upon these events, it will corrupt global state.

was:
*Problem:* StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache the generationId passed through their constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in the zkWatcher.
Because the watchers keep the stale generationId, every event after a zkClient session expiration is dropped: the StreamProcessor becomes a zombie and message processing stalls.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group stalls.
Purpose of generationId:
GenerationId was originally added to skip stale events buffered in the worker queue across a session expiration, and to guard against the following scenario:
1. A session expiration happens on the leader stream processor of the group.
2. After the session reconnects, the leader rejoins the group as a follower.
3. The former leader might still have zkEvents buffered in its worker queue that were delivered to it while it was the leader. If it acts upon these events, it will corrupt global state.
[jira] [Created] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.
Shanthoosh Venkataraman created SAMZA-1655:

Summary: StreamProcessor skips all events from zookeeper server after zkClient session expiration.
Key: SAMZA-1655
URL: https://issues.apache.org/jira/browse/SAMZA-1655
Project: Samza
Issue Type: Bug
Reporter: Shanthoosh Venkataraman
Assignee: Shanthoosh Venkataraman

*Problem:* StreamProcessor skips all events from the ZooKeeper server after a zkClient session expiration.
*Reason:*
All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
All the zkWatchers cache the generationId passed through their constructor.
This in-memory generationId is incremented whenever a zkClient session expiration occurs.
Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in the zkWatcher.
Because the watchers keep the stale generationId, every event after a zkClient session expiration is dropped: the StreamProcessor becomes a zombie and message processing stalls.
In the worst case, if this happens to the leader StreamProcessor, the whole processor group stalls.
Purpose of generationId:
GenerationId was originally added to skip stale events buffered in the worker queue across a session expiration, and to guard against the following scenario:
1. A session expiration happens on the leader stream processor of the group.
2. After the session reconnects, the leader rejoins the group as a follower.
3. The former leader might still have zkEvents buffered in its worker queue that were delivered to it while it was the leader. If it acts upon these events, it will corrupt global state.
samza git commit: minor fix on eventhubs size limit for event body and partition key
Repository: samza
Updated Branches:
  refs/heads/master bbcf14eba -> 3895a9070

minor fix on eventhubs size limit for event body and partition key

Author: Hai Lu
Reviewers: Jagadish , Srinivasulu Punuru

Closes #470 from lhaiesp/master

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3895a907
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3895a907
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3895a907

Branch: refs/heads/master
Commit: 3895a90706a8b5db3e622a0bb7b581852b60f7fa
Parents: bbcf14e
Author: Hai Lu
Authored: Thu Apr 12 13:31:39 2018 -0700
Committer: Jagadish
Committed: Thu Apr 12 13:31:39 2018 -0700

--
 .../documentation/versioned/azure/eventhubs.md | 4
 .../samza/system/eventhub/EventHubConfig.java | 6 --
 .../eventhub/producer/EventHubSystemProducer.java | 16 +---
 3 files changed, 21 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/docs/learn/documentation/versioned/azure/eventhubs.md
--
diff --git a/docs/learn/documentation/versioned/azure/eventhubs.md b/docs/learn/documentation/versioned/azure/eventhubs.md
index 7d76be3..ba7f760 100644
--- a/docs/learn/documentation/versioned/azure/eventhubs.md
+++ b/docs/learn/documentation/versioned/azure/eventhubs.md
@@ -58,6 +58,10 @@ collector.send(envelope);
 Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the `message` in the envelope. Additionally, the `key` and the `produce timestamp` are set as properties in the EventData before sending it to EventHubs.
+ Size limit of partition key:
+
+Note that EventHubs has a limit on the length of partition key (128 characters). In [EventHubSystemProducer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java) we truncate the partition key if the size of the key exceeds the limit.
+
 ### Advanced configuration:
 # Producer partitioning:

http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
--
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 6639dd8..e40b3c2 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -72,8 +72,10 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = "systems.%s.eventhubs.receive.queue.size";
   public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100;
-  // By default we will skip messages larger than 1MB.
-  private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024;
+  // By default we want to skip messages larger than 1MB. Also allow some buffer (24KB) to account for the overhead of
+  // metadata and key. So the default max message size will be 1000 KB (instead of precisely 1MB)
+  private static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024;
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - MESSAGE_HEADER_OVERHEAD;
   private final Map physcialToId = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
--
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 3639bbc..55a2ae0 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -23,6 +23,7 @@ import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
 import com.microsoft.azure.eventhubs.impl.EventDataImpl;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -211
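The two limits in this commit can be illustrated with a small standalone sketch. The constants mirror the values in the diff (a 128-character partition key, and 1 MB minus 24 KB of headroom); `truncatePartitionKey` and `exceedsMaxSize` are hypothetical helpers, and keeping the first 128 characters is an assumption about how the producer shortens the key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

// Hypothetical sketch of the producer-side limits from this commit; not the actual
// EventHubSystemProducer code.
public class EventHubLimitsSketch {
  // From the docs change: EventHubs limits the partition key to 128 characters.
  static final int MAX_PARTITION_KEY_LENGTH = 128;
  // From the EventHubConfig change: 1 MB minus a 24 KB allowance for metadata and key.
  static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024;
  static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - MESSAGE_HEADER_OVERHEAD; // 1,024,000 bytes = 1000 KB

  // Assumption: keep the first 128 chars (String.length() counts UTF-16 code units).
  static String truncatePartitionKey(String key) {
    return key.length() > MAX_PARTITION_KEY_LENGTH ? key.substring(0, MAX_PARTITION_KEY_LENGTH) : key;
  }

  // Body-only size check; the 24 KB headroom is what covers the key and metadata.
  static boolean exceedsMaxSize(byte[] body) {
    return body.length > DEFAULT_MAX_MESSAGE_SIZE;
  }

  public static void main(String[] args) {
    String longKey = String.join("", Collections.nCopies(200, "k"));
    System.out.println(truncatePartitionKey(longKey).length());                   // 128
    System.out.println(exceedsMaxSize("hello".getBytes(StandardCharsets.UTF_8))); // false
    System.out.println(exceedsMaxSize(new byte[DEFAULT_MAX_MESSAGE_SIZE + 1]));   // true
  }
}
```

Subtracting a fixed allowance keeps the check cheap, at the cost of being an estimate: the body size alone does not give the exact on-the-wire size.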
[jira] [Created] (SAMZA-1654) calculate the exact size of the AMQP message for skipping large message
Hai created SAMZA-1654:

Summary: calculate the exact size of the AMQP message for skipping large message
Key: SAMZA-1654
URL: https://issues.apache.org/jira/browse/SAMZA-1654
Project: Samza
Issue Type: Improvement
Reporter: Hai
Assignee: Hai

Today we calculate the event size from the body size only, which is not accurate because it ignores the overhead of the key and other metadata. We are waiting for the EventHubs client to expose an API that returns the actual EventData size. See more info here: https://github.com/Azure/azure-event-hubs-java/issues/305
[jira] [Created] (SAMZA-1653) Support waitForFinish() in remote and add duration to both
Xinyu Liu created SAMZA-1653:

Summary: Support waitForFinish() in remote and add duration to both
Key: SAMZA-1653
URL: https://issues.apache.org/jira/browse/SAMZA-1653
Project: Samza
Issue Type: Bug
Reporter: Xinyu Liu
Assignee: Xinyu Liu

# We need a consistent API for waitForFinish() in the remote runner.
# We need another variant, waitForFinish(duration).
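A sketch of how the two waitForFinish() variants might share one completion signal, assuming the duration is an upper bound on how long the caller blocks. `FinishableRunner` and `markFinished` are hypothetical names; this is not Samza's runner API.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: both waitForFinish() variants block on the same completion latch.
public class FinishableRunner {
  private final CountDownLatch finished = new CountDownLatch(1);

  // Called by the runner once the application reaches a terminal state.
  public void markFinished() {
    finished.countDown();
  }

  // Blocks until the application finishes (or the calling thread is interrupted).
  public void waitForFinish() {
    try {
      finished.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
  }

  // Blocks for at most the given duration; returns true only if the application finished in time.
  public boolean waitForFinish(Duration timeout) {
    try {
      return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    FinishableRunner runner = new FinishableRunner();
    System.out.println(runner.waitForFinish(Duration.ofMillis(50))); // false: still running
    runner.markFinished();
    System.out.println(runner.waitForFinish(Duration.ofMillis(50))); // true
  }
}
```

Backing both variants with one latch keeps their semantics consistent between local and remote runners, which is the consistency the issue asks for.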
[jira] [Commented] (SAMZA-1649) Improve host-aware allocation to account for strict locality
[ https://issues.apache.org/jira/browse/SAMZA-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436176#comment-16436176 ] ASF GitHub Bot commented on SAMZA-1649:

GitHub user vjagadish opened a pull request:

    https://github.com/apache/samza/pull/471

SAMZA-1649: Improve host-aware allocation to account for strict locality

Currently working on a doc about the behavior of the CapacityScheduler, and further testing is needed on an actual cluster, but here's a summary:
- Node-local requests are honored only when relax-locality = false.
- With relax-locality = true, the scheduler favors interactivity over data locality for requests that ask for few resources relative to the size of the cluster.

In addition to the above change, this PR also modifies the allocator algorithm to fall back to "ANY_HOST" requests so that we make progress when the node is unavailable.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vjagadish1989/samza relax-locality-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/471.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #471

commit a117356b59065a50f9bbffacacfb799d3ff2dc9f
Author: Jagadish
Date: 2018-04-11T21:33:52Z

    Initial patch for relax-locality

commit 002cb87d02fef3fee73439daee36074402675484
Author: Jagadish
Date: 2018-04-12T00:22:05Z

    initial stab at an unit test

commit 7ce9e9457e97280a8b09ad6684d22079fe527f12
Author: Jagadish
Date: 2018-04-12T07:10:20Z

    Add more assertions for ensuring containers get released

commit 2e51d378053b1f4df1e0892cbcdeaeae02f78d81
Author: Jagadish
Date: 2018-04-12T15:37:21Z

    Improve documentation and refactor the testExpiredContainers

commit 5ceafc5a50ab3874286889c68df4660b06237472
Author: Jagadish
Date: 2018-04-12T16:15:04Z

    Minor clean-ups: remove deprecated usages

commit f5b9d566879f34f813e65da0a2e0152d4470b8d4
Author: Jagadish
Date: 2018-04-12T16:27:31Z

    Remove additional debug logging. Simulate a cluster manager timeout

> Improve host-aware allocation to account for strict locality
>
> Key: SAMZA-1649
> URL: https://issues.apache.org/jira/browse/SAMZA-1649
> Project: Samza
> Issue Type: Bug
> Reporter: Jagadish
> Priority: Major
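The ANY_HOST fallback described in the pull request can be sketched as a small decision function. Everything here is hypothetical and simplified: `Request`, `nextTarget` and the timeout parameter are illustrative names, and the real HostAwareContainerAllocator tracks requests and allocated resources differently. The idea is that with relax-locality = false only the preferred host can satisfy a request, so once a request has waited past a timeout it is re-issued as ANY_HOST to keep making progress.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical, simplified sketch of "fall back to ANY_HOST"; not HostAwareContainerAllocator.
public class HostAwareFallbackSketch {
  static final String ANY_HOST = "ANY_HOST";

  // A pending request for one container that prefers a specific host.
  static final class Request {
    final String preferredHost;
    final long requestedAtMs;

    Request(String preferredHost, long requestedAtMs) {
      this.preferredHost = preferredHost;
      this.requestedAtMs = requestedAtMs;
    }
  }

  // Returns the host the request should be matched against next.
  static String nextTarget(Request request, Set<String> hostsWithAllocatedResources, long nowMs, long timeoutMs) {
    if (hostsWithAllocatedResources.contains(request.preferredHost)) {
      return request.preferredHost; // locality satisfied
    }
    if (nowMs - request.requestedAtMs >= timeoutMs) {
      return ANY_HOST; // preferred host looks unavailable: stop waiting for it
    }
    return request.preferredHost; // keep waiting for the preferred host
  }

  public static void main(String[] args) {
    Request request = new Request("host-a", 0L);
    System.out.println(nextTarget(request, Collections.singleton("host-a"), 10L, 100L)); // host-a
    System.out.println(nextTarget(request, Collections.emptySet(), 10L, 100L));         // host-a
    System.out.println(nextTarget(request, Collections.emptySet(), 150L, 100L));        // ANY_HOST
  }
}
```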
[jira] [Created] (SAMZA-1652) Window high level API: End Of Stream message does not fire Window trigger
Aditya created SAMZA-1652:

Summary: Window high level API: End Of Stream message does not fire Window trigger
Key: SAMZA-1652
URL: https://issues.apache.org/jira/browse/SAMZA-1652
Project: Samza
Issue Type: Bug
Reporter: Aditya

StreamOperatorTask has code to handle end-of-stream (EOS), but the EOS message itself is never passed to StreamOperatorTask because it does not implement EndOfStreamListenerTask.
[~jagadish1...@gmail.com] is looking into it.
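A minimal sketch of why the trigger never fires, assuming the runtime delivers end-of-stream only to tasks that implement a listener interface. `EndOfStreamListener` is a hypothetical stand-in for EndOfStreamListenerTask, and `WindowingTask` simplifies a window operator to "emit the buffered messages when the trigger fires".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the names below are illustrative, not Samza's API.
public class EndOfStreamDispatchSketch {
  // Stand-in for EndOfStreamListenerTask.
  interface EndOfStreamListener {
    void onEndOfStream();
  }

  // A task whose window buffers messages and emits them as one pane when its trigger fires.
  static class WindowingTask implements EndOfStreamListener {
    final List<String> buffered = new ArrayList<>();
    final List<List<String>> emittedPanes = new ArrayList<>();

    void process(String message) {
      buffered.add(message);
    }

    @Override
    public void onEndOfStream() {
      // Fire the pending trigger so the final partial window is not lost.
      if (!buffered.isEmpty()) {
        emittedPanes.add(new ArrayList<>(buffered));
        buffered.clear();
      }
    }
  }

  // The runtime only delivers end-of-stream to tasks that opt in via the listener interface.
  static boolean deliverEndOfStream(Object task) {
    if (task instanceof EndOfStreamListener) {
      ((EndOfStreamListener) task).onEndOfStream();
      return true;
    }
    return false; // the situation in SAMZA-1652: EOS never reaches the task
  }

  public static void main(String[] args) {
    WindowingTask task = new WindowingTask();
    task.process("a");
    task.process("b");
    deliverEndOfStream(task);
    System.out.println(task.emittedPanes); // [[a, b]]
  }
}
```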
[jira] [Created] (SAMZA-1651) Samza-sql: Implement GROUP BY SQL operator
Aditya created SAMZA-1651:

Summary: Samza-sql: Implement GROUP BY SQL operator
Key: SAMZA-1651
URL: https://issues.apache.org/jira/browse/SAMZA-1651
Project: Samza
Issue Type: Bug
Reporter: Aditya
[jira] [Created] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window
Aditya created SAMZA-1650:

Summary: Window high level API: Trigger is not fired at the end of trigger interval for tumbling window
Key: SAMZA-1650
URL: https://issues.apache.org/jira/browse/SAMZA-1650
Project: Samza
Issue Type: Bug
Reporter: Aditya

[~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in JobNode: the timeInterval is not computed when joinTtlInterval is not set, so windowTimerIntervals are discarded even when they are set.
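A sketch of the intended computation, under the assumption (not stated in the issue) that the trigger interval is the greatest common divisor of all candidate intervals, so that one timer can serve every window and join. `computeTriggerIntervalMs` is a hypothetical name; the point is that window timer intervals are considered even when no join TTL is configured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not JobNode's actual code.
public class TriggerIntervalSketch {
  static long gcd(long a, long b) {
    return b == 0 ? a : gcd(b, a % b);
  }

  // Returns the timer interval in ms, or -1 if no operator needs a timer.
  static long computeTriggerIntervalMs(List<Long> windowTimerIntervalsMs, List<Long> joinTtlIntervalsMs) {
    List<Long> candidates = new ArrayList<>(joinTtlIntervalsMs);
    candidates.addAll(windowTimerIntervalsMs); // considered even when no join TTL is set
    if (candidates.isEmpty()) {
      return -1; // nothing needs a timer
    }
    long interval = 0; // gcd(0, x) == x, so 0 is a neutral starting value
    for (long candidate : candidates) {
      interval = gcd(interval, candidate);
    }
    return interval;
  }

  public static void main(String[] args) {
    // Two tumbling windows (10 s and 15 s), no joins: a 5 s timer serves both.
    System.out.println(computeTriggerIntervalMs(Arrays.asList(10_000L, 15_000L), Collections.emptyList())); // 5000
  }
}
```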
[jira] [Created] (SAMZA-1649) Improve host-aware allocation to account for strict locality
Jagadish created SAMZA-1649:

Summary: Improve host-aware allocation to account for strict locality
Key: SAMZA-1649
URL: https://issues.apache.org/jira/browse/SAMZA-1649
Project: Samza
Issue Type: Bug
Reporter: Jagadish
[jira] [Created] (SAMZA-1648) Collection Stream
Sanil Jain created SAMZA-1648:

Summary: Collection Stream
Key: SAMZA-1648
URL: https://issues.apache.org/jira/browse/SAMZA-1648
Project: Samza
Issue Type: New Feature
Components: test
Reporter: Sanil Jain
Assignee: Sanil Jain
Fix For: 0.15.0

Create the downstream wiring for the in-memory system with a stream backed by a collection. This API should provide endpoints to initialize a stream from an input collection in the in-memory system, and a utility to access the state of this stream at any given time.

Link to SEP: [Samza Integration Test Framework|https://cwiki.apache.org/confluence/display/SAMZA/SEP-12%3A+Continuous+Integration+Test+Framework]