Repository: samza
Updated Branches:
  refs/heads/master 58c39e371 -> 67ce608f2


Initial version for in memory system.

Getting the PR out to unblock sanil.
Pending tasks
 1. Add documentation
 2. Add more tests
 3. Currently initialization of the stream happens using createStream on admin. 
We need to find a way to expose the same functionality to low level users.
 4. To populate the initial stream, clients need to get a handle on the producer 
and produce the messages. Alternatively, we can support serialization of source 
data and pass it as config to the system to initialize. We have a hook in place 
for this but have not implemented it completely.
 5. Clean up consumed data in the buffer based on the lowest offsets of the 
consumers.

I will create JIRAs for all the pending tasks and fix them iteratively.

xinyuiscool ^^

Author: Bharath Kumarasubramanian <bkuma...@linkedin.com>

Reviewers: Xinyu Liu <xinyuliu...@gmail.com>

Closes #459 from bharathkk/single-node-testing


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/67ce608f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/67ce608f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/67ce608f

Branch: refs/heads/master
Commit: 67ce608f2e5bea6795b44213f0b90825141a217c
Parents: 58c39e3
Author: Bharath Kumarasubramanian <bkuma...@linkedin.com>
Authored: Tue Apr 17 09:09:01 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Apr 17 09:09:01 2018 -0700

----------------------------------------------------------------------
 .../samza/system/inmemory/InMemoryManager.java  | 179 ++++++++++++++++
 .../system/inmemory/InMemorySystemAdmin.java    | 137 ++++++++++++
 .../system/inmemory/InMemorySystemConsumer.java | 148 +++++++++++++
 .../system/inmemory/InMemorySystemFactory.java  |  50 +++++
 .../system/inmemory/InMemorySystemProducer.java | 103 +++++++++
 .../system/inmemory/TestInMemorySystem.java     | 211 +++++++++++++++++++
 settings.gradle                                 |   4 +-
 sonar-project.properties                        |   6 +-
 8 files changed, 833 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
