[jira] (SAMZA-1081) Update all existing SystemAdmins to implement the new createStream() method

2017-01-30 Thread Jake Maes (JIRA)
Jake Maes created an issue

Samza / SAMZA-1081
Update all existing SystemAdmins to implement the new createStream() method

Issue Type: Sub-task
Assignee: Jake Maes
Created: 30/Jan/17 18:03
Priority: Major
Reporter: Jake Maes

This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)



samza git commit: SAMZA-1025: Documentation for HdfsSystemConsumer

2017-01-30 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master db8b228d9 -> f01b28628


SAMZA-1025: Documentation for HdfsSystemConsumer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f01b2862
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f01b2862
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f01b2862

Branch: refs/heads/master
Commit: f01b2862879b640456a46e0a6fe375cfe7e17b37
Parents: db8b228
Author: Hai Lu 
Authored: Mon Jan 30 10:58:15 2017 -0800
Committer: vjagadish1989 
Committed: Mon Jan 30 10:58:46 2017 -0800

--
 .../documentation/versioned/hdfs/consumer.md| 109 +++
 .../documentation/versioned/hdfs/producer.md|   2 +-
 docs/learn/documentation/versioned/index.html   |   1 +
 .../versioned/jobs/configuration-table.html |  40 +++
 4 files changed, 151 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/f01b2862/docs/learn/documentation/versioned/hdfs/consumer.md
--
diff --git a/docs/learn/documentation/versioned/hdfs/consumer.md b/docs/learn/documentation/versioned/hdfs/consumer.md
new file mode 100644
index 000..401b228
--- /dev/null
+++ b/docs/learn/documentation/versioned/hdfs/consumer.md
@@ -0,0 +1,109 @@
+---
+layout: page
+title: Reading from HDFS
+---
+
+
+You can configure your Samza job to read from HDFS files using the [HdfsSystemConsumer](javadocs/org/apache/samza/system/hdfs/HdfsSystemConsumer.html). Avro-encoded records are supported out of the box, and it is easy to extend the consumer to support other formats (plain text, CSV, JSON, etc.). See the `Event format` section below.
+
+### Environment
+
+Your job needs to run on the same YARN cluster that hosts the HDFS you want to consume from.
+
+### Partitioning
+
+Partitioning works at the level of individual HDFS files. Each file is treated as a stream partition, while a directory that contains these files is a stream. For example, if you read from an HDFS path that contains 10 individual files, 10 partitions are created, and you can configure up to 10 Samza containers to process them. If you want to read from a single HDFS file, there is currently no way to break down the consumption: only one container can process the file.
+
+### Event format
+
+[HdfsSystemConsumer](javadocs/org/apache/samza/system/hdfs/HdfsSystemConsumer.html) currently supports reading from Avro files. The received [IncomingMessageEnvelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three significant fields:
+1. The key, which is empty
+2. The message, which is set to the Avro [GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html)
+3. The stream partition, which is set to the name of the HDFS file
+
+To extend the support beyond Avro files (e.g. JSON, CSV, etc.), you can implement the interface [SingleFileHdfsReader](javadocs/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.html) (take a look at the implementation of [AvroFileHdfsReader](javadocs/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader) as a sample).
+
+### End of stream support
+
+One major difference between HDFS data and Kafka data is that while a Kafka topic has an unbounded stream of messages, HDFS files are bounded and have a notion of EOF.
+
+You can choose to implement [EndOfStreamListenerTask](javadocs/org/apache/samza/task/EndOfStreamListenerTask.html) to receive a callback when all partitions are at end of stream. When all partitions being processed by the task are at end of stream (i.e. EOF has been reached for all files), the Samza job exits automatically.
+
+### Basic Configuration
+
+Here are a few of the basic configs to set up HdfsSystemConsumer:
+
+```
+# The HDFS system consumer is implemented under the org.apache.samza.system.hdfs package,
+# so use HdfsSystemFactory as the system factory for your system
+systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
+
+# You need to specify the path of files you want to consume in task.inputs
+task.inputs=hdfs-clickstream.hdfs:/data/clickstream/2016/09/11
+
+# You can specify a white list of files you want your job to process (in Java Pattern style)
+systems.hdfs-clickstream.partitioner.defaultPartitioner.whitelist=.*avro
+
+# You can specify a black list of files you don't want your job to process (in Java Pattern style),
+# by default it's empty.
+# Note that you can have both white list and black list, in which case both will be applied.
+systems.hdfs-clickstream.partitioner.defaultPartitioner.blacklist=somefile.avro
+
+```
+
+### Security Configuration
+
+The following additional confi
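
Editorial note on the whitelist and blacklist settings in the Basic Configuration block above: the sketch below shows one way the two patterns could combine. It is not Samza's partitioner code. The class `HdfsFileFilterSketch`, the method `select`, and the sample file names are hypothetical, and full-match semantics (the whole file name must match the pattern) is an assumption; the documentation only says the patterns are "in Java Pattern style" and that both lists are applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrative only: hypothetical helper, not part of Samza. */
final class HdfsFileFilterSketch {
    private HdfsFileFilterSketch() { }

    /**
     * Keeps the file names that fully match the whitelist and do not fully
     * match the blacklist. A null or empty blacklist excludes nothing.
     * Full-match semantics is an assumption of this sketch.
     */
    static List<String> select(List<String> fileNames, String whitelist, String blacklist) {
        Pattern allow = Pattern.compile(whitelist);
        Pattern deny = (blacklist == null || blacklist.isEmpty()) ? null : Pattern.compile(blacklist);
        List<String> selected = new ArrayList<>();
        for (String name : fileNames) {
            boolean allowed = allow.matcher(name).matches();
            boolean denied = deny != null && deny.matcher(name).matches();
            if (allowed && !denied) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("part-0.avro", "part-1.avro", "somefile.avro", "_SUCCESS");
        // Each selected file would become one stream partition.
        System.out.println(select(files, ".*avro", "somefile.avro")); // prints [part-0.avro, part-1.avro]
    }
}
```

With the patterns from the configuration example, two files survive the filter, so the job would see two partitions and could use at most two containers.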

[jira] (SAMZA-1025) Documentation page for HDFS System Consumer feature

2017-01-30 Thread Jagadish (JIRA)
Jagadish commented on SAMZA-1025

Re: Documentation page for HDFS System Consumer feature

Thank you Hai for the patch! Committed.



[jira] (SAMZA-1025) Documentation page for HDFS System Consumer feature

2017-01-30 Thread Jagadish (JIRA)
Jagadish resolved as Fixed

Samza / SAMZA-1025
Documentation page for HDFS System Consumer feature

Change By: Jagadish
Resolution: Fixed
Fix Version/s: 0.12.0
Status: In Progress -> Resolved



[2/3] samza git commit: Specification of various Window and Trigger APIs in Samza

2017-01-30 Thread jagadish
http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
--
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index 1a4ed8f..f7e1f36 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -16,85 +16,360 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.operators.windows;
 
+import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.TimeTrigger;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 
+import java.time.Duration;
 import java.util.Collection;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
-
 /**
- * This class defines a collection of {@link Window} functions. The public 
classes and methods here are intended to be
- * used by the user (i.e. programmers) to create {@link Window} function 
directly.
+ * APIs for creating different types of {@link Window}s.
+ *
+ * Groups the incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
+ *
+ *  Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
+ * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more
+ * {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
+ *
+ *  A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
+ * has arrived or late triggers that allow handling of late data arrivals.
+ *
+ * [ASCII diagram garbled in the archive: an incoming message stream is divided into windows wk1, wk2 and wk3, each split into pane 1, pane 2 and pane 3.]
+ *
+ *
+ *  A {@link Window} can be one of the following types:
+ * 
+ *   
+ * Tumbling Windows: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals.
+ *   
+ * Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
+ * A session captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
+ * The boundary for a session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that arrive within
+ * the gap are grouped into the same session.
+ *   
+ * Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}.
+ * An early trigger must be specified when defining a global window.
+ * 
+ *
+ *  A {@link Window} is defined as "keyed" when the incoming {@link MessageEnvelope}s are first grouped based on their key
+ * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
+ * types.
  *
  */
+@InterfaceStability.Unstable
 public final class Windows {
 
+  private Windows() { }
+
   /**
-   * private constructor to prevent instantiation
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping processing
+   * time based windows based on the provided keyFn and applies the provided fold function to them.
+   *
+   * The below example computes the maximu
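
Editorial note on the window types described in the Javadoc above: the sketch below illustrates tumbling and session windows with plain Java and does not use the Samza `Windows` API. The class `WindowSketch` and both method names are hypothetical. Two details are assumptions of the sketch rather than statements from the commit: tumbling windows are aligned to multiples of the window size, and a message whose distance from the previous one is exactly `sessionGapMs` still belongs to the same session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hypothetical helper, not the Samza Windows API. */
final class WindowSketch {
    private WindowSketch() { }

    /** Start of the tumbling window [start, start + sizeMs) that contains the timestamp. */
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return Math.floorDiv(timestampMs, sizeMs) * sizeMs;
    }

    /**
     * Splits ascending timestamps into sessions: a new session starts whenever
     * the gap since the previous message exceeds sessionGapMs.
     */
    static List<List<Long>> sessions(List<Long> sortedTimestampsMs, long sessionGapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > sessionGapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingWindowStart(12_500L, 5_000L)); // prints 10000
        System.out.println(sessions(Arrays.asList(0L, 1_000L, 1_500L, 9_000L, 9_200L), 2_000L));
        // prints [[0, 1000, 1500], [9000, 9200]]
    }
}
```

A keyed variant would apply the same grouping separately to the messages of each key.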

[1/3] samza git commit: Specification of various Window and Trigger APIs in Samza

2017-01-30 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master f01b28628 -> 6dc33a850


http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
--
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 2ad6461..5991e2f 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -27,10 +27,6 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.SessionWindow;
-import org.apache.samza.operators.windows.WindowFn;
-import org.apache.samza.operators.windows.WindowOutput;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskCoordinator;
@@ -44,7 +40,6 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -137,19 +132,6 @@ public class TestMessageStreamImpl {
   }
 
   @Test
-  public void testWindow() {
-MessageStreamImpl inputStream = new 
MessageStreamImpl<>();
-SessionWindow window = 
mock(SessionWindow.class);
-doReturn(mock(WindowFn.class)).when(window).getInternalWindowFn();
-MessageStream> outStream = 
inputStream.window(window);
-Collection subs = inputStream.getRegisteredOperatorSpecs();
-assertEquals(subs.size(), 1);
-OperatorSpec wndOp = subs.iterator().next();
-assertTrue(wndOp instanceof WindowOperatorSpec);
-assertEquals(((WindowOperatorSpec) wndOp).getOutputStream(), outStream);
-  }
-
-  @Test
   public void testJoin() {
 MessageStreamImpl source1 = new MessageStreamImpl<>();
 MessageStreamImpl source2 = new MessageStreamImpl<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
--
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
deleted file mode 100644
index 8fa7ccc..000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.windows.StoreFunctions;
-import org.apache.samza.operators.windows.WindowState;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestStateStoreImpl {
-  @Test
-  public void testStateStoreImpl() {
-StoreFunctions 
mockStoreFunctions = mock(StoreFunctions.class);
-// test constructor
-StateStoreImpl storeImpl = new 
StateStoreImpl<>(mockStoreFunctions, "myStoreName");
-TaskContext mockContext = mock(TaskContext.class);
-KeyValueStore mockKvStore = mock(KeyValueStore.class);
-when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
-// test init()
-storeImpl.init(mockContext);
-verify(mockContext, times(1)).getStore("myStoreName");
-Func

[3/3] samza git commit: Specification of various Window and Trigger APIs in Samza

2017-01-30 Thread jagadish
Specification of various Window and Trigger APIs in Samza

- Defined APIs for specifying different types of windows - sessions, tumbling, global and keyed variants.
- Defined APIs for specifying early and late triggers for a window.
- Standardized all above Window types to be expressed as a combination of default, early and late triggers.
- Defined classes for different types of trigger specifications.
- Hide the WindowState class from programmers and move it from samza-api to samza-operator. We can choose to add it later if need be.
- Removed some implementation classes in Window and Trigger. We can revisit them later when we implement Windows.
- New API for specifying Time durations meaningfully in Samza.
- Unit tests for most of the above changes.
- Misc. Documentation, readability related changes to public APIs.

Author: vjagadish1989 

Reviewers: Yi Pan , Prateek Maheshwari 


Closes #30 from vjagadish/samza-operator-v3


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6dc33a85
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6dc33a85
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6dc33a85

Branch: refs/heads/master
Commit: 6dc33a8504e3e2335cffe3317be1c0ccb5448458
Parents: f01b286
Author: vjagadish1989 
Authored: Mon Jan 30 13:50:16 2017 -0800
Committer: vjagadish1989 
Committed: Mon Jan 30 13:50:16 2017 -0800

--
 .../apache/samza/operators/MessageStream.java   |  46 ++-
 .../samza/operators/triggers/AnyTrigger.java|  39 ++
 .../samza/operators/triggers/CountTrigger.java  |  38 ++
 .../triggers/DurationCharacteristic.java|  26 ++
 .../operators/triggers/RepeatingTrigger.java|  34 ++
 .../triggers/TimeSinceFirstMessageTrigger.java  |  46 +++
 .../triggers/TimeSinceLastMessageTrigger.java   |  44 +++
 .../samza/operators/triggers/TimeTrigger.java   |  44 +++
 .../samza/operators/triggers/Trigger.java   |  34 ++
 .../samza/operators/triggers/Triggers.java  | 108 ++
 .../operators/windows/AccumulationMode.java |  34 ++
 .../samza/operators/windows/SessionWindow.java  | 102 -
 .../samza/operators/windows/StoreFunctions.java |  67 
 .../apache/samza/operators/windows/Trigger.java |  94 -
 .../samza/operators/windows/TriggerBuilder.java | 320 
 .../apache/samza/operators/windows/Window.java  |  90 -
 .../samza/operators/windows/WindowFn.java   |  59 ---
 .../samza/operators/windows/WindowKey.java  |  46 +++
 .../samza/operators/windows/WindowOutput.java   |  51 ---
 .../samza/operators/windows/WindowPane.java |  57 +++
 .../samza/operators/windows/WindowState.java|  85 -
 .../apache/samza/operators/windows/Windows.java | 369 ---
 .../windows/internal/WindowInternal.java| 110 ++
 .../samza/operators/windows/TestTrigger.java|  68 
 .../operators/windows/TestTriggerBuilder.java   | 226 
 .../operators/windows/TestWindowOutput.java |   5 +-
 .../samza/operators/windows/TestWindows.java| 109 --
 .../org/apache/samza/task/AsyncRunLoop.java |   4 +-
 .../samza/operators/MessageStreamImpl.java  |  26 +-
 .../apache/samza/operators/StateStoreImpl.java  |  56 ---
 .../samza/operators/impl/OperatorImpls.java |   5 +-
 .../impl/SessionWindowOperatorImpl.java |  67 
 .../operators/impl/WindowOperatorImpl.java  |  40 ++
 .../samza/operators/spec/OperatorSpecs.java |  31 +-
 .../operators/spec/PartialJoinOperatorSpec.java |  24 --
 .../operators/spec/WindowOperatorSpec.java  |  92 +
 .../samza/operators/spec/WindowState.java   |  85 +
 .../apache/samza/operators/BroadcastTask.java   |  35 +-
 .../samza/operators/TestMessageStreamImpl.java  |  18 -
 .../samza/operators/TestStateStoreImpl.java |  72 
 .../org/apache/samza/operators/WindowTask.java  |  17 +-
 .../samza/operators/impl/TestOperatorImpls.java |  11 +-
 .../operators/impl/TestSessionWindowImpl.java   | 111 --
 .../samza/operators/spec/TestOperatorSpecs.java |  50 +--
 44 files changed, 1294 insertions(+), 1801 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
--
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index d18536b..6a2f95b 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -26,8 +26,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFun

samza git commit: SAMZA-1077: SamzaContainer should catch all Throwables instead of only exceptions

2017-01-30 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 6dc33a850 -> c6c10d31e


SAMZA-1077: SamzaContainer should catch all Throwables instead of only exceptions

Author: vjagadish1989 

Reviewers: Jake Maes 

Closes #30 from vjagadish1989/samza-1077


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c6c10d31
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c6c10d31
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c6c10d31

Branch: refs/heads/master
Commit: c6c10d31e37be3c9b07d202d17bcba6cb1213c3b
Parents: 6dc33a8
Author: vjagadish1989 
Authored: Mon Jan 30 14:41:57 2017 -0800
Committer: vjagadish1989 
Committed: Mon Jan 30 14:44:10 2017 -0800

--
 .../apache/samza/container/SamzaContainer.scala |  4 +-
 .../samza/container/TestSamzaContainer.scala| 59 
 2 files changed, 61 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
--
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f1d62c5..e49da57 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -668,8 +668,8 @@ class SamzaContainer(
   addShutdownHook
   runLoop.run
 } catch {
-  case e: Exception =>
-error("Caught exception in process loop.", e)
+  case e: Throwable =>
+error("Caught exception/error in process loop.", e)
 throw e
 } finally {
   info("Shutting down.")

http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
--
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 5895037..0d86833 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -238,6 +238,65 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
+  def testErrorInTaskInitShutsDownTask {
+val task = new StreamTask with InitableTask with ClosableTask {
+  var wasShutdown = false
+
+  def init(config: Config, context: TaskContext) {
+throw new NoSuchMethodError("Trigger a shutdown, please.")
+  }
+
+  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+  }
+
+  def close {
+wasShutdown = true
+  }
+}
+val config = new MapConfig
+val taskName = new TaskName("taskName")
+val consumerMultiplexer = new SystemConsumers(
+  new RoundRobinChooser,
+  Map[String, SystemConsumer]())
+val producerMultiplexer = new SystemProducers(
+  Map[String, SystemProducer](),
+  new SerdeManager)
+val collector = new TaskInstanceCollector(producerMultiplexer)
+val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+val taskInstance: TaskInstance[StreamTask] = new TaskInstance[StreamTask](
+  task,
+  taskName,
+  config,
+  new TaskInstanceMetrics,
+  null,
+  consumerMultiplexer,
+  collector,
+  containerContext
+)
+val runLoop = new RunLoop(
+  taskInstances = Map(taskName -> taskInstance),
+  consumerMultiplexer = consumerMultiplexer,
+  metrics = new SamzaContainerMetrics,
+  maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
+val container = new SamzaContainer(
+  containerContext = containerContext,
+  taskInstances = Map(taskName -> taskInstance),
+  runLoop = runLoop,
+  consumerMultiplexer = consumerMultiplexer,
+  producerMultiplexer = producerMultiplexer,
+  metrics = new SamzaContainerMetrics,
+  jmxServer = null
+)
+try {
+  container.run
+  fail("Expected error to be thrown in run method.")
+} catch {
+  case e: Throwable => // Expected
+}
+assertTrue(task.wasShutdown)
+  }
+
+  @Test
   def testStartStoresIncrementsCounter {
 val task = new StreamTask {
   def process(envelope: IncomingMessageEnvelope, collector: 
MessageCollector, coordinator: TaskCoordinator) {
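
Editorial note on the SamzaContainer change above, which replaces `case e: Exception` with `case e: Throwable`: the sketch below shows in plain Java why the narrower handler misses the `NoSuchMethodError` thrown by the new test. The class `CatchThrowableSketch` and its method are hypothetical and are not part of Samza; the returned strings exist only to make the control flow visible.

```java
/** Illustrative only: hypothetical helper, not part of Samza. */
final class CatchThrowableSketch {
    private CatchThrowableSketch() { }

    /** Runs the task and reports which handler, if any, observed a failure. */
    static String runCatchingException(Runnable task) {
        try {
            task.run();
            return "completed";
        } catch (Exception e) {
            return "caught by Exception handler";
        } catch (Throwable t) {
            // Reached only for Errors: a lone catch (Exception e) would let them escape.
            return "missed by Exception handler";
        }
    }

    public static void main(String[] args) {
        // IllegalStateException extends Exception.
        System.out.println(runCatchingException(() -> { throw new IllegalStateException("boom"); }));
        // prints caught by Exception handler

        // NoSuchMethodError extends Error, which is a Throwable but not an Exception.
        System.out.println(runCatchingException(() -> { throw new NoSuchMethodError("boom"); }));
        // prints missed by Exception handler
    }
}
```

In the committed Scala code the handler logs and rethrows, so the `finally` block still performs the shutdown either way; the difference is that errors are now logged by the container before they propagate.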



[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions

2017-01-30 Thread ASF GitHub Bot (JIRA)
ASF GitHub Bot commented on SAMZA-1077

Re: SamzaContainer should catch all Throwables instead of only exceptions

Github user vjagadish closed the pull request at:
https://github.com/apache/samza/pull/40



samza git commit: SAMZA-984; Upgraded RocksDB version to 5.0.1 and added configuration for managing RocksDB logging.

2017-01-30 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master c6c10d31e -> 1c7e4d7aa


SAMZA-984; Upgraded RocksDB version to 5.0.1 and added configuration for managing RocksDB logging.

Author: Prateek Maheshwari 

Reviewers: Jake Maes , Jagadish 

Closes #46 from prateekm/rocksdb-upgrade


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c7e4d7a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c7e4d7a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c7e4d7a

Branch: refs/heads/master
Commit: 1c7e4d7aaeb4b5036034c9182fa0259262bcda8e
Parents: c6c10d3
Author: Prateek Maheshwari 
Authored: Mon Jan 30 14:58:50 2017 -0800
Committer: vjagadish1989 
Committed: Mon Jan 30 14:58:50 2017 -0800

--
 .../versioned/jobs/configuration-table.html | 15 
 gradle/dependency-versions.gradle   |  2 +-
 .../samza/storage/kv/RocksDbOptionsHelper.java  | 83 +++-
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 10 +--
 4 files changed, 69 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/docs/learn/documentation/versioned/jobs/configuration-table.html
--
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 7bac935..a26bc43 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1422,6 +1422,21 @@
 
 
 
+
+stores.store-name.rocksdb.max.log.file.size.bytes
+67108864
+
+The maximum size in bytes of the RocksDB LOG file before it is rotated.
+
+
+
+
+stores.store-name.rocksdb.keep.log.file.num
+2
+
+The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
+
+
 
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/gradle/dependency-versions.gradle
--
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 976a49c..db59672 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -30,7 +30,7 @@
   metricsVersion = "2.2.0"
   kafkaVersion = "0.10.0.1"
   commonsHttpClientVersion = "3.1"
-  rocksdbVersion = "3.13.1"
+  rocksdbVersion = "5.0.1"
   yarnVersion = "2.6.1"
   slf4jVersion = "1.6.2"
   log4jVersion = "1.2.17"

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
--
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index d4f765c..9b8f44b 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -34,6 +34,13 @@ import org.slf4j.LoggerFactory;
 public class RocksDbOptionsHelper {
   private static final Logger log = LoggerFactory.getLogger(RocksDbOptionsHelper.class);
 
+  private static final String ROCKSDB_COMPRESSION = "rocksdb.compression";
+  private static final String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
+  private static final String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
+  private static final String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
+  private static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
+  private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+
   public static Options options(Config storeConfig, SamzaContainerContext containerContext) {
 Options options = new Options();
 Long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024);
@@ -42,59 +49,65 @@ public class RocksDbOptionsHelper {
 options.setWriteBufferSize((int) (writeBufSize / numTasks));
 
 CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION;
-String compressionInConfig = storeConfig.get("rocksdb.compression", "snappy");
+String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, "snappy");
 switch (compressionInConfig) {
-case "snappy":
-  comp
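
Editorial note on the two logging options added by this commit: the sketch below shows how the new keys and their documented defaults (67108864 bytes, i.e. 64 MiB, and 2 files) could be read from a store-level configuration. It uses `java.util.Properties` as a stand-in for Samza's `Config`, which is an assumption made to keep the example self-contained. The class `RocksDbLogConfigSketch` and its methods are hypothetical; only the key names and default values come from the diff and the configuration table above.

```java
import java.util.Properties;

/** Illustrative only: hypothetical helper, not RocksDbOptionsHelper. */
final class RocksDbLogConfigSketch {
    // Key names as declared in the diff (relative to the stores.store-name. prefix).
    static final String MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
    static final String KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";

    private RocksDbLogConfigSketch() { }

    /** Maximum LOG file size before rotation; default taken from the configuration table. */
    static long maxLogFileSizeBytes(Properties storeConfig) {
        return Long.parseLong(storeConfig.getProperty(MAX_LOG_FILE_SIZE_BYTES, "67108864"));
    }

    /** Number of LOG files to keep, including rotated ones; default taken from the configuration table. */
    static long keepLogFileNum(Properties storeConfig) {
        return Long.parseLong(storeConfig.getProperty(KEEP_LOG_FILE_NUM, "2"));
    }

    public static void main(String[] args) {
        Properties storeConfig = new Properties();
        storeConfig.setProperty(KEEP_LOG_FILE_NUM, "5");
        System.out.println(maxLogFileSizeBytes(storeConfig)); // prints 67108864
        System.out.println(keepLogFileNum(storeConfig));      // prints 5
    }
}
```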

[jira] (SAMZA-860) Expose RocksDB log options via Samza configuration

2017-01-30 Thread ASF GitHub Bot (JIRA)
ASF GitHub Bot commented on SAMZA-860

Re: Expose RocksDB log options via Samza configuration

Github user asfgit closed the pull request at:
https://github.com/apache/samza/pull/46



[jira] (SAMZA-984) Upgrade to RocksDB to 4.8.0

2017-01-30 Thread Jagadish (JIRA)
Jagadish commented on SAMZA-984

Re: Upgrade to RocksDB to 4.8.0

Thanks Prateek Maheshwari for the patch! Committed to master.
Can you please update the JIRA that there are no regressions from the integration/perf tests?



[jira] (SAMZA-984) Upgrade to RocksDB to 4.8.0

2017-01-30 Thread Jagadish (JIRA)
Jagadish assigned an issue to Prateek Maheshwari

Samza / SAMZA-984
Upgrade to RocksDB to 4.8.0

Change By: Jagadish
Assignee: Prateek Maheshwari



[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions

2017-01-30 Thread Jagadish (JIRA)
Jagadish commented on SAMZA-1077

Re: SamzaContainer should catch all Throwables instead of only exceptions

Committed to master. Thanks Jake Maes for the review.



[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions

2017-01-30 Thread Jagadish (JIRA)
Jagadish assigned an issue to Jagadish

Samza / SAMZA-1077
SamzaContainer should catch all Throwables instead of only exceptions

Change By: Jagadish
Assignee: Jagadish




[jira] (SAMZA-984) Upgrade to RocksDB to 4.8.0

2017-01-30 Thread Prateek Maheshwari (JIRA)
Prateek Maheshwari commented on SAMZA-984

Re: Upgrade to RocksDB to 4.8.0

Jagadish Did not notice any regressions in testing.




samza git commit: SAMZA-1079: Add timeouts for reads from HttpFileSystem. Add tests.

2017-01-30 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 1c7e4d7aa -> bf9bc2c37


SAMZA-1079: Add timeouts for reads from HttpFileSystem. Add tests.

* Wrote a unit/integration test to simulate a stuck connection when reading
binaries for the job.
Other misc. changes:
- Promoted some debug log messages to info level for easier debugging.

Author: vjagadish1989 

Reviewers: jmakes,nickpan47

Closes #42 from vjagadish/http-fs
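
As a rough illustration of the stuck-connection scenario this commit tests for, here is a minimal sketch of a connect timeout and a socket read timeout firing against a server that never responds. It is not the code from this commit: the patch configures commons-httpclient inside HttpFileSystem, whereas this sketch assumes the JDK's HttpURLConnection so that it runs without extra dependencies. The class name ReadTimeoutSketch, the helper timesOut, the path /job.tgz and the timeout values are all hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;

public class ReadTimeoutSketch {

    // Hypothetical helper: returns true if the GET gave up because a timeout
    // fired, false if a byte (or end of stream) was read.
    static boolean timesOut(URL url, int connectTimeoutMs, int readTimeoutMs) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(connectTimeoutMs); // bound on establishing the TCP connection
        conn.setReadTimeout(readTimeoutMs);       // bound on each blocking socket read
        try (InputStream in = conn.getInputStream()) {
            in.read();
            return false;
        } catch (SocketTimeoutException e) {
            return true;
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a stuck server: the OS completes the TCP handshake from the
        // listen backlog, but nothing ever accepts the connection or writes a response.
        try (ServerSocket stuck = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"))) {
            URL url = URI.create("http://127.0.0.1:" + stuck.getLocalPort() + "/job.tgz").toURL();
            System.out.println("timed out: " + timesOut(url, 1000, 500));
        }
    }
}
```

Without the read timeout, the same request would block for as long as the server keeps the connection open, which is the hang the JIRA title describes.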


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf9bc2c3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf9bc2c3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf9bc2c3

Branch: refs/heads/master
Commit: bf9bc2c373f3456159fdc610f268992e8d9a476c
Parents: 1c7e4d7
Author: vjagadish1989 
Authored: Mon Jan 30 15:10:06 2017 -0800
Committer: vjagadish1989 
Committed: Mon Jan 30 15:10:06 2017 -0800

--
 .../samza/util/hadoop/HttpFileSystem.scala  |  15 +-
 .../yarn/util/hadoop/TestHttpFileSystem.java| 161 +++
 2 files changed, 173 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/bf9bc2c3/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala
--
diff --git a/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala b/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala
index 7dff90e..fa65a4e 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala
@@ -37,20 +37,29 @@ import org.apache.hadoop.util.Progressable
 import org.apache.samza.util.Logging
 
 class HttpFileSystem extends FileSystem with Logging {
-  val DEFAULT_BLOCK_SIZE = 4 * 1024;
+  val DEFAULT_BLOCK_SIZE = 4 * 1024
   var uri: URI = null
+  var connectionTimeoutMs = 5 * 60 * 1000
+  var socketReadTimeoutMs = 5 * 60 * 1000
+
+  def setConnectionTimeoutMs(timeout: Int): Unit = connectionTimeoutMs = timeout
+
+  def setSocketReadTimeoutMs(timeout: Int): Unit = socketReadTimeoutMs = timeout
 
   override def initialize(uri: URI, conf: Configuration) {
     super.initialize(uri, conf)
-    debug("init uri %s" format (uri))
+    info("init uri %s" format (uri))
     this.uri = uri
   }
 
   override def getUri = uri
 
   override def open(f: Path, bufferSize: Int): FSDataInputStream = {
-    debug("open http file %s" format (f))
+    info("open http file %s" format (f))
     val client = new HttpClient
+
+    client.getHttpConnectionManager.getParams.setConnectionTimeout(connectionTimeoutMs)
+    client.getHttpConnectionManager.getParams.setSoTimeout(socketReadTimeoutMs)
+
     val method = new GetMethod(f.toUri.toString)
     val statusCode = client.executeMethod(method)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/bf9bc2c3/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java
--
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java
new file mode 100644
index 000..6f42856
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java
@@ -0,0 +1,161 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.job.yarn.util.hadoop;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+i

[jira] (SAMZA-1079) HttpFileSystem should timeout for blocking reads when localizing containers.

2017-01-30 Thread Jagadish (JIRA)
Jagadish commented on SAMZA-1079

Re: HttpFileSystem should timeout for blocking reads when localizing containers.

Thank you Yi Pan (Data Infrastructure) for the review. Submitted!




[jira] (SAMZA-1079) HttpFileSystem should timeout for blocking reads when localizing containers.

2017-01-30 Thread ASF GitHub Bot (JIRA)
ASF GitHub Bot commented on SAMZA-1079

Re: HttpFileSystem should timeout for blocking reads when localizing containers.

Github user asfgit closed the pull request at:
https://github.com/apache/samza/pull/42




[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions

2017-01-30 Thread Jagadish (JIRA)
Jagadish resolved as Fixed

Samza / SAMZA-1077
SamzaContainer should catch all Throwables instead of only exceptions

Change By: Jagadish
Resolution: Fixed
Status: Open → Resolved




[jira] (SAMZA-1074) Cannot build hello-samza on various CDH version

2017-01-30 Thread ASF GitHub Bot (JIRA)
ASF GitHub Bot commented on SAMZA-1074

Re: Cannot build hello-samza on various CDH version

Github user asfgit closed the pull request at:
https://github.com/apache/samza/pull/38




samza git commit: SAMZA-1074; Fix builds for hello-samza on various CDH versions

2017-01-30 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master bf9bc2c37 -> 154cda2c7


SAMZA-1074; Fix builds for hello-samza on various CDH versions

This issue mainly concerns hello-samza, but the documentation also needs
updating to help users understand the process.

Author: Dongkyu Hwangbo 

Reviewers: jvenkatr

Closes #38 from dkhwangbo/SAMZA-1074


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/154cda2c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/154cda2c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/154cda2c

Branch: refs/heads/master
Commit: 154cda2c7f7b6569557a188484e262882af3f4f2
Parents: bf9bc2c
Author: Dongkyu Hwangbo 
Authored: Mon Jan 30 15:56:56 2017 -0800
Committer: vjagadish1989 
Committed: Mon Jan 30 15:56:56 2017 -0800

--
 docs/learn/tutorials/versioned/deploy-samza-to-CDH.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/154cda2c/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
--
diff --git a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
index 8016aca..fff209f 100644
--- a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
+++ b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
@@ -26,7 +26,7 @@ The tutorial assumes you have successfully run [hello-samza](../../../startup/he
 We need to use a specific compile option to build hello-samza package for CDH 5.4.0
 
 {% highlight bash %}
-mvn clean package -Denv=cdh5.4.0
+mvn clean package -Dhadoop.version=cdh5.4.0
 {% endhighlight %}
 
 ### Upload Package to Cluster
@@ -37,7 +37,7 @@ There are a few ways of uploading the package to the cluster's HDFS. If you do n
 hadoop fs -put path/to/hello-samza-0.11.0-dist.tar.gz /path/for/tgz
 {% endhighlight %}
 
-### Get Deloying Scripts
+### Get Deploying Scripts
 
 Untar the job package (assume you will run from the current directory)
 



[jira] (SAMZA-1080) Standalone Samza with No Coordination

2017-01-30 Thread Navina Ramesh (JIRA)
Navina Ramesh updated an issue

Samza / SAMZA-1080
Standalone Samza with No Coordination

Change By: Navina Ramesh
Fix Version/s: 0.13.0




[1/2] samza git commit: SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API

2017-01-30 Thread navina
Repository: samza
Updated Branches:
  refs/heads/master 154cda2c7 -> 2a3a5ac7f


http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
--
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
deleted file mode 100644
index df63b97..000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator
-
-
-import java.util
-import java.util.concurrent.atomic.AtomicReference
-
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{Config, StorageConfig}
-import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
-import org.apache.samza.container.grouper.task.{BalancingTaskNameGrouper, TaskNameGrouperFactory}
-import org.apache.samza.container.{LocalityManager, TaskName}
-import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.job.model.{JobModel, TaskModel}
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition, SystemStreamPartitionMatcher}
-import org.apache.samza.util.{Logging, Util}
-import org.apache.samza.{Partition, SamzaException}
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-
-/**
- * Helper companion object that is responsible for wiring up a JobCoordinator
- * given a Config object.
- */
-object JobModelManager extends Logging {
-
-  /**
-   * a volatile value to store the current instantiated JobCoordinator
-   */
-  @volatile var currentJobModelManager: JobModelManager = null
-  val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
-
-  /**
-   * @param coordinatorSystemConfig A config object that contains job.name,
-   * job.id, and all system..*
-   * configuration. The method will use this config to read all configuration
-   * from the coordinator stream, and instantiate a JobCoordinator.
-   */
-  def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = {
-    val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
-    val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
-    val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
-    info("Registering coordinator system stream.")
-    coordinatorSystemConsumer.register
-    debug("Starting coordinator system stream.")
-    coordinatorSystemConsumer.start
-    debug("Bootstrapping coordinator system stream.")
-    coordinatorSystemConsumer.bootstrap
-    val source = "Job-coordinator"
-    coordinatorSystemProducer.register(source)
-    info("Registering coordinator system stream producer.")
-    val config = coordinatorSystemConsumer.getConfig
-    info("Got config: %s" format config)
-    val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
-    val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
-
-    val systemNames = getSystemNames(config)
-
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configurati

[2/2] samza git commit: SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API

2017-01-30 Thread navina
SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API

This patch contains changes associated with the Standalone StreamProcessor,
where there is no coordination. This will work for load-balanced consumers,
like the new Kafka consumer, and for statically partitioned cases.

Additionally, we have introduced TaskFactory for StreamTask and AsyncStreamTask.

Author: navina 

Reviewers: xinyuiscool,fredji97

Closes #44 from navina/Noop-JC
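
To illustrate why a task factory is useful here, the sketch below shows the general pattern: a factory hands out a fresh task object on every call, so tasks that hold per-instance state are never shared. This is only a sketch under stated assumptions; the interfaces Task and TaskFactory, the method createInstance, and the classes CountingTask and TaskFactorySketch are hypothetical stand-ins defined locally, not the Samza StreamTask or StreamTaskFactory API from this patch.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskFactorySketch {

    // Hypothetical stand-in for a task interface; not the Samza StreamTask API.
    interface Task {
        void process(String message);
    }

    // Hypothetical stand-in for a task factory: each call should build a fresh task.
    interface TaskFactory {
        Task createInstance();
    }

    // A task that keeps per-instance state, which is why instances must not be shared.
    static class CountingTask implements Task {
        int seen = 0;

        @Override
        public void process(String message) {
            seen++;
        }
    }

    // Builds one task per partition, the way a processor might when it owns several partitions.
    static List<Task> buildTasks(TaskFactory factory, int partitions) {
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            tasks.add(factory.createInstance());
        }
        return tasks;
    }

    public static void main(String[] args) {
        // A constructor reference satisfies the single-method factory interface.
        List<Task> tasks = buildTasks(CountingTask::new, 3);
        tasks.get(0).process("hello");
        System.out.println(((CountingTask) tasks.get(0)).seen + " " + ((CountingTask) tasks.get(1)).seen);
    }
}
```

Passing a factory rather than a class name lets the caller decide how tasks are constructed, for example with arguments captured in a lambda, instead of relying on a no-argument constructor.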


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a3a5ac7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a3a5ac7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a3a5ac7

Branch: refs/heads/master
Commit: 2a3a5ac7f21dc4213ff6ec96e11a798bff096d04
Parents: 154cda2
Author: navina 
Authored: Mon Jan 30 18:30:02 2017 -0800
Committer: navina 
Committed: Mon Jan 30 18:30:02 2017 -0800

--
 NOTICE  |   1 +
 .../samza/task/AsyncStreamTaskFactory.java  |  28 ++
 .../apache/samza/task/StreamTaskFactory.java|  31 ++
 .../samza/config/JobCoordinatorConfig.java  |  40 ++
 .../org/apache/samza/config/TaskConfigJava.java |  25 ++
 .../apache/samza/container/RunLoopFactory.java  |  20 +-
 .../AllSspToSingleTaskGrouperFactory.java   |  69 
 .../task/SingleContainerGrouperFactory.java |  56 +++
 .../samza/coordinator/JobCoordinator.java   |  72 
 .../coordinator/JobCoordinatorFactory.java  |  35 ++
 .../processor/SamzaContainerController.java | 152 
 .../apache/samza/processor/StreamProcessor.java | 173 +
 .../standalone/StandaloneJobCoordinator.java| 148 
 .../StandaloneJobCoordinatorFactory.java|  31 ++
 .../org/apache/samza/task/AsyncRunLoop.java |  37 +-
 .../org/apache/samza/config/JobConfig.scala |   3 +
 .../org/apache/samza/container/RunLoop.scala|  18 +-
 .../apache/samza/container/SamzaContainer.scala | 178 ++---
 .../apache/samza/container/TaskInstance.scala   |  21 +-
 .../samza/coordinator/JobCoordinator.scala  | 360 --
 .../samza/coordinator/JobModelManager.scala | 362 +++
 .../samza/job/local/ThreadJobFactory.scala  |  23 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java |  20 +-
 .../apache/samza/container/TestRunLoop.scala|  15 +-
 .../samza/container/TestSamzaContainer.scala|   6 +-
 .../samza/container/TestTaskInstance.scala  |   6 +-
 .../test/StandaloneIntegrationTestHarness.java  |  84 +
 .../apache/samza/test/StandaloneTestUtils.java  | 111 ++
 .../test/processor/IdentityStreamTask.java  |  72 
 .../test/processor/TestStreamProcessor.java | 226 
 .../AbstractIntegrationTestHarness.scala|  60 +++
 .../AbstractKafkaServerTestHarness.scala| 123 +++
 .../harness/AbstractZookeeperTestHarness.scala  |  72 
 33 files changed, 2185 insertions(+), 493 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/NOTICE
--
diff --git a/NOTICE b/NOTICE
index 3352dda..2ee5fdc 100644
--- a/NOTICE
+++ b/NOTICE
@@ -3,3 +3,4 @@ Copyright 2014 The Apache Software Foundation
 
 This product includes software developed at The Apache Software
 Foundation (http://www.apache.org/).
+

http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
--
diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
new file mode 100644
index 000..e5ce9c4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * Build {@link AsyncStreamTask} instances.
+ * Implementations should return a new instance 

[jira] (SAMZA-1080) Standalone Samza with No Coordination

2017-01-30 Thread ASF GitHub Bot (JIRA)
ASF GitHub Bot commented on SAMZA-1080

Re: Standalone Samza with No Coordination

Github user asfgit closed the pull request at:
https://github.com/apache/samza/pull/44




[jira] (SAMZA-1080) Standalone Samza with No Coordination

2017-01-30 Thread Navina Ramesh (JIRA)
Navina Ramesh resolved as Fixed

Issue resolved by pull request 44 https://github.com/apache/samza/pull/44

Samza / SAMZA-1080
Standalone Samza with No Coordination

Change By: Navina Ramesh
Resolution: Fixed
Status: In Progress → Resolved
