[jira] (SAMZA-1081) Update all existing SystemAdmins to implement the new createStream() method
Title: Message Title Jake Maes created an issue Samza / SAMZA-1081 Update all existing SystemAdmins to implement the new createStream() method Issue Type: Sub-task Assignee: Jake Maes Created: 30/Jan/17 18:03 Priority: Major Reporter: Jake Maes Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
samza git commit: SAMZA-1025: Documentation for HdfsSystemConsumer
Repository: samza Updated Branches: refs/heads/master db8b228d9 -> f01b28628 SAMZA-1025: Documentation for HdfsSystemConsumer Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f01b2862 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f01b2862 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f01b2862 Branch: refs/heads/master Commit: f01b2862879b640456a46e0a6fe375cfe7e17b37 Parents: db8b228 Author: Hai Lu Authored: Mon Jan 30 10:58:15 2017 -0800 Committer: vjagadish1989 Committed: Mon Jan 30 10:58:46 2017 -0800 -- .../documentation/versioned/hdfs/consumer.md| 109 +++ .../documentation/versioned/hdfs/producer.md| 2 +- docs/learn/documentation/versioned/index.html | 1 + .../versioned/jobs/configuration-table.html | 40 +++ 4 files changed, 151 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/samza/blob/f01b2862/docs/learn/documentation/versioned/hdfs/consumer.md -- diff --git a/docs/learn/documentation/versioned/hdfs/consumer.md b/docs/learn/documentation/versioned/hdfs/consumer.md new file mode 100644 index 000..401b228 --- /dev/null +++ b/docs/learn/documentation/versioned/hdfs/consumer.md @@ -0,0 +1,109 @@ +--- +layout: page +title: Reading from HDFS +--- + + +You can configure your Samza job to read from HDFS files. The [HdfsSystemConsumer](javadocs/org/apache/samza/system/hdfs/HdfsSystemConsumer.html) can read from HDFS files. Avro encoded records are supported out of the box and it is easy to extend to support other formats (plain text, csv, json etc). See `Event format` section below. + +### Environment + +Your job needs to run on the same YARN cluster which hosts the HDFS you want to consume from. + +### Partitioning + +Partitioning works at the level of individual HDFS files. Each file is treated as a stream partition, while a directory that contains these files is a stream. For example, if you want to read from a HDFS path which contains 10 individual files, there will naturally be 10 partitions created. You can configure up to 10 Samza containers to process these partitions. If you want to read from a single HDFS file, there is currently no way to break down the consumption - you can only have one container to process the file. + +### Event format + +[HdfsSystemConsumer](javadocs/org/apache/samza/system/hdfs/HdfsSystemConsumer.html) currently supports reading from avro files. The received [IncomingMessageEnvelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three significant fields: +1. The key which is empty +2. The message which is set to the avro [GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html) +3. The stream partition which is set to the name of the HDFS file + +To extend the support beyond avro files (e.g. json, csv, etc.), you can implement the interface [SingleFileHdfsReader](javadocs/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.html) (take a look at the implementation of [AvroFileHdfsReader](javadocs/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader) as a sample). + +### End of stream support + +One major difference between HDFS data and Kafka data is that while a kafka topic has an unbounded stream of messages, HDFS files are bounded and have a notion of EOF. + +You can choose to implement [EndOfStreamListenerTask](javadocs/org/apache/samza/task/EndOfStreamListenerTask.html) to receive a callback when all partitions are at end of stream. When all partitions being processed by the task are at end of stream (i.e. EOF has been reached for all files), the Samza job exits automatically. + +### Basic Configuration + +Here is a few of the basic configs to set up HdfsSystemConsumer: + +``` +# The HDFS system consumer is implemented under the org.apache.samza.system.hdfs package, +# so use HdfsSystemFactory as the system factory for your system +systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory + +# You need to specify the path of files you want to consume in task.inputs +task.inputs=hdfs-clickstream.hdfs:/data/clickstream/2016/09/11 + +# You can specify a white list of files you want your job to process (in Java Pattern style) +systems.hdfs-clickstream.partitioner.defaultPartitioner.whitelist=.*avro + +# You can specify a black list of files you don't want your job to process (in Java Pattern style), +# by default it's empty. +# Note that you can have both white list and black list, in which case both will be applied. +systems.hdfs-clickstream.partitioner.defaultPartitioner.blacklist=somefile.avro + +``` + +### Security Configuration + +The following additional confi
[jira] (SAMZA-1025) Documentation page for HDFS System Consumer feature
Title: Message Title Jagadish commented on SAMZA-1025 Re: Documentation page for HDFS System Consumer feature Thank you Hai for the patch! Committed. Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-1025) Documentation page for HDFS System Consumer feature
Title: Message Title Jagadish resolved as Fixed Samza / SAMZA-1025 Documentation page for HDFS System Consumer feature Change By: Jagadish Resolution: Fixed Fix Version/s: 0.12.0 Status: In Progress Resolved Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[2/3] samza git commit: Specification of various Window and Trigger APIs in Samza
http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java -- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index 1a4ed8f..f7e1f36 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -16,85 +16,360 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.operators.windows; +import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.triggers.TimeTrigger; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.internal.WindowInternal; +import java.time.Duration; import java.util.Collection; +import java.util.function.BiFunction; import java.util.function.Function; - /** - * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be - * used by the user (i.e. programmers) to create {@link Window} function directly. + * APIs for creating different types of {@link Window}s. + * + * Groups the incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing. + * + * Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s + * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more + * {@link MessageEnvelope}s in the window and is called a {@link WindowPane}. + * + * A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window + * has arrived or late triggers that allow handling of late data arrivals. + * + * window wk1 + * ++ + * ++---+ + * | || | + * | pane 1|pane2 | pane3 | + * +---++---+ + * + --- + *incoming message stream --+ + --- + * window wk2 + * +-+-+ + * | pane 1| pane 2 | pane 3 | + * | | | | + * +-+---+-+ + * + * window wk3 + * +--+---+-+ + * | | | | + * | pane 1 | pane 2 | pane 3| + * | | | | + * +--+---+-+ + * + * + * A {@link Window} can be one of the following types: + * + * + * Tumbling Windows: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals. + * + * Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions. + * A session captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. + * The boundary for a session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that that arrive within + * the gap are grouped into the same session. + * + * Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}. + * An early trigger must be specified when defining a global window. + * + * + * A {@link Window} is defined as "keyed" when the incoming {@link MessageEnvelope}s are first grouped based on their key + * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window + * types. * */ +@InterfaceStability.Unstable public final class Windows { + private Windows() { } + /** - * private constructor to prevent instantiation + * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping processing + * time based windows based on the provided keyFn and applies the provided fold function to them. + * + * The below example computes the maximu
[1/3] samza git commit: Specification of various Window and Trigger APIs in Samza
Repository: samza Updated Branches: refs/heads/master f01b28628 -> 6dc33a850 http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java -- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 2ad6461..5991e2f 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -27,10 +27,6 @@ import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.SessionWindow; -import org.apache.samza.operators.windows.WindowFn; -import org.apache.samza.operators.windows.WindowOutput; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskCoordinator; @@ -44,7 +40,6 @@ import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -137,19 +132,6 @@ public class TestMessageStreamImpl { } @Test - public void testWindow() { -MessageStreamImpl inputStream = new MessageStreamImpl<>(); -SessionWindow window = mock(SessionWindow.class); -doReturn(mock(WindowFn.class)).when(window).getInternalWindowFn(); -MessageStream> outStream = inputStream.window(window); -Collection subs = inputStream.getRegisteredOperatorSpecs(); -assertEquals(subs.size(), 1); -OperatorSpec wndOp = subs.iterator().next(); -assertTrue(wndOp instanceof WindowOperatorSpec); -assertEquals(((WindowOperatorSpec) wndOp).getOutputStream(), outStream); - } - - @Test public void testJoin() { MessageStreamImpl source1 = new MessageStreamImpl<>(); MessageStreamImpl source2 = new MessageStreamImpl<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java -- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java deleted file mode 100644 index 8fa7ccc..000 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.operators.windows.StoreFunctions; -import org.apache.samza.operators.windows.WindowState; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.task.TaskContext; -import org.junit.Test; - -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -public class TestStateStoreImpl { - @Test - public void testStateStoreImpl() { -StoreFunctions mockStoreFunctions = mock(StoreFunctions.class); -// test constructor -StateStoreImpl storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName"); -TaskContext mockContext = mock(TaskContext.class); -KeyValueStore mockKvStore = mock(KeyValueStore.class); -when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore); -// test init() -storeImpl.init(mockContext); -verify(mockContext, times(1)).getStore("myStoreName"); -Func
[3/3] samza git commit: Specification of various Window and Trigger APIs in Samza
Specification of various Window and Trigger APIs in Samza - Defined APIs for specifying different types of windows - sessions, tumbling, global and keyed variants. - Defined APIs for specifying early and late triggers for a window. - Standardized all above Window types to be expressed as a combination of default, early and late triggers. - Defined classes for different types of trigger specifications. - Hide the WindowState class from programmers and move it from samza-api to samza-operator. We can choose to add it later if need be. - Removed some implementation classes in Window and Trigger. We can revisit them later when we implement Windows. - New API for specifying Time durations meaningfully in Samza. - Unit tests for most of the above changes. - Misc. Documentation, readability related changes to public APIs. Author: vjagadish1989 Reviewers: Yi Pan , Prateek Maheshwari Closes #30 from vjagadish/samza-operator-v3 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6dc33a85 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6dc33a85 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6dc33a85 Branch: refs/heads/master Commit: 6dc33a8504e3e2335cffe3317be1c0ccb5448458 Parents: f01b286 Author: vjagadish1989 Authored: Mon Jan 30 13:50:16 2017 -0800 Committer: vjagadish1989 Committed: Mon Jan 30 13:50:16 2017 -0800 -- .../apache/samza/operators/MessageStream.java | 46 ++- .../samza/operators/triggers/AnyTrigger.java| 39 ++ .../samza/operators/triggers/CountTrigger.java | 38 ++ .../triggers/DurationCharacteristic.java| 26 ++ .../operators/triggers/RepeatingTrigger.java| 34 ++ .../triggers/TimeSinceFirstMessageTrigger.java | 46 +++ .../triggers/TimeSinceLastMessageTrigger.java | 44 +++ .../samza/operators/triggers/TimeTrigger.java | 44 +++ .../samza/operators/triggers/Trigger.java | 34 ++ .../samza/operators/triggers/Triggers.java | 108 ++ .../operators/windows/AccumulationMode.java | 34 ++ .../samza/operators/windows/SessionWindow.java | 102 - .../samza/operators/windows/StoreFunctions.java | 67 .../apache/samza/operators/windows/Trigger.java | 94 - .../samza/operators/windows/TriggerBuilder.java | 320 .../apache/samza/operators/windows/Window.java | 90 - .../samza/operators/windows/WindowFn.java | 59 --- .../samza/operators/windows/WindowKey.java | 46 +++ .../samza/operators/windows/WindowOutput.java | 51 --- .../samza/operators/windows/WindowPane.java | 57 +++ .../samza/operators/windows/WindowState.java| 85 - .../apache/samza/operators/windows/Windows.java | 369 --- .../windows/internal/WindowInternal.java| 110 ++ .../samza/operators/windows/TestTrigger.java| 68 .../operators/windows/TestTriggerBuilder.java | 226 .../operators/windows/TestWindowOutput.java | 5 +- .../samza/operators/windows/TestWindows.java| 109 -- .../org/apache/samza/task/AsyncRunLoop.java | 4 +- .../samza/operators/MessageStreamImpl.java | 26 +- .../apache/samza/operators/StateStoreImpl.java | 56 --- .../samza/operators/impl/OperatorImpls.java | 5 +- .../impl/SessionWindowOperatorImpl.java | 67 .../operators/impl/WindowOperatorImpl.java | 40 ++ .../samza/operators/spec/OperatorSpecs.java | 31 +- .../operators/spec/PartialJoinOperatorSpec.java | 24 -- .../operators/spec/WindowOperatorSpec.java | 92 + .../samza/operators/spec/WindowState.java | 85 + .../apache/samza/operators/BroadcastTask.java | 35 +- .../samza/operators/TestMessageStreamImpl.java | 18 - .../samza/operators/TestStateStoreImpl.java | 72 .../org/apache/samza/operators/WindowTask.java | 17 +- .../samza/operators/impl/TestOperatorImpls.java | 11 +- .../operators/impl/TestSessionWindowImpl.java | 111 -- .../samza/operators/spec/TestOperatorSpecs.java | 50 +-- 44 files changed, 1294 insertions(+), 1801 deletions(-) -- http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java -- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index d18536b..6a2f95b 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -26,8 +26,7 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFun
samza git commit: SAMZA-1077: SamzaContainer should catch all Throwables instead of only exceptions
Repository: samza Updated Branches: refs/heads/master 6dc33a850 -> c6c10d31e SAMZA-1077: SamzaContainer should catch all Throwables instead of only exceptions Author: vjagadish1989 Reviewers: Jake Maes Closes #30 from vjagadish1989/samza-1077 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c6c10d31 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c6c10d31 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c6c10d31 Branch: refs/heads/master Commit: c6c10d31e37be3c9b07d202d17bcba6cb1213c3b Parents: 6dc33a8 Author: vjagadish1989 Authored: Mon Jan 30 14:41:57 2017 -0800 Committer: vjagadish1989 Committed: Mon Jan 30 14:44:10 2017 -0800 -- .../apache/samza/container/SamzaContainer.scala | 4 +- .../samza/container/TestSamzaContainer.scala| 59 2 files changed, 61 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala -- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index f1d62c5..e49da57 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -668,8 +668,8 @@ class SamzaContainer( addShutdownHook runLoop.run } catch { - case e: Exception => -error("Caught exception in process loop.", e) + case e: Throwable => +error("Caught exception/error in process loop.", e) throw e } finally { info("Shutting down.") http://git-wip-us.apache.org/repos/asf/samza/blob/c6c10d31/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala -- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 5895037..0d86833 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -238,6 +238,65 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } @Test + def testErrorInTaskInitShutsDownTask { +val task = new StreamTask with InitableTask with ClosableTask { + var wasShutdown = false + + def init(config: Config, context: TaskContext) { +throw new NoSuchMethodError("Trigger a shutdown, please.") + } + + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + } + + def close { +wasShutdown = true + } +} +val config = new MapConfig +val taskName = new TaskName("taskName") +val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) +val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) +val collector = new TaskInstanceCollector(producerMultiplexer) +val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) +val taskInstance: TaskInstance[StreamTask] = new TaskInstance[StreamTask]( + task, + taskName, + config, + new TaskInstanceMetrics, + null, + consumerMultiplexer, + collector, + containerContext +) +val runLoop = new RunLoop( + taskInstances = Map(taskName -> taskInstance), + consumerMultiplexer = consumerMultiplexer, + metrics = new SamzaContainerMetrics, + maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1)) +val container = new SamzaContainer( + containerContext = containerContext, + taskInstances = Map(taskName -> taskInstance), + runLoop = runLoop, + consumerMultiplexer = consumerMultiplexer, + producerMultiplexer = producerMultiplexer, + metrics = new SamzaContainerMetrics, + jmxServer = null +) +try { + container.run + fail("Expected error to be thrown in run method.") +} catch { + case e: Throwable => // Expected +} +assertTrue(task.wasShutdown) + } + + @Test def testStartStoresIncrementsCounter { val task = new StreamTask { def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions
Title: Message Title ASF GitHub Bot commented on SAMZA-1077 Re: SamzaContainer should catch all Throwables instead of only exceptions Github user vjagadish closed the pull request at: https://github.com/apache/samza/pull/40 Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
samza git commit: SAMZA-984; Upgraded RocksDB version to 5.0.1 and added configuration for managing RocksDB logging.
Repository: samza Updated Branches: refs/heads/master c6c10d31e -> 1c7e4d7aa SAMZA-984; Upgraded RocksDB version to 5.0.1 and added configuration for managing RocksDB logging. Author: Prateek Maheshwari Reviewers: Jake Maes , Jagadish Closes #46 from prateekm/rocksdb-upgrade Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c7e4d7a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c7e4d7a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c7e4d7a Branch: refs/heads/master Commit: 1c7e4d7aaeb4b5036034c9182fa0259262bcda8e Parents: c6c10d3 Author: Prateek Maheshwari Authored: Mon Jan 30 14:58:50 2017 -0800 Committer: vjagadish1989 Committed: Mon Jan 30 14:58:50 2017 -0800 -- .../versioned/jobs/configuration-table.html | 15 gradle/dependency-versions.gradle | 2 +- .../samza/storage/kv/RocksDbOptionsHelper.java | 83 +++- .../samza/storage/kv/RocksDbKeyValueStore.scala | 10 +-- 4 files changed, 69 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/docs/learn/documentation/versioned/jobs/configuration-table.html -- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 7bac935..a26bc43 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1422,6 +1422,21 @@ + +stores.store-name.rocksdb.max.log.file.size.bytes +67108864 + +The maximum size in bytes of the RocksDB LOG file before it is rotated. + + + + +stores.store-name.rocksdb.keep.log.file.num +2 + +The number of RocksDB LOG files (including rotated LOG.old.* files) to keep. + + http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/gradle/dependency-versions.gradle -- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 976a49c..db59672 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -30,7 +30,7 @@ metricsVersion = "2.2.0" kafkaVersion = "0.10.0.1" commonsHttpClientVersion = "3.1" - rocksdbVersion = "3.13.1" + rocksdbVersion = "5.0.1" yarnVersion = "2.6.1" slf4jVersion = "1.6.2" log4jVersion = "1.2.17" http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java -- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java index d4f765c..9b8f44b 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java @@ -34,6 +34,13 @@ import org.slf4j.LoggerFactory; public class RocksDbOptionsHelper { private static final Logger log = LoggerFactory.getLogger(RocksDbOptionsHelper.class); + private static final String ROCKSDB_COMPRESSION = "rocksdb.compression"; + private static final String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes"; + private static final String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style"; + private static final String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers"; + private static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes"; + private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num"; + public static Options options(Config storeConfig, SamzaContainerContext containerContext) { Options options = new Options(); Long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024); @@ -42,59 +49,65 @@ public class RocksDbOptionsHelper { options.setWriteBufferSize((int) (writeBufSize / numTasks)); CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION; -String compressionInConfig = storeConfig.get("rocksdb.compression", "snappy"); +String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, "snappy"); switch (compressionInConfig) { -case "snappy": - comp
[jira] (SAMZA-860) Expose RocksDB log options via Samza configuration
Title: Message Title ASF GitHub Bot commented on SAMZA-860 Re: Expose RocksDB log options via Samza configuration Github user asfgit closed the pull request at: https://github.com/apache/samza/pull/46 Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-984) Upgrade to RocksDB to 4.8.0
Title: Message Title Jagadish commented on SAMZA-984 Re: Upgrade to RocksDB to 4.8.0 Thanks Prateek Maheshwari for the patch! Committed to master. Can you please update the JIRA that there are no regressions from the integration/perf tests? Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-984) Upgrade to RocksDB to 4.8.0
Title: Message Title Jagadish assigned an issue to Prateek Maheshwari Samza / SAMZA-984 Upgrade to RocksDB to 4.8.0 Change By: Jagadish Assignee: Prateek Maheshwari Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions
Title: Message Title Jagadish commented on SAMZA-1077 Re: SamzaContainer should catch all Throwables instead of only exceptions Committed to master. Thanks Jake Maes for the review. Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions
Title: Message Title Jagadish assigned an issue to Jagadish Samza / SAMZA-1077 SamzaContainer should catch all Throwables instead of only exceptions Change By: Jagadish Assignee: Jagadish Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-984) Upgrade to RocksDB to 4.8.0
Title: Message Title Prateek Maheshwari commented on SAMZA-984 Re: Upgrade to RocksDB to 4.8.0 Jagadish Did not notice any regressions in testing. Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
samza git commit: SAMZA-1079: Add timeouts for reads from HttpFileSystem. Add tests.
Repository: samza Updated Branches: refs/heads/master 1c7e4d7aa -> bf9bc2c37 SAMZA-1079: Add timeouts for reads from HttpFileSystem. Add tests. * Wrote a unit/integration test to simulate a stuck connection when reading binaries for the job. Other misc. changes: - Moved some debug log messages to be info for better debugging. Author: vjagadish1989 Reviewers: jmakes,nickpan47 Closes #42 from vjagadish/http-fs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf9bc2c3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf9bc2c3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf9bc2c3 Branch: refs/heads/master Commit: bf9bc2c373f3456159fdc610f268992e8d9a476c Parents: 1c7e4d7 Author: vjagadish1989 Authored: Mon Jan 30 15:10:06 2017 -0800 Committer: vjagadish1989 Committed: Mon Jan 30 15:10:06 2017 -0800 -- .../samza/util/hadoop/HttpFileSystem.scala | 15 +- .../yarn/util/hadoop/TestHttpFileSystem.java| 161 +++ 2 files changed, 173 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/samza/blob/bf9bc2c3/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala -- diff --git a/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala b/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala index 7dff90e..fa65a4e 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala @@ -37,20 +37,29 @@ import org.apache.hadoop.util.Progressable import org.apache.samza.util.Logging class HttpFileSystem extends FileSystem with Logging { - val DEFAULT_BLOCK_SIZE = 4 * 1024; + val DEFAULT_BLOCK_SIZE = 4 * 1024 var uri: URI = null + var connectionTimeoutMs = 5 * 60 * 1000 + var socketReadTimeoutMs = 5 * 60 * 1000 + + def setConnectionTimeoutMs(timeout: Int): Unit = connectionTimeoutMs = timeout + + def setSocketReadTimeoutMs(timeout: Int): Unit = socketReadTimeoutMs = timeout override def initialize(uri: URI, conf: Configuration) { super.initialize(uri, conf) -debug("init uri %s" format (uri)) +info("init uri %s" format (uri)) this.uri = uri } override def getUri = uri override def open(f: Path, bufferSize: Int): FSDataInputStream = { -debug("open http file %s" format (f)) +info("open http file %s" format (f)) val client = new HttpClient + client.getHttpConnectionManager.getParams.setConnectionTimeout(connectionTimeoutMs) +client.getHttpConnectionManager.getParams.setSoTimeout(socketReadTimeoutMs) + val method = new GetMethod(f.toUri.toString) val statusCode = client.executeMethod(method) http://git-wip-us.apache.org/repos/asf/samza/blob/bf9bc2c3/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java -- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java new file mode 100644 index 000..6f42856 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java @@ -0,0 +1,161 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.job.yarn.util.hadoop; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.samza.coordinator.server.HttpServer; +import org.apache.samza.util.hadoop.HttpFileSystem; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +i
[jira] (SAMZA-1079) HttpFileSystem should timeout for blocking reads when localizing containers.
Title: Message Title Jagadish commented on SAMZA-1079 Re: HttpFileSystem should timeout for blocking reads when localizing containers. Thank you Yi Pan (Data Infrastructure) for the review. Submitted! Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-1079) HttpFileSystem should timeout for blocking reads when localizing containers.
Title: Message Title ASF GitHub Bot commented on SAMZA-1079 Re: HttpFileSystem should timeout for blocking reads when localizing containers. Github user asfgit closed the pull request at: https://github.com/apache/samza/pull/42 Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-1077) SamzaContainer should catch all Throwables instead of only exceptions
Title: Message Title Jagadish resolved as Fixed Samza / SAMZA-1077 SamzaContainer should catch all Throwables instead of only exceptions Change By: Jagadish Resolution: Fixed Status: Open Resolved Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-1074) Cannot build hello-samza on various CDH version
Title: Message Title ASF GitHub Bot commented on SAMZA-1074 Re: Cannot build hello-samza on various CDH version Github user asfgit closed the pull request at: https://github.com/apache/samza/pull/38 Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
samza git commit: SAMZA-1074; Fix builds for hello-samza on various CDH versions
Repository: samza Updated Branches: refs/heads/master bf9bc2c37 -> 154cda2c7 SAMZA-1074; Fix builds for hello-samza on various CDH versions This issue is mainly related of hello-samza, but modification of document is needed to help user understand this process well. Author: Dongkyu Hwangbo Reviewers: jvenkatr Closes #38 from dkhwangbo/SAMZA-1074 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/154cda2c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/154cda2c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/154cda2c Branch: refs/heads/master Commit: 154cda2c7f7b6569557a188484e262882af3f4f2 Parents: bf9bc2c Author: Dongkyu Hwangbo Authored: Mon Jan 30 15:56:56 2017 -0800 Committer: vjagadish1989 Committed: Mon Jan 30 15:56:56 2017 -0800 -- docs/learn/tutorials/versioned/deploy-samza-to-CDH.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/samza/blob/154cda2c/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md -- diff --git a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md index 8016aca..fff209f 100644 --- a/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md +++ b/docs/learn/tutorials/versioned/deploy-samza-to-CDH.md @@ -26,7 +26,7 @@ The tutorial assumes you have successfully run [hello-samza](../../../startup/he We need to use a specific compile option to build hello-samza package for CDH 5.4.0 {% highlight bash %} -mvn clean package -Denv=cdh5.4.0 +mvn clean package -Dhadoop.version=cdh5.4.0 {% endhighlight %} ### Upload Package to Cluster @@ -37,7 +37,7 @@ There are a few ways of uploading the package to the cluster's HDFS. If you do n hadoop fs -put path/to/hello-samza-0.11.0-dist.tar.gz /path/for/tgz {% endhighlight %} -### Get Deloying Scripts +### Get Deploying Scripts Untar the job package (assume you will run from the current directory)
[jira] (SAMZA-1080) Standalone Samza with No Coordination
Title: Message Title Navina Ramesh updated an issue Samza / SAMZA-1080 Standalone Samza with No Coordination Change By: Navina Ramesh Fix Version/s: 0.13.0 Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[1/2] samza git commit: SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API
Repository: samza Updated Branches: refs/heads/master 154cda2c7 -> 2a3a5ac7f http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala -- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala deleted file mode 100644 index df63b97..000 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ /dev/null @@ -1,360 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.coordinator - - -import java.util -import java.util.concurrent.atomic.AtomicReference - -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.config.{Config, StorageConfig} -import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory -import org.apache.samza.container.grouper.task.{BalancingTaskNameGrouper, TaskNameGrouperFactory} -import org.apache.samza.container.{LocalityManager, TaskName} -import org.apache.samza.coordinator.server.{HttpServer, JobServlet} -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory -import org.apache.samza.job.model.{JobModel, TaskModel} -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.storage.ChangelogPartitionManager -import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition, SystemStreamPartitionMatcher} -import org.apache.samza.util.{Logging, Util} -import org.apache.samza.{Partition, SamzaException} - -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ - - -/** - * Helper companion object that is responsible for wiring up a JobCoordinator - * given a Config object. - */ -object JobModelManager extends Logging { - - /** - * a volatile value to store the current instantiated JobCoordinator - */ - @volatile var currentJobModelManager: JobModelManager = null - val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]() - - /** - * @param coordinatorSystemConfig A config object that contains job.name, - * job.id, and all system..* - * configuration. The method will use this config to read all configuration - * from the coordinator stream, and instantiate a JobCoordinator. - */ - def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = { -val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory() -val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap) -val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap) -info("Registering coordinator system stream.") -coordinatorSystemConsumer.register -debug("Starting coordinator system stream.") -coordinatorSystemConsumer.start -debug("Bootstrapping coordinator system stream.") -coordinatorSystemConsumer.bootstrap -val source = "Job-coordinator" -coordinatorSystemProducer.register(source) -info("Registering coordinator system stream producer.") -val config = coordinatorSystemConsumer.getConfig -info("Got config: %s" format config) -val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source) -val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer) - -val systemNames = getSystemNames(config) - -// Map the name of each system to the corresponding SystemAdmin -val systemAdmins = systemNames.map(systemName => { - val systemFactoryClassName = config -.getSystemFactory(systemName) -.getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configurati
[2/2] samza git commit: SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API
SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API This patch contains changes associated with the Standalone StreamProcessor, where there is no coordination. This will work for load-balanced consumers like new Kafka consumer and statically partitioned cases. Additionally, we have introduced TaskFactory for StreamTask and AsyncStreamTask. Author: navina Reviewers: xinyuiscool,fredji97 Closes #44 from navina/Noop-JC Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a3a5ac7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a3a5ac7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a3a5ac7 Branch: refs/heads/master Commit: 2a3a5ac7f21dc4213ff6ec96e11a798bff096d04 Parents: 154cda2 Author: navina Authored: Mon Jan 30 18:30:02 2017 -0800 Committer: navina Committed: Mon Jan 30 18:30:02 2017 -0800 -- NOTICE | 1 + .../samza/task/AsyncStreamTaskFactory.java | 28 ++ .../apache/samza/task/StreamTaskFactory.java| 31 ++ .../samza/config/JobCoordinatorConfig.java | 40 ++ .../org/apache/samza/config/TaskConfigJava.java | 25 ++ .../apache/samza/container/RunLoopFactory.java | 20 +- .../AllSspToSingleTaskGrouperFactory.java | 69 .../task/SingleContainerGrouperFactory.java | 56 +++ .../samza/coordinator/JobCoordinator.java | 72 .../coordinator/JobCoordinatorFactory.java | 35 ++ .../processor/SamzaContainerController.java | 152 .../apache/samza/processor/StreamProcessor.java | 173 + .../standalone/StandaloneJobCoordinator.java| 148 .../StandaloneJobCoordinatorFactory.java| 31 ++ .../org/apache/samza/task/AsyncRunLoop.java | 37 +- .../org/apache/samza/config/JobConfig.scala | 3 + .../org/apache/samza/container/RunLoop.scala| 18 +- .../apache/samza/container/SamzaContainer.scala | 178 ++--- .../apache/samza/container/TaskInstance.scala | 21 +- .../samza/coordinator/JobCoordinator.scala | 360 -- .../samza/coordinator/JobModelManager.scala | 362 +++ .../samza/job/local/ThreadJobFactory.scala | 23 +- .../org/apache/samza/task/TestAsyncRunLoop.java | 20 +- .../apache/samza/container/TestRunLoop.scala| 15 +- .../samza/container/TestSamzaContainer.scala| 6 +- .../samza/container/TestTaskInstance.scala | 6 +- .../test/StandaloneIntegrationTestHarness.java | 84 + .../apache/samza/test/StandaloneTestUtils.java | 111 ++ .../test/processor/IdentityStreamTask.java | 72 .../test/processor/TestStreamProcessor.java | 226 .../AbstractIntegrationTestHarness.scala| 60 +++ .../AbstractKafkaServerTestHarness.scala| 123 +++ .../harness/AbstractZookeeperTestHarness.scala | 72 33 files changed, 2185 insertions(+), 493 deletions(-) -- http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/NOTICE -- diff --git a/NOTICE b/NOTICE index 3352dda..2ee5fdc 100644 --- a/NOTICE +++ b/NOTICE @@ -3,3 +3,4 @@ Copyright 2014 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). + http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java -- diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java new file mode 100644 index 000..e5ce9c4 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +/** + * Build {@link AsyncStreamTask} instances. + * Implementations should return a new instance
[jira] (SAMZA-1080) Standalone Samza with No Coordination
Title: Message Title ASF GitHub Bot commented on SAMZA-1080 Re: Standalone Samza with No Coordination Github user asfgit closed the pull request at: https://github.com/apache/samza/pull/44 Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SAMZA-1080) Standalone Samza with No Coordination
Title: Message Title Navina Ramesh resolved as Fixed Issue resolved by pull request 44 https://github.com/apache/samza/pull/44 Samza / SAMZA-1080 Standalone Samza with No Coordination Change By: Navina Ramesh Resolution: Fixed Status: In Progress Resolved Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)