[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15527305#comment-15527305 ]

ASF GitHub Bot commented on FLINK-3874:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2430#discussion_r80784331
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -17,123 +17,79 @@
      */
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.api.common.functions.RichMapFunction;
    -import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
     import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
     import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.test.util.SuccessException;
    +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
     import org.junit.Test;
     
     import java.io.Serializable;
    -import java.util.HashSet;
     import java.util.Properties;
     
    -import static org.apache.flink.test.util.TestUtils.tryExecute;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.any;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.verify;
     
    -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +public abstract class KafkaTableSinkTestBase implements Serializable {
     
    -   protected final static String TOPIC = "customPartitioningTestTopic";
    -   protected final static int PARALLELISM = 1;
    +   protected final static String TOPIC = "testTopic";
        protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
        protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
     
    +   protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
    +
        @Test
        public void testKafkaTableSink() throws Exception {
    -           LOG.info("Starting 
KafkaTableSinkTestBase.testKafkaTableSink()");
    -
    -           createTestTopic(TOPIC, PARALLELISM, 1);
    -           StreamExecutionEnvironment env = createEnvironment();
    -
    -           createProducingTopology(env);
    -           createConsumingTopology(env);
    -
    -           tryExecute(env, "custom partitioning test");
    -           deleteTestTopic(TOPIC);
    -           LOG.info("Finished 
KafkaTableSinkTestBase.testKafkaTableSink()");
    -   }
    +           DataStream dataStream = mock(DataStream.class);
    +           KafkaTableSink kafkaTableSink = createTableSink();
    +           kafkaTableSink.emitDataStream(dataStream);
     
    -   private StreamExecutionEnvironment createEnvironment() {
    -           StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    -           env.setRestartStrategy(RestartStrategies.noRestart());
    -           env.getConfig().disableSysoutLogging();
    -           return env;
    +           verify(dataStream).addSink(kafkaProducer);
        }
     
    -   private void createProducingTopology(StreamExecutionEnvironment env) {
    -           DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
    -                   private boolean running = true;
    -
    -                   @Override
    -                   public void run(SourceContext<Row> ctx) throws Exception {
    -                           long cnt = 0;
    -                           while (running) {
    -                                   Row row = new Row(2);
    -                                   row.setField(0, cnt);
    -                                   row.setField(1, "kafka-" + cnt);
    -                                   ctx.collect(row);
    -                                   cnt++;
    -                           }
    -                   }
    -
    -                   @Override
    -                   public void cancel() {
    -                           running = false;
    -                   }
    -           })
    -           .setParallelism(1);
    -
    -           KafkaTableSink kafkaTableSinkBase = createTableSink();
    -
    -           kafkaTableSinkBase.emitDataStream(stream);
    +   @Test
    +   public void testCorrectProducerIsCreated() throws Exception {
    +           DataStream dataStream = mock(DataStream.class);
    +           KafkaTableSink kafkaTableSink = spy(createTableSink());
    +           kafkaTableSink.emitDataStream(dataStream);
    +
    +           verify(kafkaTableSink).createKafkaProducer(
    +                   eq(TOPIC),
    +                   eq(createSinkProperties()),
    +                   any(JsonRowSerializationSchema.class),
    +                   any(CustomPartitioner.class));
        }
     
    -   private void createConsumingTopology(StreamExecutionEnvironment env) {
    -           DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
    -
    -           FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
    -
    -           env.addSource(source).setParallelism(PARALLELISM)
    -                   .map(new RichMapFunction<Row, Integer>() {
    -                           @Override
    -                           public Integer map(Row value) {
    -                                   return (Integer) value.productElement(0);
    -                           }
    -                   }).setParallelism(PARALLELISM)
    -
    -                   .addSink(new SinkFunction<Integer>() {
    -                           HashSet<Integer> ids = new HashSet<>();
    -                           @Override
    -                           public void invoke(Integer value) throws Exception {
    -                                   ids.add(value);
    -
    -                                   if (ids.size() == 100) {
    -                                           throw new SuccessException();
    -                                   }
    -                           }
    -                   }).setParallelism(1);
    +   @Test
    +   public void testConfigure() {
    +           KafkaTableSink kafkaTableSink = createTableSink();
    +           KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
    +           assertNotSame(kafkaTableSink, newKafkaTableSink);
    +
    +           assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
    +           assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
    +           assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
        }
     
        protected KafkaPartitioner<Row> createPartitioner() {
                return new CustomPartitioner();
        }
     
        protected Properties createSinkProperties() {
    --- End diff --
    
    Please add a non-empty Properties object as a final member variable of the test base and use it here. This will make the equals check more meaningful.
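    
    For illustration, a minimal sketch of what that could look like (the property key and value are placeholder assumptions, not taken from the PR):
    
        // Hypothetical sketch: a non-empty Properties member shared between sink
        // construction and the eq(...) verification, so the equality check compares
        // real content (two empty Properties instances are trivially equal).
        protected final Properties sinkProperties = createSinkProperties();
    
        private static Properties createSinkProperties() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder value
            return props;
        }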


> Add a Kafka TableSink with JSON serialization
> ---------------------------------------------
>
>                 Key: FLINK-3874
>                 URL: https://issues.apache.org/jira/browse/FLINK-3874
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>            Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.
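
For context, end-to-end usage of the proposed sink would look roughly like the sketch below (hedged: the class name Kafka09JsonTableSink and the constructor shape follow PR #2430, and the exact signatures are assumptions, not confirmed API):

    // Assumed API from PR #2430: write a Table to Kafka, one JSON object per Row.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    KafkaTableSink sink = new Kafka09JsonTableSink(
            "testTopic",                    // target topic
            props,                          // producer configuration
            new FixedPartitioner<Row>());   // partitioner from the Kafka connector, for illustration

    table.writeToSink(sink);                // 'table' is an existing Table API Table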


