[ https://issues.apache.org/jira/browse/SAMOA-65?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050487#comment-16050487 ]
ASF GitHub Bot commented on SAMOA-65:
-------------------------------------
Github user nicolas-kourtellis commented on a diff in the pull request:
https://github.com/apache/incubator-samoa/pull/64#discussion_r122203064
--- Diff: samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java ---
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samoa.instances.InstancesHeader;
+
+/**
+ *
+ * @author pwawrzyniak
+ * @author Jakub Jankowski
+ */
+public class KafkaEntranceProcessorWithJsonTest {
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private static final String TOPIC_AVRO = "samoa_test-avro";
+ private static final String TOPIC_JSON = "samoa_test-json";
+ private static final int NUM_INSTANCES = 11111;
+
+ private static KafkaServer kafkaServer;
+ private static EmbeddedZookeeper zkServer;
+ private static ZkClient zkClient;
+ private static String zkConnect;
+ private static int TIMEOUT = 1000;
+
+ public KafkaEntranceProcessorWithJsonTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ // setup Zookeeper
+ zkServer = new EmbeddedZookeeper();
+ zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+
+ // create topics
+ AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
--- End diff --
Similar to the previous comments.
> Apache Kafka integration components for SAMOA
> ---------------------------------------------
>
> Key: SAMOA-65
> URL: https://issues.apache.org/jira/browse/SAMOA-65
> Project: SAMOA
> Issue Type: New Feature
> Components: SAMOA-API, SAMOA-Instances
> Reporter: Piotr Wawrzyniak
> Labels: kafka, sink, source, streaming
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> As of now, Apache SAMOA includes no integration components for Apache Kafka.
> In particular, it cannot read data coming from Kafka or write
> data with prediction results back to Kafka.
> The key assumptions for the development of Kafka-related components are as
> follows:
> 1) develop support for an input data stream arriving at Apache SAMOA via
> Apache Kafka
> 2) develop support for an output data stream produced by Apache SAMOA,
> including the results of stream mining, and forwarded to Apache Kafka so
> that other modules consuming the stream can use it.
> The goal of this issue is therefore to create the following components:
> 1) KafkaEntranceProcessor in samoa-api. This entrance processor will be
> able to accept an incoming Kafka stream. It will require an implementation
> of the KafkaDeserializer interface to be supplied. The role of the
> Deserializer is to translate incoming Apache Kafka messages into
> implementations of SAMOA's Instance interface.
> 2) KafkaDestinationProcessor in samoa-api. Similarly to the
> KafkaEntranceProcessor, this processor will require an implementation of
> the KafkaSerializer interface to be supplied. The role of the Serializer is
> to create a Kafka message from the underlying Instance class.
> 3) KafkaStream, as an extension to the existing streams (e.g.
> InstanceStream), will take a role similar to that of other streams and will
> provide control over Instance flows in the entire topology.
> Moreover, the following assumptions are made:
> 1) Components would be implemented using the most up-to-date version
> of Apache Kafka, i.e. 0.10
> 2) Samples of the aforementioned Serializer and Deserializer would be
> delivered, supporting both AVRO and JSON serialization of Instance objects.
> 3) Sample test classes providing a reference use of the Kafka source and
> destination would be included in the project as well.
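
The serializer/deserializer pair described in the issue can be sketched roughly as follows. This is only an illustration under stated assumptions: the interface shapes and method names below are hypothetical (the issue names KafkaSerializer and KafkaDeserializer but does not give their signatures), a plain double[] stands in for SAMOA's Instance, and the hand-rolled JSON handling is a toy replacement for a real JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: names mirror the issue text, not the actual SAMOA code.
final class KafkaSerdeSketch {

  /** Assumed shape: turns a raw Kafka message payload into a domain object. */
  interface KafkaDeserializer<T> {
    T deserialize(byte[] message);
  }

  /** Assumed shape: turns a domain object into a raw Kafka message payload. */
  interface KafkaSerializer<T> {
    byte[] serialize(T value);
  }

  /**
   * Toy JSON codec for a flat array of finite doubles. The double[] is a
   * stand-in for a SAMOA Instance; NaN and infinities are not handled.
   */
  static final class JsonDoubleArrayCodec
      implements KafkaSerializer<double[]>, KafkaDeserializer<double[]> {

    @Override
    public byte[] serialize(double[] values) {
      StringBuilder sb = new StringBuilder("[");
      for (int i = 0; i < values.length; i++) {
        if (i > 0) {
          sb.append(',');
        }
        sb.append(values[i]);
      }
      return sb.append(']').toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public double[] deserialize(byte[] message) {
      String text = new String(message, StandardCharsets.UTF_8).trim();
      // Strip the surrounding brackets, then split the comma-separated numbers.
      String body = text.substring(1, text.length() - 1).trim();
      if (body.isEmpty()) {
        return new double[0];
      }
      String[] parts = body.split(",");
      double[] out = new double[parts.length];
      for (int i = 0; i < parts.length; i++) {
        out[i] = Double.parseDouble(parts[i].trim());
      }
      return out;
    }
  }

  public static void main(String[] args) {
    JsonDoubleArrayCodec codec = new JsonDoubleArrayCodec();
    double[] original = {1.0, 2.5, -3.0};
    byte[] payload = codec.serialize(original);      // what a destination processor would send
    double[] restored = codec.deserialize(payload);  // what an entrance processor would receive
    System.out.println(new String(payload, StandardCharsets.UTF_8));
    System.out.println(Arrays.equals(original, restored));
  }
}
```

Keeping the two roles as separate interfaces follows the issue text, where the entrance processor only needs a deserializer and the destination processor only needs a serializer; a single codec class can still implement both so that round-trip tests stay simple.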