[ 
https://issues.apache.org/jira/browse/SAMOA-65?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050487#comment-16050487
 ] 

ASF GitHub Bot commented on SAMOA-65:
-------------------------------------

Github user nicolas-kourtellis commented on a diff in the pull request:

    https://github.com/apache/incubator-samoa/pull/64#discussion_r122203064
  
    --- Diff: 
samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorWithJsonTest.java
 ---
    @@ -0,0 +1,187 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.samoa.streams.kafka;
    +
    +/*
    + * #%L
    + * SAMOA
    + * %%
    + * Copyright (C) 2014 - 2017 Apache Software Foundation
    + * %%
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + * 
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + * #L%
    + */
    +import com.google.gson.Gson;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.file.Files;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Random;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.logging.Level;
    +import java.util.logging.Logger;
    +import org.apache.samoa.learners.InstanceContentEvent;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import static org.junit.Assert.*;
    +import kafka.admin.AdminUtils;
    +import kafka.admin.RackAwareMode;
    +import kafka.server.KafkaConfig;
    +import kafka.server.KafkaServer;
    +import kafka.utils.MockTime;
    +import kafka.utils.TestUtils;
    +import org.apache.kafka.common.utils.Time;
    +import kafka.utils.ZKStringSerializer$;
    +import kafka.utils.ZkUtils;
    +import kafka.zk.EmbeddedZookeeper;
    +import org.I0Itec.zkclient.ZkClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.samoa.instances.InstancesHeader;
    +
    +/**
    + *
    + * @author pwawrzyniak
    + * @author Jakub Jankowski
    + */
    +public class KafkaEntranceProcessorWithJsonTest {
    +
    +    private static final String ZKHOST = "127.0.0.1";
    +    private static final String BROKERHOST = "127.0.0.1";
    +    private static final String BROKERPORT = "9092";
    +    private static final String TOPIC_AVRO = "samoa_test-avro";
    +    private static final String TOPIC_JSON = "samoa_test-json";
    +    private static final int NUM_INSTANCES = 11111;
    +
    +    private static KafkaServer kafkaServer;
    +    private static EmbeddedZookeeper zkServer;
    +    private static ZkClient zkClient;
    +    private static String zkConnect;
    +    private static int TIMEOUT = 1000;
    +
    +    public KafkaEntranceProcessorWithJsonTest() {
    +    }
    +
    +    @BeforeClass
    +    public static void setUpClass() throws IOException {
    +        // setup Zookeeper
    +        zkServer = new EmbeddedZookeeper();
    +        zkConnect = ZKHOST + ":" + zkServer.port();
    +        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
    +        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
    +
    +        // setup Broker
    +        Properties brokerProps = new Properties();
    +        brokerProps.setProperty("zookeeper.connect", zkConnect);
    +        brokerProps.setProperty("broker.id", "0");
    +        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    +        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + 
":" + BROKERPORT);
    +        KafkaConfig config = new KafkaConfig(brokerProps);
    +        Time mock = new MockTime();
    +        kafkaServer = TestUtils.createServer(config, mock);
    +
    +        // create topics
    +        AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new 
Properties(), RackAwareMode.Disabled$.MODULE$);
    --- End diff --
    
    Similar to the previous comments.


> Apache Kafka integration components for SAMOA
> ---------------------------------------------
>
>                 Key: SAMOA-65
>                 URL: https://issues.apache.org/jira/browse/SAMOA-65
>             Project: SAMOA
>          Issue Type: New Feature
>          Components: SAMOA-API, SAMOA-Instances
>            Reporter: Piotr Wawrzyniak
>              Labels: kafka, sink, source, streaming
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> Apache SAMOA currently includes no integration components for Apache Kafka; 
> in particular, it cannot read data coming from Kafka or write data with 
> prediction results back to Kafka.
> The key assumptions for the development of Kafka-related components are as 
> follows:
> 1)    develop support for an input data stream arriving at Apache SAMOA via 
> Apache Kafka
> 2)    develop support for an output data stream produced by Apache SAMOA, 
> which includes the results of stream mining and is forwarded to Apache Kafka 
> so that it is available to other modules consuming the stream.
> Thus, the goal of this issue is to create the following components:
> 1)    KafkaEntranceProcessor in samoa-api. This entrance processor will be 
> able to accept an incoming Kafka stream. It will require an implementation 
> of the KafkaDeserializer interface to be supplied. The role of the 
> Deserializer is to translate incoming Apache Kafka messages into 
> implementations of SAMOA's Instance interface.
> 2)    KafkaDestinationProcessor in samoa-api. Similarly to the 
> KafkaEntranceProcessor, this processor will require an implementation of 
> the KafkaSerializer interface to be supplied. The role of the Serializer is 
> to create a Kafka message from the underlying Instance class.
> 3)    KafkaStream, as an extension of the existing streams (e.g. 
> InstanceStream), will take a role similar to that of other streams and will 
> provide control over Instance flows in the entire topology.
> Moreover, the following assumptions are considered:
> 1)    Components would be implemented using the most up-to-date version 
> of Apache Kafka, i.e. 0.10
> 2)    Samples of the aforementioned Serializer and Deserializer would be 
> delivered, both supporting AVRO and JSON serialization of Instance objects.
> 3)    Sample testing classes providing a reference use of the Kafka source 
> and destination would be included in the project as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to