new file mode 100644
index 0000000..03bc8d7
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.inmemory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Initial draft of in-memory manager. It is test only and not meant for 
production use right now.
+ */
+class InMemoryManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryManager.class);
+  private static final int DEFAULT_PARTITION_COUNT = 1;
+
+  private final ConcurrentHashMap<SystemStreamPartition, 
List<IncomingMessageEnvelope>> bufferedMessages;
+  private final Map<SystemStream, Integer> systemStreamToPartitions;
+
+  public InMemoryManager() {
+    bufferedMessages = new ConcurrentHashMap<>();
+    systemStreamToPartitions = new ConcurrentHashMap<>();
+  }
+
+  private List<IncomingMessageEnvelope> newSynchronizedLinkedList() {
+    return  Collections.synchronizedList(new 
LinkedList<IncomingMessageEnvelope>());
+  }
+
+  /**
+   * Handles the produce request from {@link InMemorySystemProducer} and 
populates the underlying message queue.
+   *
+   * @param ssp system stream partition
+   * @param key key for message produced
+   * @param message actual payload
+   */
+  void put(SystemStreamPartition ssp, Object key, Object message) {
+    List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
+    String offset = String.valueOf(messages.size());
+
+    if (message instanceof EndOfStreamMessage) {
+      offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
+    }
+
+    IncomingMessageEnvelope messageEnvelope = new IncomingMessageEnvelope(ssp, 
offset, key, message);
+    bufferedMessages.get(ssp)
+        .add(messageEnvelope);
+  }
+
+  /**
+   * Handles the poll request from {@link InMemorySystemConsumer}. It uses the 
input offset as the starting offset for
+   * each {@link SystemStreamPartition}.
+   *
+   * @param sspsToOffsets ssps to offset mapping
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to {@link List} of 
{@link IncomingMessageEnvelope}
+   */
+  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
poll(Map<SystemStreamPartition, String> sspsToOffsets) {
+    return sspsToOffsets.entrySet()
+        .stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> poll(entry.getKey(), entry.getValue())));
+  }
+
+  /**
+   * Populate the metadata for the {@link SystemStream} and initialize the 
buffer for {@link SystemStreamPartition}.
+   *
+   * @param streamSpec stream spec for the stream to be initialized
+   *
+   * @return true if successful, false otherwise
+   */
+  boolean initializeStream(StreamSpec streamSpec) {
+    LOG.info("Initializing the stream for {}", streamSpec.getId());
+    systemStreamToPartitions.put(streamSpec.toSystemStream(), 
streamSpec.getPartitionCount());
+
+    for (int partition = 0; partition < streamSpec.getPartitionCount(); 
partition++) {
+      bufferedMessages.put(
+          new SystemStreamPartition(streamSpec.toSystemStream(), new 
Partition(partition)),
+          newSynchronizedLinkedList());
+    }
+
+    return true;
+  }
+
+  /**
+   * Fetch system stream metadata for the given streams.
+   *
+   * @param streamNames set of input streams
+   *
+   * @return a {@link Map} of stream to {@link SystemStreamMetadata}
+   */
+  Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> 
streamNames) {
+    Map<String, Map<SystemStreamPartition, List<IncomingMessageEnvelope>>> 
result =
+        bufferedMessages.entrySet()
+            .stream()
+            .filter(map -> streamNames.contains(map.getKey().getStream()))
+            .collect(Collectors.groupingBy(entry -> entry.getKey().getStream(),
+                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+
+    return result.entrySet()
+        .stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> constructSystemStreamMetadata(entry.getKey(), 
entry.getValue())));
+  }
+
+  /**
+   * Fetch partition count for the given input stream stream.
+   *
+   * @param systemStream input system stream
+   *
+   * @return the partition count if available or {@link 
InMemoryManager#DEFAULT_PARTITION_COUNT}
+   */
+  int getPartitionCountForSystemStream(SystemStream systemStream) {
+    return systemStreamToPartitions.getOrDefault(systemStream, 
DEFAULT_PARTITION_COUNT);
+  }
+
+  private SystemStreamMetadata constructSystemStreamMetadata(
+      String systemName,
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
sspToMessagesForSystem) {
+
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata =
+        sspToMessagesForSystem
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(entry -> entry.getKey().getPartition(), 
entry -> {
+                String oldestOffset = "0";
+                String newestOffset = String.valueOf(entry.getValue().size());
+                String upcomingOffset = String.valueOf(entry.getValue().size() 
+ 1);
+
+                return new 
SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, 
upcomingOffset);
+
+              }));
+
+    return new SystemStreamMetadata(systemName, partitionMetadata);
+  }
+
+  private List<IncomingMessageEnvelope> poll(SystemStreamPartition ssp, String 
offset) {
+    int startingOffset = Integer.parseInt(offset);
+    List<IncomingMessageEnvelope> messageEnvelopesForSSP = 
bufferedMessages.getOrDefault(ssp, new LinkedList<>());
+
+    if (startingOffset >= messageEnvelopesForSSP.size()) {
+      return new ArrayList<>();
+    }
+
+    return messageEnvelopesForSSP.subList(startingOffset, 
messageEnvelopesForSSP.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
new file mode 100644
index 0000000..327615e
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.inmemory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Initial draft of in-memory {@link SystemAdmin}. It is test only and not meant for production use right now.
+ *
+ * <p>Offsets are treated as decimal integers; stream creation and metadata lookups are delegated to the
+ * {@link InMemoryManager} passed to the constructor.
+ */
+public class InMemorySystemAdmin implements SystemAdmin {
+  private final InMemoryManager inMemoryManager;
+
+  public InMemorySystemAdmin(InMemoryManager manager) {
+    inMemoryManager = manager;
+  }
+
+  /** Nothing to start for the in-memory system. */
+  @Override
+  public void start() {
+
+  }
+
+  /** Nothing to stop for the in-memory system. */
+  @Override
+  public void stop() {
+
+  }
+
+  /**
+   * Fetches the offsets for the messages immediately after the supplied offsets
+   * for a group of SystemStreamPartitions.
+   *
+   * @param offsets
+   *          Map from SystemStreamPartition to current offsets.
+   * @return Map from SystemStreamPartition to offsets immediately after the
+   *         current offsets, i.e. each supplied offset incremented by one.
+   * @throws NumberFormatException if one of the supplied offsets is not a decimal integer.
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    return offsets.entrySet()
+        .stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> String.valueOf(Integer.parseInt(entry.getValue()) + 1)));
+  }
+
+  /**
+   * Fetch metadata from a system for a set of streams.
+   *
+   * @param streamNames
+   *          The streams to fetch metadata for.
+   * @return A map from stream name to SystemStreamMetadata for each stream
+   *         requested in the parameter set.
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return inMemoryManager.getSystemStreamMetadata(streamNames);
+  }
+
+  /**
+   * Compare the two offsets. -1, 0, +1 means offset1 &lt; offset2,
+   * offset1 == offset2 and offset1 &gt; offset2 respectively. Return
+   * null if those two offsets are not comparable
+   *
+   * @param offset1 First offset for comparison.
+   * @param offset2 Second offset for comparison.
+   * @return -1 if offset1 &lt; offset2; 0 if offset1 == offset2; 1 if offset1 &gt; offset2. Null if not comparable,
+   *         which is the case when either offset is null or is not a decimal integer.
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (offset1 == null || offset2 == null) {
+      return null;
+    }
+
+    try {
+      return Integer.compare(Integer.parseInt(offset1), Integer.parseInt(offset2));
+    } catch (NumberFormatException e) {
+      // Non-numeric offsets (for example the end-of-stream marker offset) have no numeric ordering; honour the
+      // "null if not comparable" contract instead of propagating the parse failure.
+      return null;
+    }
+  }
+
+  /**
+   * Create a stream described by the spec. The work is delegated to
+   * {@link InMemoryManager#initializeStream(StreamSpec)}.
+   *
+   * @param streamSpec  The spec, or blueprint from which the physical stream will be created on the system.
+   * @return            {@code true} if the stream was actually created and not pre-existing.
+   *                    {@code false} if the stream was pre-existing.
+   *                    A RuntimeException will be thrown if creation fails.
+   */
+  @Override
+  public boolean createStream(StreamSpec streamSpec) {
+    return inMemoryManager.initializeStream(streamSpec);
+  }
+
+  /**
+   * Validates the stream described by the streamSpec on the system.
+   * A {@link StreamValidationException} should be thrown for any validation error.
+   * This implementation performs no validation and never throws.
+   *
+   * @param streamSpec  The spec, or blueprint for the physical stream on the system.
+   * @throws StreamValidationException if validation fails.
+   */
+  @Override
+  public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
+
+  }
+
+  /**
+   * Clear the stream described by the spec.
+   * Clearing is not implemented: nothing is removed and {@code false} is always returned.
+   *
+   * @param streamSpec  The spec for the physical stream on the system.
+   * @return {@code true} if the stream was successfully cleared.
+   *         {@code false} if clearing stream failed.
+   */
+  @Override
+  public boolean clearStream(StreamSpec streamSpec) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
new file mode 100644
index 0000000..08540ae
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.inmemory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Initial draft of in-memory {@link SystemConsumer}. It is test only and not meant for production use right now.
+ *
+ * <p>The consumer keeps, per registered {@link SystemStreamPartition}, the offset of the next message to fetch
+ * and advances it by the number of messages returned from each poll.
+ */
+public class InMemorySystemConsumer implements SystemConsumer {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemorySystemConsumer.class);
+
+  private final InMemoryManager memoryManager;
+  private final Map<SystemStreamPartition, String> sspToOffset;
+
+  public InMemorySystemConsumer(InMemoryManager manager) {
+    memoryManager = manager;
+    sspToOffset = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Tells the SystemConsumer to connect to the underlying system, and prepare
+   * to begin serving messages when poll is invoked.
+   */
+  @Override
+  public void start() {
+    LOG.info("Starting in memory system consumer...");
+  }
+
+  /**
+   * Tells the SystemConsumer to close all connections, release all resource,
+   * and shut down everything. The SystemConsumer will not be used again after
+   * stop is called.
+   */
+  @Override
+  public void stop() {
+    LOG.info("Stopping in memory system consumer...");
+  }
+
+  /**
+   * Register a SystemStreamPartition to this SystemConsumer. The SystemConsumer
+   * should try and read messages from all SystemStreamPartitions that are
+   * registered to it. SystemStreamPartitions should only be registered before
+   * start is called.
+   *
+   * @param systemStreamPartition
+   *          The SystemStreamPartition object representing the Samza
+   *          SystemStreamPartition to receive messages from.
+   * @param offset
+   *          String representing the offset of the point in the stream to start
+   *          reading messages from. This is an inclusive parameter; if "7" were
+   *          specified, the first message for the system/stream/partition to be
+   *          consumed and returned would be a message whose offset is "7".
+   *          Note: For broadcast streams, different tasks may checkpoint the same ssp with different values.
+   *          This implementation does not reconcile them: the most recently registered offset replaces any
+   *          offset registered earlier for the same ssp.
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    LOG.info("Registering ssp {} with starting offset {}", systemStreamPartition, offset);
+    sspToOffset.put(systemStreamPartition, offset);
+  }
+
+  /**
+   * Poll the SystemConsumer to get any available messages from the underlying
+   * system.
+   *
+   * <p>
+   * If the underlying implementation does not take care to adhere to the
+   * timeout parameter, the SamzaContainer's performance will suffer
+   * drastically. Specifically, if poll blocks when it's not supposed to, it
+   * will block the entire main thread in SamzaContainer, and no messages will
+   * be processed while blocking is occurring.
+   * </p>
+   *
+   * <p>
+   * This implementation never waits: the timeout is ignored and whatever is
+   * currently buffered is returned immediately. SystemStreamPartitions that
+   * were not registered are silently skipped.
+   * </p>
+   *
+   * @param systemStreamPartitions
+   *          A set of SystemStreamPartition to poll for new messages. If
+   *          SystemConsumer has messages available for other registered
+   *          SystemStreamPartitions, but they are not in the
+   *          systemStreamPartitions set in a given poll invocation, they can't
+   *          be returned. It is illegal to pass in SystemStreamPartitions that
+   *          have not been registered with the SystemConsumer first.
+   * @param timeout
+   *          If timeout &lt; 0, poll will block unless all SystemStreamPartition
+   *          are at "head" (the underlying system has been checked, and
+   *          returned an empty set). If at head, an empty map is returned. If
+   *          timeout &gt;= 0, poll will return any messages that are currently
+   *          available for any of the SystemStreamPartitions specified. If no
+   *          new messages are available, it will wait up to timeout
+   *          milliseconds for messages from any SystemStreamPartition to become
+   *          available. It will return an empty map if the timeout is hit, and
+   *          no new messages are available.
+   * @return A map from SystemStreamPartitions to any available
+   *         IncomingMessageEnvelopes for the SystemStreamPartitions. If no
+   *         messages are available for a SystemStreamPartition that was
+   *         supplied in the polling set, the map will not contain a key for the
+   *         SystemStreamPartition. Will return an empty map, not null, if no
+   *         new messages are available for any SystemStreamPartitions in the
+   *         input set.
+   * @throws InterruptedException
+   *          Thrown when a blocking poll has been interrupted by another
+   *          thread.
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+    // Only the requested partitions that were registered are fetched, each from its current offset.
+    Map<SystemStreamPartition, String> sspOffsetPairToFetch = sspToOffset.entrySet()
+        .stream()
+        .filter(entry -> systemStreamPartitions.contains(entry.getKey()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    // The manager answers with an entry for every requested partition, even when nothing is buffered; the
+    // contract above requires such partitions to be absent from the result, so empty lists are dropped.
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = memoryManager.poll(sspOffsetPairToFetch)
+        .entrySet()
+        .stream()
+        .filter(entry -> !entry.getValue().isEmpty())
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> sspToMessage : result.entrySet()) {
+      // Advance the stored offset past the messages handed out by this poll.
+      sspToOffset.computeIfPresent(sspToMessage.getKey(), (ssp, offset) -> {
+          int newOffset = Integer.parseInt(offset) + sspToMessage.getValue().size();
+          return String.valueOf(newOffset);
+        });
+      // absent should never be the case
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java
 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java
new file mode 100644
index 0000000..f78b7f4
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.inmemory;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+
+/**
+ * Initial draft of in-memory {@link SystemFactory}. It is test only and not 
meant for production use right now.
+ */
+public class InMemorySystemFactory implements SystemFactory {
+  private static final InMemoryManager MEMORY_MANAGER = new InMemoryManager();
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
+    return new InMemorySystemConsumer(MEMORY_MANAGER);
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
+    return new InMemorySystemProducer(systemName, MEMORY_MANAGER);
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new InMemorySystemAdmin(MEMORY_MANAGER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
new file mode 100644
index 0000000..052e72b
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.inmemory;
+
+import java.util.Optional;
+import org.apache.samza.Partition;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Initial draft of in-memory {@link SystemProducer}. It is test only and not meant for production use right now.
+ *
+ * <p>Messages are handed to the {@link InMemoryManager} synchronously in {@link #send}; nothing is buffered by
+ * the producer itself.
+ */
+public class InMemorySystemProducer implements SystemProducer {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemorySystemProducer.class);
+  private final InMemoryManager memoryManager;
+  private final String systemName;
+
+  public InMemorySystemProducer(String systemName, InMemoryManager manager) {
+    this.systemName = systemName;
+    this.memoryManager = manager;
+  }
+
+  /**
+   * Start the SystemProducer. After this method finishes it should be ready to accept messages received from the
+   * send method.
+   */
+  @Override
+  public void start() {
+    LOG.info("Starting in memory system producer for {}", systemName);
+  }
+
+  /**
+   * Stop the SystemProducer. After this method finished, the system should have completed all necessary work, sent
+   * any remaining messages and will not receive any new calls to the send method.
+   */
+  @Override
+  public void stop() {
+    LOG.info("Stopping in memory system producer for {}", systemName);
+  }
+
+  /**
+   * Registers this producer to send messages from a specified Samza source, such as a StreamTask.
+   * This implementation only logs the registration.
+   *
+   * @param source String representing the source of the message.
+   */
+  @Override
+  public void register(String source) {
+    LOG.info("Registering source {} with in memory producer", source);
+  }
+
+  /**
+   * Sends a specified message envelope from a specified Samza source.
+   *
+   * <p>The target partition is derived from the hash code of the envelope's partition key, or of the message
+   * itself when no partition key is set, modulo the partition count known to the manager for the stream.
+   *
+   * @param source String representing the source of the message.
+   * @param envelope Aggregate object representing the serialized message to send from the source.
+   * @throws NullPointerException if the envelope carries neither a partition key nor a message.
+   */
+  @Override
+  public void send(String source, OutgoingMessageEnvelope envelope) {
+    Object key = envelope.getKey();
+    Object message = envelope.getMessage();
+
+    // use the hashcode from partition key in the outgoing message envelope or default to message hashcode.
+    // The fallback is a lambda so that the message is only dereferenced when there is no partition key.
+    int hashCode = Optional.ofNullable(envelope.getPartitionKey())
+        .map(Object::hashCode)
+        .orElseGet(() -> message.hashCode());
+    int partitionCount = memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream());
+
+    // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE) is still negative and would
+    // yield a negative partition, whereas the remainder always fits. Every other hash code maps to the same
+    // partition as Math.abs(hashCode) % partitionCount.
+    int partition = Math.abs(hashCode % partitionCount);
+
+    SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(partition));
+    memoryManager.put(ssp, key, message);
+  }
+
+  /**
+   * If the SystemProducer buffers messages before sending them to its underlying system, it should flush those
+   * messages and leave no messages remaining to be sent.
+   *
+   * @param source String representing the source of the message.
+   */
+  @Override
+  public void flush(String source) {
+    // nothing to do
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
 
b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
new file mode 100644
index 0000000..7d5dfd0
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.inmemory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Exercises the in memory system end to end: messages are produced through a {@link SystemProducer} obtained
+ * from {@link InMemorySystemFactory} and read back through a {@link SystemConsumer} from the same factory.
+ */
+public class TestInMemorySystem {
+  private static final String SYSTEM_NAME = "in-memory";
+  private static final String STREAM_NAME = "test-stream";
+  private static final String SOURCE = "test-in-memory-source";
+  private static final int PARTITION_COUNT = 5;
+  private static final int POLL_TIMEOUT_MS = 100;
+
+  private static final int TEST_MEMBER_X = 1234;
+  private static final int PAGE_ID_X = 3456;
+
+  private static final int TEST_MEMBER_Y = 2345;
+  private static final int PAGE_ID_Y = 2222;
+
+  private static final SystemStream SYSTEM_STREAM = new SystemStream(SYSTEM_NAME, STREAM_NAME);
+
+  private final MetricsRegistry mockRegistry = mock(MetricsRegistry.class);
+  private final Config config = new MapConfig();
+
+  private final InMemorySystemFactory systemFactory;
+  private final SystemAdmin systemAdmin;
+
+  /**
+   * Creates the factory and admin and creates the test stream with {@link #PARTITION_COUNT} partitions.
+   */
+  public TestInMemorySystem() {
+    // use the config field directly; a local with the same name would only shadow it
+    systemFactory = new InMemorySystemFactory();
+    systemAdmin = systemFactory.getAdmin(SYSTEM_NAME, config);
+    systemAdmin.createStream(new StreamSpec(STREAM_NAME, STREAM_NAME, SYSTEM_NAME, PARTITION_COUNT));
+  }
+
+  @Test
+  public void testMessageFlow() {
+    PageViewEvent event1 = new PageViewEvent(TEST_MEMBER_X, PAGE_ID_X, System.currentTimeMillis());
+    PageViewEvent event2 = new PageViewEvent(TEST_MEMBER_Y, PAGE_ID_Y, System.currentTimeMillis());
+
+    produceMessages(event1, event2);
+
+    List<PageViewEvent> results = consumeMessages(allPartitions());
+
+    assertEquals(2, results.size());
+    assertTrue(results.contains(event1));
+    assertTrue(results.contains(event2));
+  }
+
+  @Test
+  public void testConsumerRespectsOffset() {
+    PageViewEvent event = new PageViewEvent(TEST_MEMBER_X, PAGE_ID_X, System.currentTimeMillis());
+    PageViewEvent event1 = new PageViewEvent(TEST_MEMBER_Y, PAGE_ID_Y, System.currentTimeMillis());
+
+    produceMessages(event);
+
+    Set<SystemStreamPartition> sspsToPoll = allPartitions();
+    // the same consumer is reused for every poll below so that its offsets carry over between polls
+    SystemConsumer consumer = createRegisteredConsumer(sspsToPoll);
+
+    List<PageViewEvent> results = consumeMessages(consumer, sspsToPoll);
+    assertEquals(1, results.size());
+    assertTrue(results.contains(event));
+
+    // nothing to poll
+    results = consumeMessages(consumer, sspsToPoll);
+    assertEquals(0, results.size());
+
+    produceMessages(event1);
+
+    // got new message. check if the offset has progressed
+    results = consumeMessages(consumer, sspsToPoll);
+    assertEquals(1, results.size());
+    assertTrue(results.contains(event1));
+  }
+
+  @Test
+  public void testEndOfStreamMessage() {
+    EndOfStreamMessage eos = new EndOfStreamMessage("test-task");
+
+    produceMessages(eos);
+
+    List<IncomingMessageEnvelope> results = consumeRawMessages(allPartitions());
+
+    assertEquals(1, results.size());
+    assertTrue(results.get(0).isEndOfStream());
+  }
+
+  /**
+   * Builds the set of all {@link #PARTITION_COUNT} partitions of the test stream.
+   */
+  private static Set<SystemStreamPartition> allPartitions() {
+    return IntStream.range(0, PARTITION_COUNT)
+        .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Obtains a new consumer from the factory and registers it for every given partition at offset "0".
+   */
+  private SystemConsumer createRegisteredConsumer(Set<SystemStreamPartition> sspsToPoll) {
+    SystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, mockRegistry);
+
+    // register the consumer for ssps
+    for (SystemStreamPartition ssp : sspsToPoll) {
+      systemConsumer.register(ssp, "0");
+    }
+
+    return systemConsumer;
+  }
+
+  /**
+   * Polls the given partitions once with a freshly registered consumer and returns the message payloads.
+   */
+  private <T> List<T> consumeMessages(Set<SystemStreamPartition> sspsToPoll) {
+    return consumeMessages(createRegisteredConsumer(sspsToPoll), sspsToPoll);
+  }
+
+  /**
+   * Polls the given partitions once with the supplied consumer and returns the message payloads cast to the
+   * type expected by the caller.
+   */
+  @SuppressWarnings("unchecked") // the caller chooses T to match the type of the events it produced
+  private <T> List<T> consumeMessages(SystemConsumer consumer, Set<SystemStreamPartition> sspsToPoll) {
+    return consumeRawMessages(consumer, sspsToPoll)
+        .stream()
+        .map(envelope -> (T) envelope.getMessage())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Polls the given partitions once with a freshly registered consumer and returns the raw envelopes.
+   */
+  private List<IncomingMessageEnvelope> consumeRawMessages(Set<SystemStreamPartition> sspsToPoll) {
+    return consumeRawMessages(createRegisteredConsumer(sspsToPoll), sspsToPoll);
+  }
+
+  /**
+   * Polls the given partitions once with the supplied consumer and flattens the per-partition results into a
+   * single list. A failing poll is reported as an {@link AssertionError} that carries the original cause.
+   */
+  private List<IncomingMessageEnvelope> consumeRawMessages(SystemConsumer consumer,
+      Set<SystemStreamPartition> sspsToPoll) {
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polled;
+    try {
+      polled = consumer.poll(sspsToPoll, POLL_TIMEOUT_MS);
+    } catch (Exception e) {
+      // keep the cause attached so the test report shows why polling failed
+      throw new AssertionError("Unable to consume messages", e);
+    }
+
+    List<IncomingMessageEnvelope> envelopes = new ArrayList<>();
+    polled.values().forEach(envelopes::addAll);
+    return envelopes;
+  }
+
+  /**
+   * Sends every given event to the test stream through a producer obtained from the factory.
+   */
+  private void produceMessages(Object... events) {
+    SystemProducer systemProducer = systemFactory.getProducer(SYSTEM_NAME, config, mockRegistry);
+
+    Stream.of(events)
+        .forEach(event -> systemProducer.send(SOURCE, new OutgoingMessageEnvelope(SYSTEM_STREAM, event)));
+  }
+
+  /**
+   * Simple payload used by the tests. It deliberately does not override equals/hashCode: the assertions above
+   * look for the very same instances that were produced.
+   */
+  private static final class PageViewEvent {
+    private final int memberId;
+    private final int pageId;
+    private final long viewTime;
+
+    public PageViewEvent(int memberId, int pageId, long viewTime) {
+      this.memberId = memberId;
+      this.pageId = pageId;
+      this.viewTime = viewTime;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 06141ac..99d27a0 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -19,13 +19,13 @@
 
 include \
   'samza-api',
+  'samza-aws',
+  'samza-azure',
   'samza-elasticsearch',
   'samza-log4j',
   'samza-rest',
   'samza-shell',
-  'samza-azure',
   'samza-sql',
-  'samza-aws',
   'samza-tools'
 
 def scalaModules = [

http://git-wip-us.apache.org/repos/asf/samza/blob/67ce608f/sonar-project.properties
----------------------------------------------------------------------
diff --git a/sonar-project.properties b/sonar-project.properties
index 706d99c..b6f3b7e 100644
--- a/sonar-project.properties
+++ b/sonar-project.properties
@@ -5,15 +5,15 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# 
+#
 
 sonar.projectKey=org.apache.samza
 sonar.projectName=Apache Samza

Reply via email to