[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503504#comment-15503504
 ] 

ASF GitHub Bot commented on FLINK-3874:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2430#discussion_r79390399
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 ---
    @@ -17,123 +17,60 @@
      */
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.api.common.functions.RichMapFunction;
    -import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
     import org.apache.flink.streaming.api.datastream.DataStream;
    -import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
     import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.test.util.SuccessException;
     import org.junit.Test;
     
     import java.io.Serializable;
    -import java.util.HashSet;
     import java.util.Properties;
     
    -import static org.apache.flink.test.util.TestUtils.tryExecute;
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
     
    -public abstract class KafkaTableSinkTestBase extends KafkaTestBase 
implements Serializable {
    +public abstract class KafkaTableSinkTestBase implements Serializable {
     
    -   protected final static String TOPIC = "customPartitioningTestTopic";
    -   protected final static int PARALLELISM = 1;
    +   protected final static String TOPIC = "testTopic";
        protected final static String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
        protected final static TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
     
    +   protected FlinkKafkaProducerBase<Row> kafkaProducer = 
mock(FlinkKafkaProducerBase.class);
    +
        @Test
        public void testKafkaTableSink() throws Exception {
    -           LOG.info("Starting 
KafkaTableSinkTestBase.testKafkaTableSink()");
    -
    -           createTestTopic(TOPIC, PARALLELISM, 1);
    -           StreamExecutionEnvironment env = createEnvironment();
    -
    -           createProducingTopology(env);
    -           createConsumingTopology(env);
    -
    -           tryExecute(env, "custom partitioning test");
    -           deleteTestTopic(TOPIC);
    -           LOG.info("Finished 
KafkaTableSinkTestBase.testKafkaTableSink()");
    -   }
    -
    -   private StreamExecutionEnvironment createEnvironment() {
    -           StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    -           env.setRestartStrategy(RestartStrategies.noRestart());
    -           env.getConfig().disableSysoutLogging();
    -           return env;
    -   }
    -
    -   private void createProducingTopology(StreamExecutionEnvironment env) {
    -           DataStream<Row> stream = env.addSource(new 
SourceFunction<Row>() {
    -                   private boolean running = true;
    -
    -                   @Override
    -                   public void run(SourceContext<Row> ctx) throws 
Exception {
    -                           long cnt = 0;
    -                           while (running) {
    -                                   Row row = new Row(2);
    -                                   row.setField(0, cnt);
    -                                   row.setField(1, "kafka-" + cnt);
    -                                   ctx.collect(row);
    -                                   cnt++;
    -                           }
    -                   }
    +           DataStream dataStream = mock(DataStream.class);
    +           KafkaTableSink kafkaTableSink = createTableSink();
    +           kafkaTableSink.emitDataStream(dataStream);
     
    -                   @Override
    -                   public void cancel() {
    -                           running = false;
    -                   }
    -           })
    -           .setParallelism(1);
    -
    -           KafkaTableSink kafkaTableSinkBase = createTableSink();
    -
    -           kafkaTableSinkBase.emitDataStream(stream);
    +           verify(dataStream).addSink(kafkaProducer);
    --- End diff --
    
    This test checks that `addSink` was called with a `FlinkKafkaProducerBase` object. I think it would be good to extend the test to make sure that:
    - the right producer was used (Kafka 0.8, 0.9)
    - the correct topic is used
    - the properties are correctly passed on
    - the correct serialization schema is used
    - the correct partitioner is used
    
    Would that be possible?


> Add a Kafka TableSink with JSON serialization
> ---------------------------------------------
>
>                 Key: FLINK-3874
>                 URL: https://issues.apache.org/jira/browse/FLINK-3874
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>            Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to